HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.10/test/__pycache__/_test_eintr.cpython-310.opt-2.pyc
o

�i�F�@s�	ddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlm
Z
ddlmZddlmZejdd��Ze�eed�d�Gd	d
�d
ej��Ze�eed�d�Gdd�de��Ze�eed�d�Gd
d�de��Ze�eed�d�Gdd�de��Ze�eed�d�e�eed�d�Gdd�de���Ze�eed�d�Gdd�de��ZGdd�de�Zedkr�e��dSdS)�N)�support)�	os_helper)�
socket_helperccsJ�	|�z|VWn|���Wd�dS1swYdS�N)�kill)�proc�r�-/usr/local/lib/python3.10/test/_test_eintr.py�
kill_on_errors�
�"�r
�	setitimerzrequires setitimer()c@sFeZdZ	dZdZdZdd�Zdd�Zedd��Z	d	d
�Z
dd�Zd
S)�
EINTRBaseTestg�������?g�������?cCs|jd7_dS�N�)�signals)�self�signum�framerrr	�
sighandler3�zEINTRBaseTest.sighandlercCsBd|_t�tj|j�|_t�tj|j|j�t	j
ddtjd�dS)NriXT)�exit�file)
r�signal�SIGALRMr�orig_handlerr�ITIMER_REAL�signal_delay�
signal_period�faulthandlerZdump_traceback_later�sys�
__stderr__�rrrr	�setUp6s�
�zEINTRBaseTest.setUpcCst�tjdd�dS�Nr)rrrrrrr	�
stop_alarmAszEINTRBaseTest.stop_alarmcCs$|��t�tj|j�t��dSr)r#rrrrZcancel_dump_traceback_laterr rrr	�tearDownEszEINTRBaseTest.tearDowncOs tjdf|}tj|fi|��S)Nz-c)r�
executable�
subprocess�Popen)r�args�kwZcmd_argsrrr	r&JszEINTRBaseTest.subprocessN)�__name__�
__module__�__qualname__rr�
sleep_timerr!�staticmethodr#r$r&rrrr	r's
rc@szeZdZ	dd�Zdd�Zdd�Ze�ee	d�d�d	d
��Z
dd�Zd
d�Ze�ee	d�d�dd��Z
dd�Zdd�ZdS)�OSEINTRTestcCsd|j}|�|�S)Nzimport time; time.sleep(%r))r-r&)r�coderrr	�new_sleep_processSs

zOSEINTRTest.new_sleep_processcsDd}�fdd�t|�D�}t|�D]}|�q|D]}|��qdS)N�csg|]}����qSr)r1)�.0�_r rr	�
<listcomp>Ysz3OSEINTRTest._test_wait_multiple.<locals>.<listcomp>)�range�wait)r�	wait_func�num�	processesr4rrr r	�_test_wait_multipleWs
�zOSEINTRTest._test_wait_multiplecCs|�tj�dSr)r;�osr7r rrr	�	test_wait`�zOSEINTRTest.test_wait�wait3zrequires wait3()cC�|�dd��dS)NcSs
t�d�Sr")r<r?rrrr	�<lambda>es
z(OSEINTRTest.test_wait3.<locals>.<lambda>)r;r rrr	�
test_wait3c�zOSEINTRTest.test_wait3cCs|��}||j�|��dSr)r1�pidr7)rr8rrrr	�_test_wait_singlegs
zOSEINTRTest._test_wait_singlecCr@)NcS�t�|d�Sr")r<�waitpid�rDrrr	rAn�z*OSEINTRTest.test_waitpid.<locals>.<lambda>�rEr rrr	�test_waitpidmrzOSEINTRTest.test_waitpid�wait4zrequires wait4()cCr@)NcSrFr")r<rLrHrrr	rArrIz(OSEINTRTest.test_wait4.<locals>.<lambda>rJr rrr	�
test_wait4prCzOSEINTRTest.test_wait4cCs�t��\}}|�tj|�gd�}d�dddd|d|jddd	d
df
�}|j|t|�|gd�}t|��'t�|�|D]}|�	|t�
|t|���q<|�	|��d
�Wd�dS1s^wYdS)N)shellosworldsspam�
zimport os, sys, time�zwr = int(sys.argv[1])�
datas = %r�sleep_time = %rzfor data in datas:z$    # let the parent block on read()�    time.sleep(sleep_time)z    os.write(wr, data)��pass_fdsr)
r<�pipe�
addCleanup�close�joinr-r&�strr
�assertEqual�read�lenr7)r�rd�wr�datasr0r�datarrr	�	test_readts,�

"�zOSEINTRTest.test_readcCs�t��\}}|�tj|�dtj}d�dddd|jdtjddd	d
dddd
ddddddf�}|j|t	|�|gd�}t
|��2t�|�d}|t|�kra|t�|t
|�|d��7}|t|�ksM|�|��d�Wd�dS1stwYdS)N�xrNzimport io, os, sys, timerOzrd = int(sys.argv[1])rQzdata = b"x" * %s�data_len = len(data)z!# let the parent block on write()�time.sleep(sleep_time)zread_data = io.BytesIO()z+while len(read_data.getvalue()) < data_len:z%    chunk = os.read(rd, 2 * data_len)z    read_data.write(chunk)zvalue = read_data.getvalue()zif value != data:z0    raise Exception("read error: %s vs %s bytes"z-                    % (len(value), data_len))rSr)r<rUrVrWrZ
PIPE_MAX_SIZErXr-r&rYr
r\�write�
memoryviewrZr7)rr]r^r`r0r�writtenrrr	�
test_write�sB
�

