HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.10/test/__pycache__/test_threading.cpython-310.opt-1.pyc
o

�i���@s�dZddlZddlmZddlmZmZmZddlmZddl	m
Z
mZddlZddl
Z
ddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlmZddlmZdd	lmZd
Zee
d�Zdd
�ZGdd�de�Z Gdd�dej!�Z"Gdd�dej#�Z$Gdd�de$�Z%Gdd�de$�Z&Gdd�de$�Z'Gdd�de$�Z(Gdd�dej!�Z)Gdd�de$�Z*Gd d!�d!e$�Z+Gd"d#�d#ej,�Z,Gd$d%�d%ej-�Z.e�/ej0dud&�Gd'd(�d(ej-��Z1Gd)d*�d*ej2�Z2Gd+d,�d,ej-�Z3Gd-d.�d.ej4�Z4Gd/d0�d0ej5�Z5Gd1d2�d2ej6�Z6Gd3d4�d4ej7�Z7Gd5d6�d6ej#�Z8Gd7d8�d8ej#�Z9Gd9d:�d:ej#�Z:e;d;k�rDe�<�dSdS)<z!
Tests for the threading module.
�N)�threading_helper)�verbose�cpython_only�	os_helper)�
import_module)�assert_python_ok�assert_python_failure)�mock)�
lock_tests)�support)Znetbsd5zhp-ux11ZgettotalrefcountcCs|�ttdtj�tjt_dS)N�
excepthook)�
addCleanup�setattr�	threadingr�__excepthook__)�testcase�r�0/usr/local/lib/python3.10/test/test_threading.py�restore_default_excepthook'src@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�CountercC�
d|_dS)Nr��value��selfrrr�__init__.�
zCounter.__init__cCs|jd7_dS�N�rrrrr�inc0�zCounter.inccCs|jd8_dSrrrrrr�dec2r zCounter.deccCs|jS�Nrrrrr�get4szCounter.getN)�__name__�
__module__�__qualname__rrr!r#rrrrr-s
rc@�eZdZdd�Zdd�ZdS)�
TestThreadcCs,tjj||d�||_||_||_||_dS)N��name)r�Threadrr�sema�mutex�nrunning)rr*rr,r-r.rrrr8s

zTestThread.__init__c	Cs6t��d}trtd|j|df�|j�{|j�!|j��tr*t|j��d�|j	�
|j��d�Wd�n1s>wYt�|�trQtd|jd�|j�%|j�
�|j	�|j��d�trrtd	|j|j��f�Wd�n1s|wYWd�dSWd�dS1s�wYdS)
Ng��@ztask %s will run for %.1f usecg��.Aztasks are running�Ztask�donerz$%s is finished. %d tasks are running)�randomr�printr*r,r-r.rr#rZassertLessEqual�time�sleepr!ZassertGreaterEqual)r�delayrrr�run?s6�
�

����"�zTestThread.runN)r$r%r&rr6rrrrr(7sr(c@r')�BaseTestCasecCst��|_dSr")rZthreading_setup�_threadsrrrr�setUpY�zBaseTestCase.setUpcCstj|j�tj��dSr")rZthreading_cleanupr8�testr�
reap_childrenrrrr�tearDown\szBaseTestCase.tearDownN)r$r%r&r9r=rrrrr7Xsr7c@s�eZdZedd��Zedd��Zdd�Zdd�Zd	d
�Zdd�Z	d
d�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd�Zdd �Zd!d"�Ze�eed#�d$�d%d&��Ze�eed#�d'�d(d)��Ze�eed#�d$�d*d+��Zd,d-�Ze�eed#�d.�e�eed/�d0�d1d2���Ze�ej e!vd3�e�eed#�d.�e�eed/�d0�d4d5����Z"d6d7�Z#d8d9�Z$d:d;�Z%d<d=�Z&d>d?�Z'ed@dA��Z(dBdC�Z)dDdE�Z*edFdG��Z+dHdI�Z,dJdK�Z-dLdM�Z.e�e/dN�dOdP��Z0dQdR�Z1dSS)T�ThreadTestscCs>dd�}tjdd�}|�|jd�tjdd�}|�|jd�tj|dd�}|�|jd�tjjtd	d
d��tjdd�}|�|jd
�Wd�n1sMwYtjjtd	dd��t��}|�|jd�Wd�n1sqwYtjjtd	dd��tj|d�}|�|jd�Wd�dS1s�wYdS)NcS�dSr"rrrrr�funce�z#ThreadTests.test_name.<locals>.funcZmyname1r)�{Z123Zmyname2)�targetr*�_counter�)Zreturn_value�zThread-2r/zThread-3��rCzThread-5 (func))rr+�assertEqualr*r	Zpatch�object)rr@�threadrrr�	test_namecs&��"�zThreadTests.test_namecCst��}tj�|t|��dSr")r�Lockr;rZcheck_disallow_instantiation�type�r�lockrrr�test_disallow_instantiation~sz'ThreadTests.test_disallow_instantiationc	Cs.d}tjdd�}t��}t�}g}t|�D]#}td|||||�}|�|�|�|j�|�	t
|�d�|��qttd�r\t
dd�|D��t��hB}|�d|�|�t|�|d	�trbtd
�|D]"}|��|�|���|�|jd�|�|j�|�	t
|�d�qdtr�td
�|�|��d�dS)N�
r/rz<thread %d>z^<TestThread\(.*, initial\)>$�
get_native_idcss�|]}|jVqdSr")�	native_id)�.0�trrr�	<genexpr>�s�z/ThreadTests.test_various_ops.<locals>.<genexpr>rz!waiting for all tasks to completerz#^<TestThread\(.*, stopped -?\d+\)>$zall tasks done)r�BoundedSemaphore�RLockr�ranger(�append�assertIsNone�ident�assertRegex�repr�start�hasattr�setrS�assertNotInrI�lenrr2�join�assertFalse�is_alive�assertNotEqual�assertIsNotNoner#)	rZNUMTASKSr,r-Z
numrunning�threads�irVZ
native_idsrrr�test_various_ops�s4


