HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.10/test/test_importlib/__pycache__/test_threaded_import.cpython-310.pyc
o

�i�%�@s�ddlZddlZddlZddlZddlZddlZddlZddlZddlm	Z	ddl
mZddlm
Z
ddlmZmZmZddl
mZmZdd�Zd	d
�Zddd
dd�ZGdd�d�ZGdd�d�ZGdd�dej�Zdd�Zedkrxe��dSdS)�N)�mock)�verbose)�forget)�TESTFN�unlink�rmtree)�
script_helper�threading_helperc	
Cs�zRzt|�drddl}ddl}nddl}ddl}|�dd�}Wnty:}z|�|�d��WYd}~nd}~wwW|�t���t|�|k}|rQ|�	�dSdS|�t���t|�|k}|rg|�	�ww)N�r��)
�len�modulefinder�random�	randrange�	Exception�append�with_traceback�	threading�	get_ident�set)	�N�done�
done_tasks�errorsrr�x�e�finished�r�E/usr/local/lib/python3.10/test/test_importlib/test_threaded_import.py�tasks,
�����
�r cCstjddd�|�S)Nzos.register_at_forkT)Zcreate)rZpatch)�funcrrr�mock_register_at_fork*sr"zaif 1:
        import time
        time.sleep(%(delay)s)
        x = 'a'
        import C
        zaif 1:
        import time
        time.sleep(%(delay)s)
        x = 'b'
        import D
        zimport Bzimport A)�A�B�C�Dc@s"eZdZdZdd�Zddd�ZdS)�FinderzIA dummy finder to detect concurrent access to its find_spec()
    method.cCsd|_d|_t��|_dS�Nr)�numcallsrr�Lock�lock��selfrrr�__init__HszFinder.__init__NcCs^t��sJ�|j�|jd7_Wd�n1swY|j}t�d�|d|_dS)Nrg{�G�z�?)�imp�	lock_heldr+r)r�time�sleep)r-�name�path�targetrrrr�	find_specMs�
zFinder.find_spec�NN)�__name__�
__module__�__qualname__�__doc__r.r6rrrrr'Dsr'c@seZdZdZddd�ZdS)�FlushingFinderzMA dummy finder which flushes sys.path_importer_cache when it gets
    called.NcCstj��dS�N)�sys�path_importer_cache�clear)r-r3r4r5rrrr6\szFlushingFinder.find_specr7)r8r9r:r;r6rrrrr<Xsr<c@sleZdZdd�Zdd�Zedd��Zdd�Zd	d
