HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/__pycache__/test_threadsignals.cpython-37.opt-1.pyc
B

��g^(�@s�dZddlZddlZddlZddlZddlmZddlZddl	Z	ej
dd�dkrbe�dej
��e��Z
e��Zejjdko�ejjdkZd	d
�Zdd�Zd
d�ZGdd�dej�Zdd�Zedkr�e�dS)z6PyUnit testing that threads honor our signal semantics�N)�support��winzCan't test signal on %sZpthreadz
mutex+condcCs4t�tj|�}t�tj|�}t�tj|�}|||fS)N)�signal�SIGUSR1�SIGUSR2�SIGALRM)Zfor_usr1Zfor_usr2Zfor_alrmZusr1Zusr2Zalrm�r	�3/usr/local/lib/python3.7/test/test_threadsignals.py�registerSignalssrcCs(t|dd7<t��t|d<dS)N�tripped��
tripped_by)�signal_blackboard�thread�	get_ident)�sig�framer	r	r
�handle_signalssrcCs(t�ttj�t�ttj�t��dS)N)�os�kill�process_pidrrr�
signalled_all�releaser	r	r	r
�send_signals"src@s�eZdZdd�Zdd�Zdd�Ze�ed�e�e	j
�d�oBe	jj
d	�e�e	j
�d
�d�dd
����Ze�ed�e�e	j
�d�o�e	jj
d	�e�e	j
�d
�d�dd����Zdd�Zdd�Zdd�Zdd�ZdS)�
ThreadSignalsc	Cs�t���t��|��t��WdQRXttjddksPttjddkrtzt�	d�t�
�Wdt�	d�X|�ttjdd�|�ttjdt�
��|�ttjdd�|�ttjdt�
��t��dS)Nrrr
r)r�wait_threads_exitr�acquire�spawnSignallingThreadrrrr�alarm�pauseZassertEqualrrr)�selfr	r	r
�test_signals)s"



zThreadSignals.test_signalscCst�td�dS)Nr	)r�start_new_threadr)r!r	r	r
rIsz#ThreadSignals.spawnSignallingThreadcCst�dS)N)�KeyboardInterrupt)r!rrr	r	r
�alarm_interruptLszThreadSignals.alarm_interruptz/POSIX condition variables cannot be interruptedZlinuxzBIssue 34004: musl does not allow interruption of locks by signals.Zopenbsdz%lock cannot be interrupted on OpenBSDc
Cs�t�tj|j�}zPt��}|��t�d�t��}|j	t
|jdd�t��|}|�|d�Wdt�d�t�tj|�XdS)Nr
�)�timeoutg@r)rrr%r�
allocate_lockrr�time�	monotonic�assertRaisesr$�
assertLess)r!�oldalrm�lock�t1�dtr	r	r
�test_lock_acquire_interruptionOs

z,ThreadSignals.test_lock_acquire_interruptionc
s�t�tj|j�}z�t����fdd�}t���rt�|d�x"�jdd�r^��	�t
�d�q>Wt�d�t
�
�}|jt�jdd	�t
�
�|}|�|d
�WdQRXWdt�d�t�tj|�XdS)Ncs���dS)N)rr	)�rlockr	r
�other_thread�szCThreadSignals.test_rlock_acquire_interruption.<locals>.other_threadr	F)�blockingg{�G�z�?r
r&)r'g@r)rrr%r�RLockrrr#rrr)�sleeprr*r+r$r,)r!r-r3r/r0r	)r2r
�test_rlock_acquire_interruptionns 


z-ThreadSignals.test_rlock_acquire_interruptionc
s�d�_�fdd�}t�tj|�}zr�fdd�}t���Tt�|d�x"�jdd�rf���t	�
d�qFW���}���j���|�WdQRXWdt�tj|�XdS)	NFcs
d�_dS)NT)�	sig_recvd)rr)r!r	r
�
my_handler�sz9ThreadSignals.acquire_retries_on_intr.<locals>.my_handlercs6���t�d�t�ttj�t�d����dS)Ng�?)	rr)r6rrrrrrr	)r.r	r
r3�s


z;ThreadSignals.acquire_retries_on_intr.<locals>.other_threadr	)r4g{�G�z�?)r8rrrrrr#rrr)r6Z
assertTrue)r!r.r9�old_handlerr3�resultr	)r.r!r
�acquire_retries_on_intr�s
z%ThreadSignals.acquire_retries_on_intrcCs|�t���dS)N)r<rr()r!r	r	r
�!test_lock_acquire_retries_on_intr�sz/ThreadSignals.test_lock_acquire_retries_on_intrcCs|�t���dS)N)r<rr5)r!r	r	r
�"test_rlock_acquire_retries_on_intr�sz0ThreadSignals.test_rlock_acquire_retries_on_intrc
s�d�_d�_d�_t������t�������fdd�}t�tj|�}z���fdd�}�fdd�}t�	��Vt�
|d�|�������j�jd	����j�jd
����jd�WdQRXWdt�tj|�XdS)Nrcs�jd7_dS)Nr
)�
sigs_recvd)Zsignumr)r!r	r
r9�sz@ThreadSignals.test_interrupted_timed_acquire.<locals>.my_handlercs$t���_�jdd�t���_dS)Ng�?)r')r)r*�startr�endr	)r.r!r	r
�
timed_acquire�s
zCThreadSignals.test_interrupted_timed_acquire.<locals>.timed_acquirecs6x(td�D]}t�d�t�ttj�q
W���dS)N�(g{�G�z�?)	�ranger)r6rrrrrr)�_)�doner	r
r�s
zBThreadSignals.test_interrupted_timed_acquire.<locals>.send_signalsr	g@g333333�?)
r@rAr?rr(rrrrrr#r,Z
assertGreater)r!r9r:rBrr	)rFr.r!r
�test_interrupted_timed_acquire�s(
z,ThreadSignals.test_interrupted_timed_acquireN)�__name__�
__module__�__qualname__r"rr%�unittestZskipIf�USING_PTHREAD_COND�sys�platform�
startswith�thread_info�versionr1r7r<r=r>rGr	r	r	r
r's*  rcCsRtjddd�tjddd�tjddd�iatttt�}zt�t	�Wdt|�XdS)Nr)rr)
rrrrrrrrZrun_unittestr)Zoldsigsr	r	r
�	test_main�srR�__main__)�__doc__rKrrrMZtestr�_threadrr)rNZSkipTest�getpidrr(rrP�namer.rLrrrZTestCaserrRrHr	r	r	r
�<module>s*	C