zThreadTests.test_various_opscs�|�t��j���fdd�}t���g�t���t�|d�}��	�|�
�d|�Wd�n1s6wYtj�d=dS)Ncs��t��j����dSr")r[r�current_threadr]rbr�r0r]rr�f�sz9ThreadTests.test_ident_of_no_threading_threads.<locals>.frr)rirrmr]�Eventr�wait_threads_exit�_thread�start_new_thread�waitrI�_active)rro�tidrrnr�"test_ident_of_no_threading_threads�s
�z.ThreadTests.test_ident_of_no_threading_threadscC�Ltrtd�zt�d�Wn
tjyt�d��w|��t�d�dS)Nz!with 256 KiB thread stack size...i�4platform does not support changing thread stack sizer�	rr2r�
stack_sizerr�error�unittestZSkipTestrlrrrr�test_various_ops_small_stack����z(ThreadTests.test_various_ops_small_stackcCrx)Nzwith 1 MiB thread stack size...iryrrzrrrr�test_various_ops_large_stack�rz(ThreadTests.test_various_ops_large_stackcCs�dd�}t��}|��t���t�||f�}|��Wd�n1s&wY|�|tj�|�	tj|tj
�|�tj|���|�
ttj|�d�tj|=dS)NcSst��|��dSr")rrm�release)r-rrrro�sz*ThreadTests.test_foreign_thread.<locals>.f�_DummyThread)rrM�acquirerrqrrrs�assertInru�assertIsInstancer��
assertTruergr^r_)rror-rvrrr�test_foreign_thread�s

�zThreadTests.test_foreign_threadc	s�td�}|jj}|j|jf|_Gdd�dt��|���}t��}|�	|t
�|�|d�z|||�}		q3�y=Ynwz|�
|d�Wn	tyOYnwt���t���G���fdd	�d	tj�}|�}d|_|��trutd
�tr{td�|d|�}|�
|d�tr�td
����}|�|�tr�td�|�|j�tr�td�||j|�}|�
|d�tr�td��jtjd�|�|j�tr�td�|jr�|��dSdS)N�ctypesc@seZdZdS)z<ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.AsyncExcN)r$r%r&rrrr�AsyncExc�sr�rTzAsyncExc not raisedrcseZdZ���fdd�ZdS)z:ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.WorkercsJt��|_d|_z	���t�d�q
�y$d|_���YdSw)NFTg�������?)r�	get_ident�id�finishedrbr3r4r�r�Zworker_saw_exceptionZworker_startedrrr6s

��z>ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Worker.runN�r$r%r&r6rr�rr�Workersr�z    started worker threadz     trying nonsensical thread id���z,    waiting for worker thread to get startedz"    verifying worker hasn't exitedz2    attempting to raise asynch exception in workerz5    waiting for worker to say it caught the exception��timeoutz    all OK -- joining worker)rZ	pythonapiZPyThreadState_SetAsyncExcZc_ulongZ	py_object�argtypes�	Exceptionrr�r��intZ
assertGreaterZfailrI�UnboundLocalErrorrpr+�daemonr`rr2rtr�rfr�r�r�
SHORT_TIMEOUTre)	rr�Z
set_async_exc�	exceptionrv�resultr�rV�retrr�r�test_PyThreadState_SetAsyncExc�sj

���


�z*ThreadTests.test_PyThreadState_SetAsyncExccCs\dd�}tj}|t_ztjdd�d�}|�tj|j�|�|tjvd�W|t_dS|t_w)NcWst���r")r�ThreadError��argsrrr�fail_new_threadB�z7ThreadTests.test_limbo_cleanup.<locals>.fail_new_threadcSr?r"rrrrr�<lambda>GrAz0ThreadTests.test_limbo_cleanup.<locals>.<lambda>rHz:Failed to cleanup _limbo map on failure of Thread.start().)r�_start_new_threadr+�assertRaisesr�r`rf�_limbo)rr�r�rVrrr�test_limbo_cleanup@s�zThreadTests.test_limbo_cleanupcCs(td�tdd�\}}}|�|d�dS)Nr��-caNif 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            �*)rrrI�r�rc�out�errrrr�test_finalize_running_threadOsz(ThreadTests.test_finalize_running_threadcCstdd�dS)Nr�aPif 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            )rrrrr�test_finalize_with_tracepsz$ThreadTests.test_finalize_with_tracecCs0tdd�\}}}|�|��d�|�|d�dS)Nr�a�if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            s5Woke up, sleep function is: <built-in function sleep>�)rrI�stripr�rrr�test_join_nondaemon_on_shutdown�s