�"�zOSEINTRTest.test_writeN)r*r+r,r1r;r=�unittest�
skipUnless�hasattrr<rBrErKrMrarhrrrr	r/Os	

r/c@s�eZdZ	e�eed�d�dd��Zdd�Ze�eejd�d�d	d
��Z	dd�Z
d
d�Zdd�Ze�eejd�d�dd��Z
dd�Ze�dd�e�eed�d�dd���Zdd�Ze�ejdkd �d!d"��Zd#d$�Ze�ejdkd �d%d&��Zd'S)(�SocketEINTRTest�
socketpairzneeds socketpair()c	Cs�t��\}}|�|j�gd�}d�ddddt|j�dt|j�d|d	|jdd
dddd
dddf�}|�	�}|j
|t|�|gd�}t|��%|��|D]
}|�
|||t|���qN|�
|��d�Wd�dS1sowYdS)N)rb�y�zrN�import os, socket, sys, timerO�fd = int(sys.argv[1])�family = %s�sock_type = %srPrQz)wr = socket.fromfd(fd, family, sock_type)�os.close(fd)zwith wr:z    for data in datas:z(        # let the parent block on recv()z        time.sleep(sleep_time)z        wr.sendall(data)rSr)�socketrmrVrWrX�int�family�typer-�filenor&rYr
rZr\r7)	rZ	recv_funcr]r^r_r0�fdrr`rrr	�
_test_recv�s:�
"�zSocketEINTRTest._test_recvcC�|�tjj�dSr)r{ru�recvr rrr	�	test_recv�rzSocketEINTRTest.test_recv�recvmsgzneeds recvmsg()cCr@)NcSs|�|�dSr")r��sockr`rrr	rA�sz.SocketEINTRTest.test_recvmsg.<locals>.<lambda>)r{r rrr	�test_recvmsg�rCzSocketEINTRTest.test_recvmsgc
Cs2t��\}}|�|j�dtjd}d�ddddt|j�dt|j	�d	|j
d
tjddddd
dddddddddddddf�}|��}|j|t
|�|gd�}t|��:|��d}|t|�kr||t|�|d��}	||	durvt|�n|	7}|t|�ksb|�|��d�Wd�dS1s�wYdS)Nsxyzr2rNrprOrqrrrsrQzdata = b"xyz" * %srcz)rd = socket.fromfd(fd, family, sock_type)rtzwith rd:z$    # let the parent block on send()rRz'    received_data = bytearray(data_len)z	    n = 0z    while n < data_len:z8        n += rd.recv_into(memoryview(received_data)[n:])zif received_data != data:z0    raise Exception("recv error: %s vs %s bytes"z5                    % (len(received_data), data_len))rSr)rurmrVrWrZ
SOCK_MAX_SIZErXrvrwrxr-ryr&rYr
r\rfrZr7)
rZ	send_funcr]r^r`r0rzrrg�sentrrr	�
_test_send�sP�
�"�zSocketEINTRTest._test_sendcCr|r)r�ru�sendr rrr	�	test_sendrzSocketEINTRTest.test_sendcCr|r)r�ru�sendallr rrr	�test_sendallrzSocketEINTRTest.test_sendall�sendmsgzneeds sendmsg()cCr@)NcSs|�|g�Sr)r�r�rrr	rArIz.SocketEINTRTest.test_sendmsg.<locals>.<lambda>)r�r rrr	�test_sendmsgrCzSocketEINTRTest.test_sendmsgcCs�t�tjdf�}|�|j�|��d}d�dddtjd|d|jdd	d
ddf
�}|�	|�}t
|��|��\}}|��|�|�
�d�Wd�dS1sRwYdS)
NrrrNzimport socket, timerOz	host = %rz	port = %srQz# let parent block on accept()rdz,with socket.create_connection((host, port)):rR)ru�
create_serverrZHOSTrVrW�getsocknamerXr-r&r
�acceptrZr7)rr��portr0rZclient_sockr4rrr	�test_accepts*�

"�zSocketEINTRTest.test_accept�
r2�mkfifozneeds mkfifo()cCs�tj}t�|�zt�|�Wnty(}z
|�d|�WYd}~nd}~ww|�tj|�d�ddd|d|j	dddd|f	�}|�
|�}t|��||�|�|�
�d	�Wd�dS1sdwYdS)
Nzos.mkfifo(): %srN�import os, timerOz	path = %arQz# let the parent blockrdr)r�TESTFN�unlinkr<r��PermissionErrorZskipTestrVrXr-r&r
rZr7)rZdo_open_close_readerZdo_open_close_writer�filename�er0rrrr	�
_test_open;s2
���

"�zSocketEINTRTest._test_opencCst|d�}|��dS)N�w)�openrW)r�path�fprrr	�python_openZs
zSocketEINTRTest.python_open�darwinz+hangs under macOS; see bpo-25234, bpo-35363cC�|�d|j�dS)Nzfp = open(path, 'r')
fp.close())r�r�r rrr	�	test_open^��zSocketEINTRTest.test_opencCst�|tj�}t�|�dSr)r<r��O_WRONLYrW)rr�rzrrr	�os_opendszSocketEINTRTest.os_opencCr�)Nz,fd = os.open(path, os.O_RDONLY)
os.close(fd))r�r�r rrr	�test_os_openhr�zSocketEINTRTest.test_os_openN)r*r+r,rirjrkrur{r~r�r�r�r�r�r�rZrequires_freebsd_versionr<r�r��skipIfr�platformr�r�r�rrrr	rl�s4
#
.

�
�rlc@seZdZ	dd�ZdS)�
TimeEINTRTestcCs:t��}t�|j�|��t��|}|�||j�dSr)�time�	monotonic�sleepr-r#�assertGreaterEqual�r�t0�dtrrr	�
test_sleepss
zTimeEINTRTest.test_sleepN)r*r+r,r�rrrr	r�osr��pthread_sigmaskzneed signal.pthread_sigmask()c@sJeZdZ	dd�Ze�eed�d�dd��Ze�eed�d�dd	��Z	d
S)�SignalEINTRTestc
Cs�tj}t��}t�|dd��}|�tj||�d�ddt��dt|�d|jdd	f�}t�tj	|g�}|�tjtj
|g�t��}|�
|�}t|��||�t��|}	Wd�n1sawY|�|��d
�dS)NcWsdSrr)r(rrr	rA�sz/SignalEINTRTest.check_sigwait.<locals>.<lambda>rNr�zpid = %szsignum = %srQrdzos.kill(pid, signum)r)r�SIGUSR1r<�getpidrVrXrvr-r��	SIG_BLOCK�SIG_UNBLOCKr�r�r&r
rZr7)
rr8rrDZold_handlerr0Zold_maskr�rr�rrr	�
check_sigwait�s*

�	

�zSignalEINTRTest.check_sigwait�sigwaitinfozneed signal.sigwaitinfo()cC�dd�}|�|�dS)NcSst�|g�dSr)rr��rrrr	r8�r>z3SignalEINTRTest.test_sigwaitinfo.<locals>.wait_func�r��rr8rrr	�test_sigwaitinfo��z SignalEINTRTest.test_sigwaitinfo�sigtimedwaitcCr�)NcSst�|gd�dS)Ng^@)rr�r�rrr	r8�rz4SignalEINTRTest.test_sigtimedwait.<locals>.wait_funcr�r�rrr	�test_sigtimedwait�r�z!SignalEINTRTest.test_sigtimedwaitN)
r*r+r,r�rirjrkrr�r�rrrr	r�{s�
�r�c@s�eZdZ	dd�Ze�ejdkd�e�e	e
d�d�dd���Ze�e	e
d	�d
�dd��Ze�e	e
d
�d�dd��Z
e�e	e
d�d�dd��ZdS)�SelectEINTRTestcCs@t��}t�ggg|j�t��|}|��|�||j�dSr)r�r��selectr-r#r�r�rrr	�test_select�s
zSelectEINTRTest.test_selectr�z(poll may fail on macOS; see issue #28087�pollzneed select.pollcCsFt��}t��}|�|jd�t��|}|��|�||j�dS�Ng@�@)r�r�r�r�r-r#r��rZpollerr�r�rrr	�	test_poll�szSelectEINTRTest.test_poll�epollzneed select.epollcCsNt��}|�|j�t��}|�|j�t��|}|��|�	||j�dSr)
r�r�rVrWr�r�r�r-r#r�r�rrr	�
test_epoll�szSelectEINTRTest.test_epoll�kqueuezneed select.kqueuecCsRt��}|�|j�t��}|�dd|j�t��|}|��|�	||j�dSr
)
r�r�rVrWr�r�Zcontrolr-r#r�)rr�r�r�rrr	�test_kqueue��zSelectEINTRTest.test_kqueue�devpollzneed select.devpollcCsRt��}|�|j�t��}|�|jd�t��|}|��|�	||j�dSr�)
r�r�rVrWr�r�r�r-r#r�r�rrr	�test_devpoll�r�zSelectEINTRTest.test_devpollN)r*r+r,r�rir�rr�rjrkr�r�r�r�r�rrrr	r��s�	



r�c@s8eZdZdd�Ze�e��dkd�dd��Zdd�Z	d	S)
�
FNTLEINTRTestc
Cs8|�tjtj�d�ddtjd|d|jf�}t��}|�|�}t	|��lt
tjd��Q}	t��|}|dkr?td	|��z||tj
tjB�||tj�t�d
�Wn	ty^Ynwq/||tj
�t��|}|�||j�|��Wd�n1s�wY|��Wd�dS1s�wYdS)NrNzimport fcntl, timezwith open('%s', 'wb') as f:z   fcntl.%s(f, fcntl.LOCK_EX)z   time.sleep(%s)�wbTgN@z failed to sync child in %.1f secg{�G�z�?)rVrr�r�rXr-r�r�r&r
r��	Exception�fcntl�LOCK_EX�LOCK_NB�LOCK_UNr��BlockingIOErrorr�r#r7)rZ	lock_funcZ	lock_namer0Z
start_timer�fr�rrr	�_lock�s<�

��

�
"�zFNTLEINTRTest._lockZAIXzAIX returns PermissionErrorcC�|�tjd�dS)N�lockf)r�r�r�r rrr	�
test_lockf	rCzFNTLEINTRTest.test_lockfcCr�)N�flock)r�r�r�r rrr	�
test_flock
rzFNTLEINTRTest.test_flockN)
r*r+r,r�rir�r��systemr�r�rrrr	r��s
 
r��__main__)�
contextlibrr�r<r�r�rrur&rr�ri�testrZtest.supportrr�contextmanagerr
rjrkZTestCaserr/rlr�r�r�r�r*�mainrrrr	�<module>sL


'h7�/8)