HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/__pycache__/test_threading.cpython-37.pyc
B

��g۰�@sdZddlZddlmZmZmZmZddlmZm	Z	ddl
Z
ddlZddlZddl
Z
ddlZddlZddlZddlZddlZddlZddlmZddlmZdZGdd	�d	e�ZGd
d�de
j�ZGdd
�d
ej�ZGdd�de�ZGdd�de�ZGdd�de�ZGdd�de�Z Gdd�de�Z!Gdd�dej"�Z"Gdd�dej#�Z$e�%e
j&dkd�Gdd�dej#��Z'Gdd �d ej(�Z(Gd!d"�d"ej#�Z)Gd#d$�d$ej*�Z*Gd%d&�d&ej+�Z+Gd'd(�d(ej,�Z,Gd)d*�d*ej-�Z-Gd+d,�d,ej�Z.Gd-d.�d.ej�Z/e0d/k�re�1�dS)0z!
Tests for the threading module.
�N)�verbose�
import_module�cpython_only�requires_type_collecting)�assert_python_ok�assert_python_failure)�
lock_tests)�support)Znetbsd5zhp-ux11c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�CountercCs
d|_dS)Nr)�value)�self�r
�//usr/local/lib/python3.7/test/test_threading.py�__init__"szCounter.__init__cCs|jd7_dS)N�)r)rr
r
r�inc$szCounter.inccCs|jd8_dS)Nr)r)rr
r
r�dec&szCounter.deccCs|jS)N)r)rr
r
r�get(szCounter.getN)�__name__�
__module__�__qualname__rrrrr
r
r
rr
!sr
c@seZdZdd�Zdd�ZdS)�
TestThreadcCs,tjj||d�||_||_||_||_dS)N)�name)�	threading�Threadr�testcase�sema�mutex�nrunning)rrrrrrr
r
rr,s
zTestThread.__init__c
Cs�t��d}tr&td|j|df�|j��|j�8|j��trTt|j��d�|j	�
|j��d�WdQRXt�|�tr�td|jd�|j�@|j�
�|j	�|j��d�tr�td	|j|j��f�WdQRXWdQRXdS)
Ng��@ztask %s will run for %.1f usecg��.Aztasks are running�Ztask�donerz$%s is finished. %d tasks are running)�randomr�printrrrrrrrZassertLessEqual�time�sleeprZassertGreaterEqual)rZdelayr
r
r�run3s&


zTestThread.runN)rrrrr%r
r
r
rr+src@seZdZdd�Zdd�ZdS)�BaseTestCasecCstj��|_dS)N)�testr	Zthreading_setup�_threads)rr
r
r�setUpMszBaseTestCase.setUpcCstjj|j�tj��dS)N)r'r	Zthreading_cleanupr(Z
reap_children)rr
r
r�tearDownPszBaseTestCase.tearDownN)rrrr)r*r
r
r
rr&Lsr&c@sneZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Ze�eed�d �d!d"��Ze�eed�d#�d$d%��Zd&d'�Ze�eed�d(�e�eed)�d*�d+d,���Ze�ejekd-�e�eed�d(�e�eed)�d*�d.d/����Zed0d1��Z d2d3�Z!d4d5�Z"d6d7�Z#d8d9�Z$e%d:d;��Z&e%d<d=��Z'd>S)?�ThreadTestscCs�d}tjdd�}t��}t�}g}xRt|�D]F}td|||||�}|�|�|�|j�|�	t
|�d�|��q,Wtr�t
d�xL|D]D}|��|�|���|�|jd�|�|j�|�	t
|�d�q�Wtr�t
d	�|�|��d�dS)
N�
r)rz<thread %d>z^<TestThread\(.*, initial\)>$z!waiting for all tasks to completerz#^<TestThread\(.*, stopped -?\d+\)>$zall tasks done)r�BoundedSemaphore�RLockr
�ranger�append�assertIsNone�ident�assertRegex�repr�startrr"�join�assertFalse�is_alive�assertNotEqual�assertIsNotNone�assertEqualr)rZNUMTASKSrrZ
numrunning�threads�i�tr
r
r�test_various_opsYs,

