HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/__pycache__/test_threaded_import.cpython-37.pyc
B

��g�#�@s�ddlZddlZddlZddlZddlZddlZddlZddlZddlm	Z	ddl
mZmZm
Z
mZmZmZmZmZmZdd�Zdd�Zdd	d
dd�ZGd
d�d�ZGdd�d�ZGdd�dej�Zedd��Zedkr�e�dS)�N)�mock)	�verbose�
import_module�run_unittest�TESTFN�reap_threads�forget�unlink�rmtree�
start_threadsc	
Cs�zvy>t|�dr"ddl}ddl}nddl}ddl}|�dd�}Wn2tk
rr}z|�|�d��Wdd}~XYnXWd|�t���t|�|k}|r�|�	�XdS)N�r��)
�len�modulefinder�randomZ	randrange�	Exception�append�with_traceback�	threading�	get_ident�set)	�N�done�
done_tasks�errorsrr�x�eZfinished�r�5/usr/local/lib/python3.7/test/test_threaded_import.py�tasks
&r cCstjddd�|�S)Nzos.register_at_forkT)Zcreate)rZpatch)�funcrrr�mock_register_at_fork)sr"zaif 1:
        import time
        time.sleep(%(delay)s)
        x = 'a'
        import C
        zaif 1:
        import time
        time.sleep(%(delay)s)
        x = 'b'
        import D
        zimport Bzimport A)�A�B�C�Dc@s"eZdZdZdd�Zddd�ZdS)�FinderzIA dummy finder to detect concurrent access to its find_spec()
    method.cCsd|_d|_t��|_dS)Nr)�numcallsrrZLock�lock)�selfrrr�__init__GszFinder.__init__Nc	CsJt��st�|j�|jd7_WdQRX|j}t�d�|d|_dS)Nr
g{�G�z�?)�imp�	lock_held�AssertionErrorr)r(r�timeZsleep)r*�name�path�targetrrrr�	find_specLs
zFinder.find_spec)NN)�__name__�
__module__�__qualname__�__doc__r+r3rrrrr'Csr'c@seZdZdZddd�ZdS)�FlushingFinderzMA dummy finder which flushes sys.path_importer_cache when it gets
    called.NcCstj��dS)N)�sys�path_importer_cache�clear)r*r0r1r2rrrr3[szFlushingFinder.find_spec)NN)r4r5r6r7r3rrrrr8Wsr8c@s\eZdZdd�Zdd�Zedd��Zdd�Zd	d
�Zdd�Z	d
d�Z
dd�Zedd��ZdS)�ThreadedImportTestscCstj�dd�|_dS)Nr)r9�modules�pop�
old_random)r*rrr�setUpaszThreadedImportTests.setUpcCs|jdk	r|jtjd<dS)Nr)r?r9r=)r*rrr�tearDownds
zThreadedImportTests.tearDownc
st��rt�d��t���x�dD]�tr8td�ddd�x.dD]&}ytj	|=Wq>t
k
rbYq>Xq>Wg�g����t�
�}t����fdd	�t��D���WdQRX��d
�}t�
�|}tr�td|dd
dd�dt���f}|��|�|�||�tr td�q WdS)Nz"can't run when import lock is held)��2rBrCrBrCZTryingzthreads ...� )�end)rrc3s$|]}tjt����fd�VqdS))r2�argsN)r�Threadr )�.0�i)rrrrrr�	<genexpr>szAThreadedImportTests.check_parallel_module_init.<locals>.<genexpr>iXz%.1f msg@�@T)�flushrEzdone: %s/%szOK.)r,r-�unittestZSkipTestrZEventr�printr9r=�KeyErrorr;r/Z	monotonicr�range�waitr�assertFalseZ
assertTrue)r*�mock_os�modnameZt0Z	completedZdtZdbg_infor)rrrrr�check_parallel_module_initks6





z.ThreadedImportTests.check_parallel_module_initcCs|��dS)N)rT)r*rrr�test_parallel_module_init�sz-ThreadedImportTests.test_parallel_module_initc	CsRt�}tj�d|�z*|��|�|jd�|�|j|j�Wdtj�	|�XdS)Nr)
r'r9�	meta_path�insertrT�
assertGreaterr(�assertEqualr�remove)r*�finderrrr�test_parallel_meta_path�sz+ThreadedImportTests.test_parallel_meta_pathc	s�t��t�}�fdd�}tj�d|�tj�|�z4|�d�|��}|�	�j
d�|��j�j
�Wdtj�
|�tj�
|�XdS)Ncs��d�t�dS)N�)r3�ImportError)r1)r[rr�	path_hook�s
z?ThreadedImportTests.test_parallel_path_hooks.<locals>.path_hookrr])r'r8r9�
path_hooksrWrVrr3rTrXr(rYrrZ)r*Zflushing_finderr_Znumtestsr)r[r�test_parallel_path_hooks�s
z,ThreadedImportTests.test_parallel_path_hookscCs<ytjd=Wntk
r YnXddl}|�|jj�dS)Nztest.threaded_import_hangersr)r9r=rNZtest.threaded_import_hangersrQZthreaded_import_hangersr)r*Ztestrrr�test_import_hangers�sz'ThreadedImportTests.test_import_hangersc	
sd}t�t�|�tjt�tj�dt�|�tjj	t�x`t
��D]T\}}|d|i}ttj�
t|d�d��}|�|�d��WdQRX|�t|�qDWt��g��fdd�}�fd	d
�}tj|d�}tj|d�}|��|��|�
�|�
�|�t��dd
h�dS)Ng�?r�delayz.py�wbzutf-8csddl}��t|dd��dS)Nrr)r#r�getattr)r#)�resultsrr�	import_ab�sz<ThreadedImportTests.test_circular_imports.<locals>.import_abcsddl}��t|dd��dS)Nrr)r$rre)r$)rfrr�	import_ba�sz<ThreadedImportTests.test_circular_imports.<locals>.import_ba)r2�a�b)�os�mkdirr�
addCleanup�shutilr
r9r1rWrZ�circular_imports_modules�items�open�join�write�encoder�	importlib�invalidate_cachesrrG�startrYr)	r*rcr0�contents�frgrhZt1Zt2r)rfr�test_circular_imports�s*
z)ThreadedImportTests.test_circular_importsc	Cs�d}tj�dtj�|�tjjtj�td}t|d��}|�	|�
d��WdQRX|�t|�|�tt�|�t
d�t��tt�tjt=dS)Nz�if 1:
            import threading
            def target():
                import random
            t = threading.Thread(target=target)
            t.start()
            t.join()
            t = Nonerz.pyrdzutf-8�__pycache__)r9r1rWrk�curdirrmrZrrqrsrtr	rr
rurv�
__import__r=)r*rR�code�filenameryrrr�test_side_effect_import�s	z+ThreadedImportTests.test_side_effect_importN)
r4r5r6r@rAr"rTrUr\rarbrzr�rrrrr<_s"

&r<c	CsVd}yt��}t�d�Wntk
r.YnXztt�Wd|dk	rPt�|�XdS)Ng�h㈵��>)r9�getswitchinterval�setswitchinterval�AttributeErrorrr<)Zold_switchintervalrrr�	test_main�sr��__main__)�_impr,rkrur9r/rnrrLrZtest.supportrrrrrrr	r
rr r"ror'r8ZTestCaser<r�r4rrrr�<module>s,,