�Zdd�Z	d
d�Z
dd�Zedd��Zdd�Z
dd�ZdS)�ThreadedImportTestscCstj�dd�|_dS�Nr)r>�modules�pop�
old_randomr,rrr�setUpbszThreadedImportTests.setUpcCs|jdur
|jtjd<dSdSrB)rEr>rCr,rrr�tearDownes
�zThreadedImportTests.tearDownc
s"t��r	t�d��t���dD]�trtd�ddd�dD]}ztj	|=Wqt
y.Yqwg�g����t�
�}t�����fdd	�t��D���Wd�n1sWwY��d
�}t�
�|}trttd|dd
dd�dt���f}|��|�|�||�tr�td�qdS)Nz"can't run when import lock is held)��2rHrIrHrIZTryingzthreads ...� )�end)rrc3s&�|]}tjt����fd�VqdS))r5�argsN)r�Threadr )�.0�i�rrrrrr�	<genexpr>�s
��
�zAThreadedImportTests.check_parallel_module_init.<locals>.<genexpr>iXz%.1f msg@�@T)�flushrKzdone: %s/%szOK.)r/r0�unittestZSkipTestr�Eventr�printr>rC�KeyErrorr@r1�	monotonicr	Z
start_threads�range�waitr
�assertFalseZ
assertTrue)r-�mock_os�modname�t0Z	completedZdtZdbg_inforrPr�check_parallel_module_initls@
�
��
��z.ThreadedImportTests.check_parallel_module_initcCs|��dSr=)r^r,rrr�test_parallel_module_init�sz-ThreadedImportTests.test_parallel_module_initc	Cs\t�}tj�d|�z|��|�|jd�|�|j|j�Wtj�	|�dStj�	|�wr()
r'r>�	meta_path�insertr^�
assertGreaterr)�assertEqualr�remove)r-�finderrrr�test_parallel_meta_path�sz+ThreadedImportTests.test_parallel_meta_pathc	s�t��t�}�fdd�}tj�d|�tj�|�z'|�d�|��}|�	�j
d�|��j�j
�Wtj�
|�tj�
|�dStj�
|�tj�
|�w)Ncs��d�t�)N�)r6�ImportError)r4�rerr�	path_hook�s
z?ThreadedImportTests.test_parallel_path_hooks.<locals>.path_hookrrg)r'r<r>�
path_hooksrar`rr6r^rbr)rcrrd)r-Zflushing_finderrjZnumtestsrrir�test_parallel_path_hooks�s
�z,ThreadedImportTests.test_parallel_path_hookscCs<ztjd=Wn	tyYnwddl}|�|jjj�dS)Nz+test.test_importlib.threaded_import_hangersr)r>rCrVZ+test.test_importlib.threaded_import_hangersrZZtest_importlibZthreaded_import_hangersr)r-�testrrr�test_import_hangers�s�z'ThreadedImportTests.test_import_hangersc		s d}t�t�|�tjt�tj�dt�|�tjj	t�t
��D]4\}}|d|i}ttj�
t|d�d��}|�|�d��Wd�n1sJwY|�t|�q!t��g��fdd�}�fd	d
�}tj|d�}tj|d�}|��|��|�
�|�
�|�t��dd
h�dS)Ng�?r�delay�.py�wb�utf-8c�ddl}��t|dd��dS�Nrr)r#r�getattr)r#��resultsrr�	import_ab��z<ThreadedImportTests.test_circular_imports.<locals>.import_abcrsrt)r$rru)r$rvrr�	import_ba�ryz<ThreadedImportTests.test_circular_imports.<locals>.import_ba)r5�a�b)�os�mkdirr�
addCleanup�shutilrr>r4rard�circular_imports_modules�items�open�join�write�encoder�	importlib�invalidate_cachesrrM�startrcr)	r-ror3�contents�frxrz�t1�t2rrvr�test_circular_imports�s,
�z)ThreadedImportTests.test_circular_importscCs�d}tj�dtj�|�tjjtj�td}t|d��}|�	|�
d��Wd�n1s/wY|�t|�|�tt�|�t
d�t��tt�tjt=dS)Nz�if 1:
            import threading
            def target():
                import random
            t = threading.Thread(target=target)
            t.start()
            t.join()
            t = Nonerrprqrr�__pycache__)r>r4rar}�curdirrrdrr�r�r�rrrr�r��
__import__rC)r-r[�code�filenamer�rrr�test_side_effect_import�s�z+ThreadedImportTests.test_side_effect_importcC�&tj�tj�t�dd�}t�|�dS)N�partialzcfimport.py�r}r4r��dirname�__file__rZassert_python_ok�r-�fnrrr�'test_concurrent_futures_circular_import���z;ThreadedImportTests.test_concurrent_futures_circular_importcCr�)Nr�zpool_in_threads.pyr�r�rrr�)test_multiprocessing_pool_circular_import�r�z=ThreadedImportTests.test_multiprocessing_pool_circular_importN)r8r9r:rFrGr"r^r_rfrlrnr�r�r�r�rrrrrA`s
!

&
rAcCsXt��}tjtjg|�R�zt��}t�tj|�t�d�WdSty+YdSw)Ng�h㈵��>)	r	Zthreading_setuprSZaddModuleCleanupZthreading_cleanupr>�getswitchinterval�setswitchinterval�AttributeError)�thread_infoZold_switchintervalrrr�setUpModules�r��__main__)�_impr/r}r�r>r1r�rrSrZtest.supportrZtest.support.import_helperrZtest.support.os_helperrrrrr	r r"r�r'r<ZTestCaserAr�r8�mainrrrr�<module>s8
�&