zThreadTests.test_various_opsc	sr|�t��j���fdd�}t���g�t���*t�|d�}��	�|�
�d|�WdQRXtj�d=dS)Ncs��t��j����dS)N)r0r�
currentThreadr2�setr
)r r2r
r�f{sz9ThreadTests.test_ident_of_no_threading_threads.<locals>.fr
r)r:rr@r2�Eventr	�wait_threads_exit�_thread�start_new_thread�waitr;�_active)rrB�tidr
)r r2r�"test_ident_of_no_threading_threadsxs
z.ThreadTests.test_ident_of_no_threading_threadscCsRtrtd�yt�d�Wn tjk
r:t�d��YnX|��t�d�dS)Nz!with 256 KiB thread stack size...iz4platform does not support changing thread stack sizer)	rr"r�
stack_sizerE�error�unittest�SkipTestr?)rr
r
r�test_various_ops_small_stack�sz(ThreadTests.test_various_ops_small_stackcCsRtrtd�yt�d�Wn tjk
r:t�d��YnX|��t�d�dS)Nzwith 1 MiB thread stack size...iz4platform does not support changing thread stack sizer)	rr"rrKrErLrMrNr?)rr
r
r�test_various_ops_large_stack�sz(ThreadTests.test_various_ops_large_stackc	Cs�dd�}t��}|��t���t�||f�}|��WdQRX|�|tj�|�	tj|tj
�|�tj|���|�
ttj|�d�tj|=dS)NcSst��|��dS)N)r�current_thread�release)rr
r
rrB�sz*ThreadTests.test_foreign_thread.<locals>.f�_DummyThread)r�Lock�acquirer	rDrErF�assertInrH�assertIsInstancerS�
assertTruer8r3r4)rrBrrIr
r
r�test_foreign_thread�s
zThreadTests.test_foreign_threadc	s�td�}|jj}|j|jf|_Gdd�dt��|���}t��}|�	|t
�|�|d�y|||�}xqfWWn�k
r�YnX|�d�y|�
|d�Wntk
r�YnXt���t���G���fdd�dtj�}|�}d	|_|��tr�td
�t�r
td�|d|�}|�
|d�t�r.td
����}|�|�t�rNtd�|�|j�t�rhtd�||j|�}|�
|d�t�r�td��jdd�|�|j�t�r�td�|j�r�|��dS)N�ctypesc@seZdZdS)z<ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.AsyncExcN)rrrr
r
r
r�AsyncExc�sr[rzAsyncExc not raisedrcseZdZ���fdd�ZdS)z:ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.WorkercsTt��|_d|_yx���t�d�qWWn"�k
rNd|_���YnXdS)NFg�������?T)r�	get_ident�id�finishedrAr#r$)r)r[�worker_saw_exception�worker_startedr
rr%�s
z>ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Worker.runN)rrrr%r
)r[r_r`r
r�Worker�sraTz    started worker threadz     trying nonsensical thread id���z,    waiting for worker thread to get startedz"    verifying worker hasn't exitedz2    attempting to raise asynch exception in workerz5    waiting for worker to say it caught the exceptionr,)�timeoutz    all OK -- joining worker)rZ	pythonapiZPyThreadState_SetAsyncExcZc_ulongZ	py_objectZargtypes�	Exceptionrr\rW�intZ
assertGreaterZfailr;�UnboundLocalErrorrCr�daemonr5rr"rGrXr7r^r]r6)	rrZZ
set_async_excZ	exceptionrI�resultrar>�retr
)r[r_r`r�test_PyThreadState_SetAsyncExc�sd





z*ThreadTests.test_PyThreadState_SetAsyncExccCsXdd�}tj}|t_z6tjdd�d�}|�tj|j�|�|tjkd�Wd|t_XdS)NcWst���dS)N)r�ThreadError)�argsr
r
r�fail_new_threadsz7ThreadTests.test_limbo_cleanup.<locals>.fail_new_threadcSsdS)Nr
r
r
r
r�<lambda>�z0ThreadTests.test_limbo_cleanup.<locals>.<lambda>)�targetz:Failed to cleanup _limbo map on failure of Thread.start().)r�_start_new_threadr�assertRaisesrkr5r7Z_limbo)rrmrqr>r
r
r�test_limbo_cleanups
zThreadTests.test_limbo_cleanupcCs(td�tdd�\}}}|�|d�dS)NrZz-caNif 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            �*)rrr;)r�rc�out�errr
r
r�test_finalize_runnning_threadsz)ThreadTests.test_finalize_runnning_threadcCstdd�dS)Nz-caPif 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            )r)rr
r
r�test_finalize_with_trace>sz$ThreadTests.test_finalize_with_tracecCs0tdd�\}}}|�|��d�|�|d�dS)Nz-ca�if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            s5Woke up, sleep function is: <built-in function sleep>ro)rr;�strip)rrurvrwr
r
r�test_join_nondaemon_on_shutdownWs