�z+ThreadTests.test_join_nondaemon_on_shutdownc	Cs�tj}t��}z5tdd�D]'}t�|d�tjdd�d�}|��|��|�}|�	||d||f�q
Wt�|�dSt�|�w)Nr�dg-C��6*?cSr?r"rrrrrr��rAz7ThreadTests.test_enumerate_after_join.<locals>.<lambda>rHz&#1703448 triggered after %d trials: %s)
r�	enumerate�sys�getswitchintervalrZ�setswitchintervalr+r`rerc)r�enum�old_intervalrkrV�lrrr�test_enumerate_after_join�s
��	z%ThreadTests.test_enumerate_after_joincCs�Gdd�dt�}t|�|dd�}t�|�}|j��~|j|�dt�|��d�|dd�}t�|�}|j��~|j|�dt�|��d�dS)Nc@r')zDThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunctioncSs.||_tj|j|fd|id�|_|j��dS)N�yet_another)rCr��kwargs)�should_raiserr+�_runrKr`)rr�rrrr�s�zMThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction.__init__cSs|jrt�dSr")r��
SystemExit)rZ	other_refr�rrrr��s�zIThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction._runN)r$r%r&rr�rrrr�RunSelfFunction�s	r�F)r�z%d references still around)�msgT)	rJr�weakref�refrKrer\r��getrefcount)rr�Z
cyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectrrr�test_no_refcycle_through_target�s(



��



�
�z+ThreadTests.test_no_refcycle_through_targetcCs�t��}|�td��|��Wd�n1swY|�td��
|�d�Wd�n1s4wY|�td��|��Wd�n1sNwY|�td��
|�d�Wd�n1siwYt��}|�td��|�	�Wd�n1s�wYt�
�}|��|�td��|��Wd�n1s�wY|�td	��t�
�Wd�n1s�wY|�td
��
t��Wd�dS1s�wYdS)Nzget the daemon attributezset the daemon attributeTzget the name attributezset the name attributer*zuse is_set()zuse notify_all()zuse active_count()zuse current_thread())rr+ZassertWarnsRegex�DeprecationWarning�isDaemon�	setDaemon�getName�setNamerp�isSet�	Conditionr��	notifyAll�activeCount�
currentThread)rrV�e�condrrr�test_old_threading_api�sH�
����
���
�
�
�
"�z"ThreadTests.test_old_threading_apicCs2t��}|�dt|��d|_|�dt|��dS�Nr�T)rr+rcr_r�r��rrVrrr�test_repr_daemon�szThreadTests.test_repr_daemoncCsHt��}|�|j�tjdd�}|�|j�tjdd�}|�|j�dS)NF�r�T)rr+rfr�r�r�rrr�test_daemon_param�szThreadTests.test_daemon_param�fork�needs os.fork()cCs:t�d�}td|�\}}}|�|d�|�|��d�dS)Nat
            import atexit
            import os
            import sys
            from test.support import wait_process

            # Import the threading module to register its "at fork" callback
            import threading

            def exit_handler():
                pid = os.fork()
                if not pid:
                    print("child process ok", file=sys.stderr, flush=True)
                    # child process
                else:
                    wait_process(pid, exitcode=0)

            # exit_handler() will be called after threading._shutdown()
            atexit.register(exit_handler)
        r�r�schild process ok)�textwrap�dedentrrI�rstrip�r�code�_r�r�rrr�test_fork_at_exit�s
zThreadTests.test_fork_at_exitztest needs fork()cCs0d}td|�\}}}|�|d�|�|d�dS)Na�if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        r�r��rrIr�rrr�test_dummy_thread_after_forksz(ThreadTests.test_dummy_thread_after_forkcCs�t��}|�tj|�tj�d�td�D]-}tjdd�d�}|�	�t
��}|dkr7t
�|�
�r3dnd�q|��tj|dd	�qdS)
Ng���ư>�cSr?r"rrrrrr�>rAz6ThreadTests.test_is_alive_after_fork.<locals>.<lambda>rHr�rR��exitcode)r�r�r
r�r;rrZrr+r`�osr��_exitrgre�wait_process)rr�rkrV�pidrrr�test_is_alive_after_fork3s�z$ThreadTests.test_is_alive_after_forkcsht��}��|jd���|jt��j���|jt����fdd�}tj|d�}|��|�	�dS)N�
MainThreadcs��t��jt��j�dSr")rhr�main_threadr]rmrrrrroNs�z'ThreadTests.test_main_thread.<locals>.frH)
rr�rIr*r]rmr�r+r`re)r�mainro�thrrr�test_main_threadHszThreadTests.test_main_threadztest needs os.fork()�waitpidztest needs os.waitpid()cC�@d}td|�\}}}|���dd�}|�|d�|�|d�dS)Na�if 1:
            import os, threading
            from test import support

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                support.wait_process(pid, exitcode=0)
        r��
rFr�zMainThread
True
True
�r�decode�replacerI�rr�r�r�r��datarrr�test_main_thread_after_forkUs

z'ThreadTests.test_main_thread_after_fork�due to known OS bugcCr�)Na�if 1:
            import os, threading, sys
            from test import support

            def func():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    support.wait_process(pid, exitcode=0)

            th = threading.Thread(target=func)
            th.start()
            th.join()
        r�r�rFr�zThread-1 (func)