z+ThreadTests.test_join_nondaemon_on_shutdownc	Cs�tj}t��}zbx\tdd�D]N}t�|d�tjdd�d�}|��|��|�}|�	||d||f�qWWdt�|�XdS)Nr�dg-C��6*?cSsdS)Nr
r
r
r
rrnsroz7ThreadTests.test_enumerate_after_join.<locals>.<lambda>)rpz&#1703448 triggered after %d trials: %s)
r�	enumerate�sys�getswitchintervalr/�setswitchintervalrr5r6�assertNotIn)r�enum�old_intervalr=r>�lr
r
r�test_enumerate_after_joinksz%ThreadTests.test_enumerate_after_joincCs�Gdd�dt�}|dd�}t�|�}|j��~|j|�dt�|��d�|dd�}t�|�}|j��~|j|�dt�|��d�dS)Nc@seZdZdd�Zdd�ZdS)zDThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunctioncSs.||_tj|j|fd|id�|_|j��dS)N�yet_another)rprl�kwargs)�should_raiserr�_run�threadr5)rr�r
r
rr~s
zMThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction.__init__cSs|jr
t�dS)N)r��
SystemExit)rZ	other_refr�r
r
rr��szIThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction._runN)rrrrr�r
r
r
r�RunSelfFunction}s	r�F)r�z%d references still around)�msgT)�object�weakref�refr�r6r1r~�getrefcount)rr�Z
cyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectr
r
r�test_no_refcycle_through_target|s





z+ThreadTests.test_no_refcycle_through_targetc	Csht��}|��|�d�|��|�d�|�td��|��WdQRXt�	�}|�
�t��dS)NTrzuse is_alive())rrZisDaemonZ	setDaemonZgetNameZsetNameZassertWarnsRegex�PendingDeprecationWarningZisAliverCZisSet�activeCount)rr>�er
r
r�test_old_threading_api�s

z"ThreadTests.test_old_threading_apicCs2t��}|�dt|��d|_|�dt|��dS)NrgT)rrr�r4rgrV)rr>r
r
r�test_repr_daemon�szThreadTests.test_repr_daemoncCsHt��}|�|j�tjdd�}|�|j�tjdd�}|�|j�dS)NF)rgT)rrr7rgrX)rr>r
r
r�test_daemon_param�szThreadTests.test_daemon_param�forkztest needs fork()cCs0d}td|�\}}}|�|d�|�|d�dS)Na�if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        z-cro)rr;)r�code�_rvrwr
r
r�test_dummy_thread_after_fork�sz(ThreadTests.test_dummy_thread_after_forkzneeds os.fork()cCs�t��}|�tj|�tj�d�x�td�D]~}tjdd�d�}|�	�t
��}|dkrpt
�|�
�rhdnd�q,|��t
�|d�\}}|�t
�|��|�dt
�|��q,WdS)	Ng���ư>�cSsdS)Nr
r
r
r
rrn�roz6ThreadTests.test_is_alive_after_fork.<locals>.<lambda>)rpr�r,)r~r�
addCleanupr�r'r	r/rrr5�osr��_exitr8r6�waitpidrX�	WIFEXITEDr;�WEXITSTATUS)rr�r=r>�pid�statusr
r
r�test_is_alive_after_fork�sz$ThreadTests.test_is_alive_after_forkcsht��}��|jd���|jt��j���|jt����fdd�}tj|d�}|��|�	�dS)NZ
MainThreadcs��t��jt��j�dS)N)r9r�main_threadr2rQr
)rr
rrB�sz'ThreadTests.test_main_thread.<locals>.f)rp)
rr�r;rr2rQr\rr5r6)r�mainrB�thr
)rr�test_main_thread�szThreadTests.test_main_threadztest needs os.fork()r�ztest needs os.waitpid()cCs@d}td|�\}}}|���dd�}|�|d�|�|d�dS)Nakif 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        z-c�
�rozMainThread
True
True
)r�decode�replacer;)rr�r�rvrw�datar
r
r�test_main_thread_after_fork�s
z'ThreadTests.test_main_thread_after_forkzdue to known OS bugcCs@d}td|�\}}}|���dd�}|�|d�|�|d�dS)Na�if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        z-cr�r�rozThread-1
True
True
)rr�r�r;)rr�r�rvrwr�r
r
r�/test_main_thread_after_fork_from_nonmain_thread
s
z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadcCsBd}td|�\}}}|��}|�|d�|�|��dgd�dS)Na�if 1:
            import gc, threading

            main_thread = threading.current_thread()
            assert main_thread is threading.main_thread()  # sanity check

            class RefCycle:
                def __init__(self):
                    self.cycle = self

                def __del__(self):
                    print("GC:",
                          threading.current_thread() is main_thread,
                          threading.main_thread() is main_thread,
                          threading.enumerate() == [main_thread])

            RefCycle()
            gc.collect()  # sanity check
            x = RefCycle()
        z-crozGC: True True True�)rr�r;�
splitlines)rr�r�rvrwr�r
r
r� test_main_thread_during_shutdown's
z,ThreadTests.test_main_thread_during_shutdowncCs$d}td|�\}}}|�|d�dS)Na�if 1:
            import os
            import threading
            import time
            import random

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_Finalize() is called.
                random_sleep()
                tls.x = Sleeper()
                random_sleep()

            threading.Thread(target=f).start()
            random_sleep()
        z-cro)rr;)rr�rurvrwr
r
r�test_finalization_shutdownEsz&ThreadTests.test_finalization_shutdowncs�t���t�����������fdd�}tj|d�}|�|jd�|�����|�|�	��|j}|�
|jdd�d����|�|jdd�d�|�|�	��|��|�
|�	��|�|j�|�
�dS)Ncs������t�d�dS)Ng{�G�z�?)rRrUr#r$r
)�finish�startedr
rrBnsz'ThreadTests.test_tstate_lock.<locals>.f)rpr)rcF�)rE�
allocate_lockrUrrZassertIs�_tstate_lockr5rXr8r7rRr1r6)rrBr>�tstate_lockr
)r�r�r�test_tstate_lockhs&zThreadTests.test_tstate_lockcs�t���t�����������fdd�}tj|d�}|�����|�dt|�����d}x(t	d�D]}|t|�kr�Pt
�d�qpW|�|t|��|��dS)Ncs������dS)N)rRrUr
)r�r�r
rrB�sz(ThreadTests.test_repr_stopped.<locals>.f)rpr�Zstoppedi�g{�G�z�?)
rEr�rUrrr5rVr4rRr/r#r$r6)rrBr>ZLOOKING_FORr=r
)r�r�r�test_repr_stopped�s"zThreadTests.test_repr_stoppedcs�x�tdd�D]�}t�|���fdd�t|�D�}x|D]}|��q6Wx|D]}|��qLW�fdd�t|�D�}x|D]}|��qxWx|D]}|��q�W|�t�j�qWdS)Nrr,csg|]}tj�jd��qS))rp)rrrU)�.0r�)�bsr
r�
<listcomp>�sz;ThreadTests.test_BoundedSemaphore_limit.<locals>.<listcomp>csg|]}tj�jd��qS))rp)rrrR)r�r�)r�r
rr��s)r/rr-r5r6rr�
ValueErrorrR)r�limitr<r>r
)r�r�test_BoundedSemaphore_limit�s






z'ThreadTests.test_BoundedSemaphore_limitc	s��fdd��dd����fdd��d�_t��}t���z8t���ddl}|���xtd�D]
}��qbWWdt�|�XdS)	Ncs�S)Nr
)�frame�event�arg)�
noop_tracer
rr��sz9ThreadTests.test_frame_tstate_tracing.<locals>.noop_tracecssx
dVqWdS)N�	generatorr
r
r
r
rr��sz8ThreadTests.test_frame_tstate_tracing.<locals>.generatorcs�jdkr���_t�j�S)N)�gen�nextr
)�callbackr�r
rr��s
z7ThreadTests.test_frame_tstate_tracing.<locals>.callbackrr)r�r~�gettrace�settracer�	_testcapiZcall_in_temporary_c_threadr/)rZ	old_tracer�r'r
)r�r�r�r�test_frame_tstate_tracing�s


z%ThreadTests.test_frame_tstate_tracingc
Cs�x�dD]�}|j|d��lt��}tj|j|d�}|��|j}|sR|�|tj�n|�	|tj�|�
�|��|�	|tj�WdQRXqWdS)N)FT)rg)rprg)ZsubTestrrCrrGr5r�rVZ_shutdown_locksr�rAr6)rrgr�r�r�r
r
r�test_shutdown_locks�s
zThreadTests.test_shutdown_locksN)(rrrr?rJrOrPrYrjrsrxryr{r�r�r�r�r�rM�
skipUnless�hasattrr�r�r�r�r��skipIfr~�platform�platforms_to_skipr�rr�r�r�r�r�rr�r�r
r
r
rr+Us<X!
##(r+c@s�eZdZdd�Zdd�Ze�eed�d�e�	e
jekd�dd	���Z
e�eed�d�e�	e
jekd�d
d���Ze�	e
jekd�dd
��Ze�eed�d�e�	e
jekd�dd���Ze�eed�d�dd��ZdS)�ThreadJoinOnShutdowncCs8d|}td|�\}}}|���dd�}|�|d�dS)Na�if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        
z-cr�r�zend of main
end of thread
)rr�r�r;)r�scriptrurvrwr�r
r
r�
_run_and_join�sz"ThreadJoinOnShutdown._run_and_joincCsd}|�|�dS)Nz�if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            )r�)rr�r
r
r�test_1_join_on_shutdowns	z,ThreadJoinOnShutdown.test_1_join_on_shutdownr�zneeds os.fork()zdue to known OS bugcCsd}|�|�dS)NaGif 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            )r�)rr�r
r
r�test_2_join_in_forked_processsz2ThreadJoinOnShutdown.test_2_join_in_forked_processcCsd}|�|�dS)Na:if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            )r�)rr�r
r
r�!test_3_join_in_forked_from_thread)sz6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadcCs"d}td|�\}}}|�|�dS)Na�if True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    in_f = open(os.__file__, 'rb')
                    stuff = in_f.read(200)
                    null_f = open(os.devnull, 'wb')
                    null_f.write(stuff)
                    time.sleep(random.random() / 1995)
                    null_f.close()
                    in_f.close()
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            z-c)rr7)rr�rurvrwr
r
r�test_4_daemon_threadsBs'z*ThreadJoinOnShutdown.test_4_daemon_threadscCsVdd�}g}x.td�D]"}tj|d�}|�|�|��qWx|D]}|��qBWdS)NcSs,t��}|dkrt�|d�n
t�d�dS)Nr)r�r�r�r�)r�r
r
r�do_fork_and_waitsszIThreadJoinOnShutdown.test_reinit_tls_after_fork.<locals>.do_fork_and_wait�)rp)r/rrr0r5r6)rr�r<r=r>r
r
r�test_reinit_tls_after_forkms	

z/ThreadJoinOnShutdown.test_reinit_tls_after_forkcCs�g}x2td�D]&}tjdd�d�}|�|�|��qWt��}|dkrptt�	��dkrdt�
d�q�t�
d�nt�|d�\}}|�d|�x|D]}|�
�q�WdS)Nr�cSs
t�d�S)Ng333333�?)r#r$r
r
r
rrn�rozKThreadJoinOnShutdown.test_clear_threads_states_after_fork.<locals>.<lambda>)rprr)r/rrr0r5r�r��lenr~�_current_framesr�r�r;r6)rr<r=r>r�r�r�r
r
r�$test_clear_threads_states_after_fork�s