True
True
r�r�rrr�/test_main_thread_after_fork_from_nonmain_threadjs
z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadcCsBd}td|�\}}}|��}|�|d�|�|��dgd�dS)Na�if 1:
            import gc, threading

            main_thread = threading.current_thread()
            assert main_thread is threading.main_thread()  # sanity check

            class RefCycle:
                def __init__(self):
                    self.cycle = self

                def __del__(self):
                    print("GC:",
                          threading.current_thread() is main_thread,
                          threading.main_thread() is main_thread,
                          threading.enumerate() == [main_thread])

            RefCycle()
            gc.collect()  # sanity check
            x = RefCycle()
        r�r�zGC: True True TruerE)rr�rI�
splitlinesr�rrr� test_main_thread_during_shutdown�s
�z,ThreadTests.test_main_thread_during_shutdowncCs$d}td|�\}}}|�|d�dS)Na�if 1:
            import os
            import threading
            import time
            import random

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_Finalize() is called.
                random_sleep()
                tls.x = Sleeper()
                random_sleep()

            threading.Thread(target=f).start()
            random_sleep()
        r�r�r��rr�r�r�r�rrr�test_finalization_shutdown�sz&ThreadTests.test_finalization_shutdowncs�t���t�����������fdd�}tj|d�}|�|jd�|�����|�|�	��|j}|�
|jdd�d����|�|jtj
d�d�|�|�	��|��|�
|�	��|�|j�|��dS)Ncs������t�d�dS)N�{�G�z�?)r�r�r3r4r�Zfinish�startedrrro�sz'ThreadTests.test_tstate_lock.<locals>.frHrr�F)rr�
allocate_lockr�rr+�assertIs�_tstate_lockr`r�rgrfr�rr�r\re)rrorV�tstate_lockrrr�test_tstate_lock�s&zThreadTests.test_tstate_lockcs�t���t�����������fdd�}tj|d�}|�����|�dt|�����d}t	d�D]}|t|�vrAnt
�d�q7|�|t|��|��dS)Ncs������dSr")r�r�rrrrro�sz(ThreadTests.test_repr_stopped.<locals>.frHr�stoppedi�r)
rrrr�rr+r`r�r_r�rZr3r4re)rrorVZLOOKING_FORrkrrr�test_repr_stopped�s"zThreadTests.test_repr_stoppedcs�tdd�D]H}t�|���fdd�t|�D�}|D]}|��q|D]}|��q"�fdd�t|�D�}|D]}|��q6|D]}|��q?|�t�j�qdS)NrrRc�g|]	}tj�jd��qS�rH)rr+r��rUr���bsrr�
<listcomp>
��z;ThreadTests.test_BoundedSemaphore_limit.<locals>.<listcomp>cr
r)rr+r�rr
rrrr)rZrrXr`rer��
ValueErrorr�)r�limitrjrVrr
r�test_BoundedSemaphore_limits$

�


�

�z'ThreadTests.test_BoundedSemaphore_limitc	s��fdd��dd����fdd��d�_t��}t���z t���ddl}|���td�D]}��q0Wt�|�dSt�|�w)	Nc��Sr"r��frame�event�arg��
noop_tracerrr �z9ThreadTests.test_frame_tstate_tracing.<locals>.noop_tracecss�	dVq)Nr�	generatorrrrrrr$s��z8ThreadTests.test_frame_tstate_tracing.<locals>.generatorcs�jdur	���_t�j�Sr")�gen�nextr)�callbackrrrr(s

z7ThreadTests.test_frame_tstate_tracing.<locals>.callbackrr/)rr��gettrace�settracer�	_testcapiZcall_in_temporary_c_threadrZ)r�	old_tracer"r;r)rrrr�test_frame_tstate_tracings


�z%ThreadTests.test_frame_tstate_tracingc	sP�fdd��t��}zt���t��}|��|�Wt�|�dSt�|�w)Ncrr"rrrrrrArz-ThreadTests.test_gettrace.<locals>.noop_trace)rr r!rI)rr#Z
trace_funcrrr�
test_gettrace@s
zThreadTests.test_gettracec	CsHdd�}t��}zt�|�|�|t���Wt�|�dSt�|�w)NcWr?r"rr�rrr�fnMrAz'ThreadTests.test_getprofile.<locals>.fn)r�
getprofile�
setprofilerI)rr&Zold_profilerrr�test_getprofileLs
zThreadTests.test_getprofilec	Cs�dD]K}|j|d��;t��}tj|j|d�}|��|j}|s(|�|tj�n|�	|tj�|�
�|��|�	|tj�Wd�n1sHwYqdS)N)FTr�)rCr�)ZsubTestrrpr+rtr`rr��_shutdown_locksrcrbre)rr�rrKrrrr�test_shutdown_locksUs���zThreadTests.test_shutdown_lockscCs$tdd�\}}}|�|��d�dS)Nr�a(if 1:
            import threading

            class Atexit:
                def __del__(self):
                    print("thread_dict.atexit = %r" % thread_dict.atexit)

            thread_dict = threading.local()
            thread_dict.atexit = "value"

            atexit = Atexit()
        sthread_dict.atexit = 'value')rrIr�r�rrr�test_locals_at_exitmszThreadTests.test_locals_at_exitcCsBGdd�dt�}|�}tj|d�}|��|��|�|j�dS)Nc@�$eZdZdd�Zdd�Zdd�ZdS)z6ThreadTests.test_boolean_target.<locals>.BooleanTargetcSr�NF��ranrrrrr�rz?ThreadTests.test_boolean_target.<locals>.BooleanTarget.__init__cSsdSr.rrrrr�__bool__�sz?ThreadTests.test_boolean_target.<locals>.BooleanTarget.__bool__cSr)NTr/rrrr�__call__�rz?ThreadTests.test_boolean_target.<locals>.BooleanTarget.__call__N)r$r%r&rr1r2rrrr�
BooleanTarget�sr3rH)rJrr+r`rer�r0)rr3rCrKrrr�test_boolean_target~szThreadTests.test_boolean_targetcCsFdd�}t���tj|d���Wd�dS1swYdS)NcSr?r"rrrrr�noop�rAz0ThreadTests.test_leak_without_join.<locals>.nooprH)rrqrr+r`)rr5rrr�test_leak_without_join�s
"�z"ThreadTests.test_leak_without_joinzneed debug build (Py_DEBUG)cCs*tddddd�\}}}d}|�||�dS)Nz	-Wdefaultr��pass�1)ZPYTHONTHREADDEBUGs�DeprecationWarning: The threading debug (PYTHONTHREADDEBUG environment variable) is deprecated and will be removed in Python 3.12)rr�)rr�r�r�r�rrr�test_debug_deprecation�s
�z"ThreadTests.test_debug_deprecationcCs6t�d�}td|�\}}}|�|d�|�|d�dS)Na�
            import _thread
            import sys

            event = _thread.allocate_lock()
            event.acquire()

            def import_threading():
                import threading
                event.release()

            if 'threading' in sys.modules:
                raise Exception('threading is already imported')

            _thread.start_new_thread(import_threading, ())

            # wait until the threading module is imported
            event.acquire()
            event.release()

            if 'threading' not in sys.modules:
                raise Exception('threading is not imported')

            # don't wait until the thread completes
        r�r�)r�r�rrIr�rrr�test_import_from_another_thread�s
z+ThreadTests.test_import_from_another_threadN)2r$r%r&rrLrQrlrwr~r�r�r�r�r�r�r�r�r�r�r�r�r}�
skipUnlessrar�r�r�r�r�r��skipIfr��platform�platforms_to_skipr�r�r�rr	rr$r%r)r+r,r4r6�Py_DEBUGr9r:rrrrr>asf

$X!!



##
'	


	r>c@s�eZdZdd�Zdd�Ze�eed�d�e�	e
jevd�dd	���Z
e�eed�d�e�	e
jevd�d
d���Ze�	e
jevd�dd
��Ze�eed�d�e�	e
jevd�dd���Ze�eed�d�dd��ZdS)�ThreadJoinOnShutdowncCs8d|}td|�\}}}|���dd�}|�|d�dS)Na�if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        
r�r�rFzend of main
end of thread
r�)r�scriptr�r�r�r�rrr�
_run_and_join�s
�z"ThreadJoinOnShutdown._run_and_joincC�d}|�|�dS)Nz�if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            �rB�rrArrr�test_1_join_on_shutdown�sz,ThreadJoinOnShutdown.test_1_join_on_shutdownr�r�r�cCrC)Na�if 1:
            from test import support

            childpid = os.fork()
            if childpid != 0:
                # parent process
                support.wait_process(childpid, exitcode=0)
                sys.exit(0)

            # child process
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            rDrErrr�test_2_join_in_forked_process�sz2ThreadJoinOnShutdown.test_2_join_in_forked_processcCrC)Na�if 1:
            from test import support

            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    # parent process
                    support.wait_process(childpid, exitcode=0)
                    sys.exit(0)

                # child process
                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            rDrErrr�!test_3_join_in_forked_from_thread�sz6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadcCs"d}td|�\}}}|�|�dS)Nacif True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    with open(os.__file__, 'rb') as in_f:
                        stuff = in_f.read(200)
                        with open(os.devnull, 'wb') as null_f:
                            null_f.write(stuff)
                            time.sleep(random.random() / 1995)
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            r��rrf�rrAr�r�r�rrr�test_4_daemon_threadss!z*ThreadJoinOnShutdown.test_4_daemon_threadscCsNdd�}g}td�D]}tj|d�}|�|�|��q
|D]}|��qdS)NcSs0t��}|dkrtj|dd�dSt�d�dS)Nr�2r�)r�r�rr�r�)r�rrr�do_fork_and_waitEszIThreadJoinOnShutdown.test_reinit_tls_after_fork.<locals>.do_fork_and_wait�rH)rZrr+r[r`re)rrMrjrkrVrrr�test_reinit_tls_after_fork?s	


�z/ThreadJoinOnShutdown.test_reinit_tls_after_forkcCs�g}td�D]}tjdd�d�}|�|�|��qt��}|dkr6tt�	��dkr0t�
d�n
t�
d�ntj|dd	�|D]}|�
�q?dS)
NrNcSs
t�d�S)Ng333333�?)r3r4rrrrr�^s
zKThreadJoinOnShutdown.test_clear_threads_states_after_fork.<locals>.<lambda>rHrr�3�4r�)rZrr+r[r`r�r�rdr��_current_framesr�rr�re)rrjrkrVr�rrr�$test_clear_threads_states_after_forkWs