z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)rrrr�r�rMr�r�r�r�r~r�r�r�r�r�r�r�r
r
r
rr��s+r�c@s(eZdZdd�Zdd�Zedd��ZdS)�SubinterpThreadingTestscCsbt��\}}|�tj|�|�tj|�d|f}tj�|�}|�|d�|�t�|d�d�dS)Naif 1:
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
            rr�x)	r��piper��closer'r	�run_in_subinterpr;�read)r�r�wr�rir
r
r�test_threads_join�s
z)SubinterpThreadingTests.test_threads_joincCsbt��\}}|�tj|�|�tj|�d|f}tj�|�}|�|d�|�t�|d�d�dS)Na�if 1:
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                tls.x = Sleeper()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
            rrr�)	r�r�r�r�r'r	r�r;r�)rr�r�r�rir
r
r�test_threads_join_2�s
z+SubinterpThreadingTests.test_threads_join_2c	CsHd}d|f}tj���td|�\}}}WdQRX|�d|���dS)NaAif 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(10)
            threading.Thread(target=f, daemon=True).start()
            z[if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            z-cz:Fatal Python error: Py_EndInterpreter: not the last thread)r'r	ZSuppressCrashReportrrVr�)rZsubinterp_coder�rurvrwr
r
r�test_daemon_threads_fatal_error�s
z7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)rrrr�r�rr�r
r
r
rr��s'r�c@s|eZdZdd�Zdd�Zdd�Zdd�Zd	d
�Ze�	e
jdkoFej
��d�d
d��Zdd�Zedd��Zdd�Zdd�ZdS)�ThreadingExceptionTestscCs*t��}|��|�t|j�|��dS)N)rrr5rr�RuntimeErrorr6)rr�r
r
r�test_start_thread_againsz/ThreadingExceptionTests.test_start_thread_againcCst��}|�t|j�dS)N)rrQrrr�r6)rrQr
r
r�test_joining_current_threadsz3ThreadingExceptionTests.test_joining_current_threadcCst��}|�t|j�dS)N)rrrrr�r6)rr�r
r
r�test_joining_inactive_threadsz4ThreadingExceptionTests.test_joining_inactive_threadcCs.t��}|��|�tt|dd�|��dS)NrgT)rrr5rrr��setattrr6)rr�r
r
r�test_daemonize_active_threadsz4ThreadingExceptionTests.test_daemonize_active_threadcCst��}|�t|j�dS)N)rrTrrr�rR)r�lockr
r
r�test_releasing_unacquired_locksz6ThreadingExceptionTests.test_releasing_unacquired_lock�darwinztest macosx problemcCshd}d}tjtjd|gtjtjd�}|��\}}|���dd�}|�|j	dd|���|�||�dS)	Naif True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            zend of main thread
z-c)�stdout�stderrr�r�rzUnexpected error: )
�
subprocess�Popenr~�
executable�PIPEZcommunicater�r�r;�
returncode)rr�Zexpected_output�pr�r�r�r
r
r�test_recursion_limitsz,ThreadingExceptionTests.test_recursion_limitcCs\d}td|�\}}}|�|d�|��}|�d|�|�d|�|�d|�|�d|�dS)Na�if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            z-crozException in threadz"Traceback (most recent call last):�ZeroDivisionErrorzUnhandled exception)rr;r�rVr�)rr�rurvrwr
r
r�test_print_exception:sz,ThreadingExceptionTests.test_print_exceptioncCs\d}td|�\}}}|�|d�|��}|�d|�|�d|�|�d|�|�d|�dS)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            z-crozException in threadz"Traceback (most recent call last):rzUnhandled exception)rr;r�rVr�)rr�rurvrwr
r
r�%test_print_exception_stderr_is_none_1Usz=ThreadingExceptionTests.test_print_exception_stderr_is_none_1cCs4d}td|�\}}}|�|d�|�d|���dS)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            z-crozUnhandled exception)rr;r�r�)rr�rurvrwr
r
r�%test_print_exception_stderr_is_none_2ssz=ThreadingExceptionTests.test_print_exception_stderr_is_none_2csXdd��G�fdd�dtj�}|�}|��|��|�|j�|�|jt�d|_dS)NcSs�dS)Nr
r
r
r
r�
bare_raise�szOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.bare_raisecseZdZdZ�fdd�ZdS)zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558Nc
s8y
��Wn(tk
r2}z
||_Wdd}~XYnXdS)N)rd�exc)rr)rr
rr%�s
zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558.run)rrrrr%r
)rr
r�
Issue27558�sr)rrr5r6r:rrWr�)rrr�r
)rr�#test_bare_raise_in_brand_new_thread�s	z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadN)rrrr�r�r�r�r�rMr�r~r�r'r	Zpython_is_optimizedrrrrrr	r
r
r
rr��s r�c@s$eZdZdd�Zdd�Zdd�ZdS)�
TimerTestscCst�|�g|_t��|_dS)N)r&r)�
callback_argsrrC�callback_event)rr
r
rr)�s
zTimerTests.setUpcCs�t�d|j�}|��|j��|j�d�d|jd<|j�	�t�d|j�}|��|j��|�
t|j�d�|�
|jdifdifg�|�
�|�
�dS)Ng{�G�z�?ZblahZbarZfoor�r
)rZTimer�
_callback_spyr5rrGrlr0r��clearr;r�rr6)rZtimer1Ztimer2r
r
r� test_init_immutable_default_args�s