�z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)r$r%r&rBrFr}r;rar�r<r�r=r>rGrHrKrOrSrrrrr@�s 
(r@c@s0eZdZdd�Zdd�Zdd�Zedd��Zd	S)
�SubinterpThreadingTestscCsFt��\}}|�tj|�|�tj|�ttd�rt�|d�||fS)N�set_blockingF)r��piper
�closerarU)r�r�wrrrrVqs
zSubinterpThreadingTests.pipecC�L|��\}}t�d|f�}tj�|�}|�|d�|�t�|d�d�dS)Na�
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
        rr�x�	rVr�r�r;rZrun_in_subinterprIr��read�rrXrYr�r�rrr�test_threads_joinys�z)SubinterpThreadingTests.test_threads_joincCrZ)Na�
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                tls.x = Sleeper()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
        rrr[r\r^rrr�test_threads_join_2�s�z+SubinterpThreadingTests.test_threads_join_2cCshdtjj�d�}d|f}tj���td|�\}}}Wd�n1s%wY|�d|���dS)Nz�if 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(zJ)
            threading.Thread(target=f, daemon=True).start()
            z[if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            r�z:Fatal Python error: Py_EndInterpreter: not the last thread)r;rr�ZSuppressCrashReportrr�r�)rZsubinterp_coderAr�r�r�rrr�test_daemon_threads_fatal_error�s����z7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)r$r%r&rVr_r`rrarrrrrTps%rTc@sdeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�Zdd�Zdd�Z
dS)�ThreadingExceptionTestscCs*t��}|��|�t|j�|��dSr")rr+r`r��RuntimeErrorre�rrKrrr�test_start_thread_again�sz/ThreadingExceptionTests.test_start_thread_againcC�t��}|�t|j�dSr")rrmr�rcre)rrmrrr�test_joining_current_thread��z3ThreadingExceptionTests.test_joining_current_threadcCrfr")rr+r�rcrerdrrr�test_joining_inactive_thread�rhz4ThreadingExceptionTests.test_joining_inactive_threadcCs.t��}|��|�tt|dd�|��dSr�)rr+r`r�rcrrerdrrr�test_daemonize_active_thread�sz4ThreadingExceptionTests.test_daemonize_active_threadcCrfr")rrMr�rcr�rOrrr�test_releasing_unacquired_lock�rhz6ThreadingExceptionTests.test_releasing_unacquired_lockcCshd}d}tjtjd|gtjtjd�}|��\}}|���dd�}|�|j	dd|���|�||�dS)	Naif True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            zend of main thread
r�)�stdout�stderrr�rFrzUnexpected error: )
�
subprocess�Popenr��
executable�PIPE�communicater�r�rI�
returncode)rrAZexpected_output�prlrmr�rrr�test_recursion_limit�s�z,ThreadingExceptionTests.test_recursion_limitcC�\d}td|�\}}}|�|d�|��}|�d|�|�d|�|�d|�|�d|�dS)Na�if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            r�r��Exception in thread�"Traceback (most recent call last):�ZeroDivisionError�Unhandled exception�rrIr�r�rcrJrrr�test_print_exceptionsz,ThreadingExceptionTests.test_print_exceptioncCrv)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            r�r�rwrxryrzr{rJrrr�%test_print_exception_stderr_is_none_1'sz=ThreadingExceptionTests.test_print_exception_stderr_is_none_1cCs4d}td|�\}}}|�|d�|�d|���dS)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            r�r�rz)rrIrcr�rJrrr�%test_print_exception_stderr_is_none_2Dsz=ThreadingExceptionTests.test_print_exception_stderr_is_none_2csXdd��G�fdd�dtj�}|�}|��|��|�|j�|�|jt�d|_dS)NcSs�r"rrrrr�
bare_raise^szOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.bare_raisecseZdZdZ�fdd�ZdS)zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558Nc
s:z��WdSty}z
||_WYd}~dSd}~wwr")r��exc)rr��rrrr6ds��zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558.run)r$r%r&r�r6rr�rr�
Issue27558asr�)rr+r`rerir�r�rc)rr�rKrr�r�#test_bare_raise_in_brand_new_thread]s	
z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadcsLdd��|�tjtj��fdd�td�D�}|D]
}|��|��qdS)NcSsHttjddd��}|�d�t��Wd�dS1swYdS)NrYzutf-8)�encoding� )�openr�TESTFN�write�	traceback�format_stack)�fprrr�modify_filets