z+TimerTests.test_init_immutable_default_argscOs*|j�|dd�|��f�|j��dS)N)rr0�copyrrA)rrlr�r
r
rr
�szTimerTests._callback_spyN)rrrr)rr
r
r
r
rr
�sr
c@seZdZeej�ZdS)�	LockTestsN)rrr�staticmethodrrT�locktyper
r
r
rr�src@seZdZeej�ZdS)�PyRLockTestsN)rrrrrZ_PyRLockrr
r
r
rr�srzRLock not implemented in Cc@seZdZeej�ZdS)�CRLockTestsN)rrrrr�_CRLockrr
r
r
rr�src@seZdZeej�ZdS)�
EventTestsN)rrrrrrCZ	eventtyper
r
r
rr�src@seZdZeej�ZdS)�ConditionAsRLockTestsN)rrrrr�	Conditionrr
r
r
rr�src@seZdZeej�ZdS)�ConditionTestsN)rrrrrrZcondtyper
r
r
rr�src@seZdZeej�ZdS)�SemaphoreTestsN)rrrrrZ	Semaphore�semtyper
r
r
rr�src@seZdZeej�ZdS)�BoundedSemaphoreTestsN)rrrrrr-rr
r
r
rr�src@seZdZeej�ZdS)�BarrierTestsN)rrrrrZBarrierZbarriertyper
r
r
rr�src@seZdZdd�ZdS)�MiscTestCasecCs&dh}ddh}tj|td||d�dS)Nrkr@r�)rrE)�extra�	blacklist)r	Zcheck__all__r)rr r!r
r
r�test__all__�s
zMiscTestCase.test__all__N)rrrr"r
r
r
rr�src@s$eZdZdd�Zdd�Zdd�ZdS)�InterruptMainTestsc	CsFdd�}tj|d�}|�t��|��|��WdQRX|��dS)NcSst��dS)N)rE�interrupt_mainr
r
r
r�call_interrupt�szHInterruptMainTests.test_interrupt_main_subthread.<locals>.call_interrupt)rp)rrrr�KeyboardInterruptr5r6)rr%r>r
r
r�test_interrupt_main_subthread�sz0InterruptMainTests.test_interrupt_main_subthreadc	Cs"|�t��t��WdQRXdS)N)rrr&rEr$)rr
r
r�test_interrupt_main_mainthread�sz1InterruptMainTests.test_interrupt_main_mainthreadc
CsVt�tj�}z4t�tjtj�t��t�tjtj�t��Wdt�tj|�XdS)N)�signal�	getsignal�SIGINT�SIG_IGNrEr$�SIG_DFL)rZhandlerr
r
r�test_interrupt_main_noerror�sz.InterruptMainTests.test_interrupt_main_noerrorN)rrrr'r(r.r
r
r
rr#�sr#�__main__)2�__doc__Ztest.supportr'rrrrZtest.support.script_helperrrr!r~rErr#rMr�r�r�r)rr	r�r�r
rrZTestCaser&r+r�r�r�r
rZ
RLockTestsrr�rrrrrrrrrr#rr�r
r
r
r�<module>sZ
!	)'_$