"�zQThreadingExceptionTests.test_multithread_modify_file_noerror.<locals>.modify_filecsg|]}tj�d��qSr)rr+)rUrk�r�rrrzs
��zPThreadingExceptionTests.test_multithread_modify_file_noerror.<locals>.<listcomp>r�)r
r�unlinkr�rZr`re)rrjrVrr�r�$test_multithread_modify_file_noerrorrs
�
�z<ThreadingExceptionTests.test_multithread_modify_file_noerrorN)r$r%r&rergrirjrkrur|r}r~r�r�rrrrrb�srbc@�eZdZdd�ZdS)�
ThreadRunFailcC�td��)N�
run failed�rrrrrr6�r�zThreadRunFail.runNr�rrrrr���r�csReZdZ�fdd�Zdd�Zejdd��Zdd�Zd	d
�Z	dd�Z
d
d�Z�ZS)�ExceptHookTestscst|�t���dSr")r�superr9r��	__class__rrr9�szExceptHookTests.setUpcCs�t�d��}tdd�}|��|��Wd�n1swY|����}|�d|j�d�|�|�d|�|�d|�|�d|�dS)	Nrm�excepthook threadr)�Exception in thread �:
�#Traceback (most recent call last):
z   raise ValueError("run failed")zValueError: run failed)	r�captured_outputr�r`re�getvaluer�r�r*)rrmrKrrr�test_excepthook�s

�zExceptHookTests.test_excepthookcCs�t�d��7}ztd��ty4}zt�gt���d��}z	t�|�Wd}nd}wWYd}~nd}~wwWd�n1s?wY|�	��
�}|�dt���d�|�|�d|�|�d|�|�d|�dS)NrmZbugr�r�r�z  raise ValueError("bug")zValueError: bug)
rr�rr�r�ExceptHookArgsr��exc_inforr�r�r�r�)rrmr�r�rrr�test_excepthook_thread_None�s"���z+ExceptHookTests.test_excepthook_thread_NonecCsfGdd�dtj�}t�d��}|�}|��|��Wd�n1s$wY|�|��d�dS)Nc@r�)z4ExceptHookTests.test_system_exit.<locals>.ThreadExitcSst�d�dSr)r��exitrrrrr6�r:z8ExceptHookTests.test_system_exit.<locals>.ThreadExit.runNr�rrrr�
ThreadExit�r�r�rmrF)rr+rr�r`rerIr�)rr�rmrKrrr�test_system_exit�s
�z ExceptHookTests.test_system_exitcs�d��fdd�}zGt�td|��t�}|��|��Wd�n1s&wY|��jt�|�t	�j
�d�|��j�j
j�|�
�j|�Wd�dSd�w)Ncs|�dSr"r)Z	hook_argsr�rr�hook�sz4ExceptHookTests.test_custom_excepthook.<locals>.hookrr�)r�	swap_attrrr�r`rerI�exc_typer�str�	exc_value�
exc_traceback�
__traceback__rrK)rr�rKrr�r�test_custom_excepthook�s
�z&ExceptHookTests.test_custom_excepthookc
s�dd�}d��fdd�}t�td|��?t�td|��(t�d��}t�}|��|��Wd�n1s7wYWd�n1sFwYWd�n1sUwY|�|�	�d�|��d�dS)	NcSr�)N�threading_hook failedr�r�rrr�threading_hook�r�zCExceptHookTests.test_custom_excepthook_fail.<locals>.threading_hookcst|��dSr")r�)r�r�r��Zerr_strrr�sys_hook�sz=ExceptHookTests.test_custom_excepthook_fail.<locals>.sys_hookrrmz#Exception in threading.excepthook:
r�)
rr�rr�r�r�r`rerIr�)rr�r�rmrKrr�r�test_custom_excepthook_fail�s*��
���
�z+ExceptHookTests.test_custom_excepthook_failcCs�dd�}dd�}|�}t�td|��|�}tjt_|�}Wd�n1s'wY|�||�|�||�|�|d�dS)NcSsRt�d��}tdd�}|��|��Wd�|��S1s wY|��S)Nrmr�r))rr�r�r`rer�)�outputrKrrr�
run_thread�s


��z<ExceptHookTests.test_original_excepthook.<locals>.run_threadcSstdtjd�dS)NzRunning a thread failed)�file)r2r�rmr�rrrr��r z@ExceptHookTests.test_original_excepthook.<locals>.threading_hookrzRunning a thread failed
)rr�rrrrIrh)rr�r�Zdefault_outputZcustom_hook_outputZrecovered_outputrrr�test_original_excepthook�s�z(ExceptHookTests.test_original_excepthook)
r$r%r&r9r�rrr�r�r�r�r��
__classcell__rrr�rr��s

r�c@r-)�
TimerTestscCst�|�g|_t��|_dSr")r7r9�
callback_argsrrp�callback_eventrrrrr9�s
zTimerTests.setUpcCs�t�d|j�}|��|j��|j�d�d|jd<|j�	�t�d|j�}|��|j��|�
t|j�d�|�
|jdifdifg�|�
�|�
�dS)NrZblahZbarZfoorEr)r�Timer�
_callback_spyr`r�rtr�r[r��clearrIrdr�re)rZtimer1Ztimer2rrr� test_init_immutable_default_argss



z+TimerTests.test_init_immutable_default_argscOs*|j�|dd�|��f�|j��dSr")r�r[�copyr�rb)rr�r�rrrr�szTimerTests._callback_spyN)r$r%r&r9r�r�rrrrr��sr�c@�eZdZeej�ZdS)�	LockTestsN)r$r%r&�staticmethodrrM�locktyperrrrr��r�c@r�)�PyRLockTestsN)r$r%r&r�r�_PyRLockr�rrrrr�r�r�zRLock not implemented in Cc@r�)�CRLockTestsN)r$r%r&r�r�_CRLockr�rrrrr��r�c@r�)�
EventTestsN)r$r%r&r�rrpZ	eventtyperrrrr�!r�r�c@r�)�ConditionAsRLockTestsN)r$r%r&r�rr�r�rrrrr�$r�r�c@r�)�ConditionTestsN)r$r%r&r�rr�Zcondtyperrrrr�(r�r�c@r�)�SemaphoreTestsN)r$r%r&r�r�	Semaphore�semtyperrrrr�+r�r�c@r�)�BoundedSemaphoreTestsN)r$r%r&r�rrXr�rrrrr�.r�r�c@r�)�BarrierTestsN)r$r%r&r�r�BarrierZbarriertyperrrrr�1r�r�c@r�)�MiscTestCasecCs.t|�dh}ddh}tj|td||d�dS)Nr�r�r�)rrr)�extra�not_exported)rrZcheck__all__r)rr�r�rrr�test__all__6s

�zMiscTestCase.test__all__N)r$r%r&r�rrrrr�5r�r�c@sReZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
jdd��ZdS)�InterruptMainTestscCs\dd�}t�||�}|�tj||�|�t��
t��Wd�dS1s'wYdS)NcSsdddS)Nrrr)�signumrrrr�handlerA�zLInterruptMainTests.check_interrupt_main_with_signal_handler.<locals>.handler)�signalr
r�ryrr�interrupt_main)rr�r�Zold_handlerrrr�(check_interrupt_main_with_signal_handler@s
"�z;InterruptMainTests.check_interrupt_main_with_signal_handlerc
Cs\t�|�}z!t�|tj�t�|�t�|tj�t�|�Wt�||�dSt�||�wr")r��	getsignal�SIG_IGNrrr��SIG_DFL)rr�r�rrr�check_interrupt_main_noerrorJs

z/InterruptMainTests.check_interrupt_main_noerrorcCsZdd�}tj|d�}|�t��|��|��Wd�n1s"wY|��dS)NcSst��dSr")rrr�rrrr�call_interruptZr�zHInterruptMainTests.test_interrupt_main_subthread.<locals>.call_interruptrH)rr+r��KeyboardInterruptr`re)rr�rVrrr�test_interrupt_main_subthreadWs
�z0InterruptMainTests.test_interrupt_main_subthreadcCs8|�t��
t��Wd�dS1swYdSr")r�r�rrr�rrrr�test_interrupt_main_mainthreadbs
"�z1InterruptMainTests.test_interrupt_main_mainthreadcC�|�tj�|�tj�dSr")r�r��SIGINT�SIGTERMrrrr�'test_interrupt_main_with_signal_handlerh�z:InterruptMainTests.test_interrupt_main_with_signal_handlercCr�r")r�r�r�r�rrrr�test_interrupt_main_noerrorlr�z.InterruptMainTests.test_interrupt_main_noerrorcCs6|�ttjd�|�ttjtj�|�ttjd�dS)Nr�i@B)r�rrrr�r��NSIGrrrr�"test_interrupt_main_invalid_signalpsz5InterruptMainTests.test_interrupt_main_invalid_signalcCsjdg}dg}dg}dd�}tj||||fd�}|��|ds$	|drd|d<|��|�|d�dS)NTFcSs<d}d|d<|dr|r|d8}ndS	|ds
d|d<dS)Ni�Trrr)r�cont�interruptedZ
iterationsrrr�worker{s
�zAInterruptMainTests.test_can_interrupt_tight_loops.<locals>.worker)rCr�r)rr+r`rer�)rr�rr�r�rVrrr�test_can_interrupt_tight_loopsus�z1InterruptMainTests.test_can_interrupt_tight_loopsN)
r$r%r&r�r�r�r�r�r�r�rZreap_threadsr�rrrrr�?s

r�c@r-)�AtexitTestscCs.tdd�\}}}|�|�|�|��d�dS)Nr�z�if True:
            import threading

            def run_last():
                print('parrot')

            threading._register_atexit(run_last)
        sparrot)rrfrIr�r�rrr�test_atexit_output�s
	zAtexitTests.test_atexit_outputcCstdd�\}}}|�|�dS)Nr�aNif True:
            import threading
            from unittest.mock import Mock

            mock = Mock()
            threading._register_atexit(mock)
            mock.assert_not_called()
            # force early shutdown to ensure it was called once
            threading._shutdown()
            mock.assert_called_once()
        rIr�rrr�test_atexit_called_once�sz#AtexitTests.test_atexit_called_oncecCs.tdd�\}}}|�|�|�d|���dS)Nr�z�if True:
            import threading

            def func():
                pass

            def run_last():
                threading._register_atexit(func)

            threading._register_atexit(run_last)
        z2RuntimeError: can't register atexit after shutdown)rr�r�r�r�rrr�test_atexit_after_shutdown�s

�z&AtexitTests.test_atexit_after_shutdownN)r$r%r&r�r�r�rrrrr��s
r��__main__)=�__doc__Ztest.supportr;rrrrZtest.support.import_helperrZtest.support.script_helperrrr1r�rrrr3r}r�r�rnr�r�r�r	r
rr>rar?rrJrr+r(ZTestCaser7r>r@rTrbr�r�r�r�Z
RLockTestsr�r<r�r�r�r�r�r�r�r�r�r�r�r$r�rrrr�<module>st

!	j,b2s
P
2