HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/eintrdata/__pycache__/eintr_tester.cpython-37.pyc
B

��gG�@s�dZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddl
mZejdd��Ze�eed�d�Gdd	�d	ej��Ze�eed�d�Gd
d�de��Ze�eed�d�Gdd
�d
e��Ze�eed�d�Gdd�de��Ze�eed�d�e�eed�d�Gdd�de���Ze�eed�d�Gdd�de��ZGdd�de�Zedk�r�e��dS)a�
This test suite exercises some system calls subject to interruption with EINTR,
to check that it is actually handled transparently.
It is intended to be run by the main test suite within a child process, to
ensure there is no background thread running (so that signals are delivered to
the correct thread).
Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (contrarily to signal()).
�N)�supportc	cs6|�(y
|VWn|���YnXWdQRXdS)zGContext manager killing the subprocess if a Python exception is raised.N)�kill)�proc�r�7/usr/local/lib/python3.7/test/eintrdata/eintr_tester.py�
kill_on_errors
r�	setitimerzrequires setitimer()c@sHeZdZdZdZdZdZdd�Zdd�Ze	dd	��Z
d
d�Zdd
�ZdS)�
EINTRBaseTestz Base class for EINTR tests. g�������?g�������?cCs|jd7_dS)N�)�signals)�self�signum�framerrr�
sighandler1szEINTRBaseTest.sighandlercCsLd|_t�tj|j�|_t�tj|j|j�t	t
d�rHt
jddtj
d�dS)Nr�dump_traceback_lateriXT)�exit�file)r�signal�SIGALRMr�orig_handlerr�ITIMER_REAL�signal_delay�
signal_period�hasattr�faulthandlerr�sys�
__stderr__)rrrr�setUp4s
zEINTRBaseTest.setUpcCst�tjdd�dS)Nr)rrrrrrr�
stop_alarm@szEINTRBaseTest.stop_alarmcCs.|��t�tj|j�ttd�r*t��dS)N�cancel_dump_traceback_later)rrrrrrr)rrrr�tearDownDs
zEINTRBaseTest.tearDowncOstjdf|}tj|f|�S)Nz-c)r�
executable�
subprocess�Popen)r�args�kwZcmd_argsrrrr"JszEINTRBaseTest.subprocessN)
�__name__�
__module__�__qualname__�__doc__rr�
sleep_timerr�staticmethodrr r"rrrrr	%sr	c@s|eZdZdZdd�Zdd�Zdd�Ze�e	e
d�d	�d
d��Zdd
�Zdd�Z
e�e	e
d�d�dd��Zdd�Zdd�ZdS)�OSEINTRTestz  EINTR tests for the os module. cCsd|j}|�|�S)Nzimport time; time.sleep(%r))r*r")r�coderrr�new_sleep_processSs
zOSEINTRTest.new_sleep_processcsLd}�fdd�t|�D�}xt|�D]
}|�q$Wx|D]}|��q8WdS)N�csg|]}����qSr)r.)�.0�_)rrr�
<listcomp>Ysz3OSEINTRTest._test_wait_multiple.<locals>.<listcomp>)�range�wait)r�	wait_funcZnumZ	processesr1rr)rr�_test_wait_multipleWs

zOSEINTRTest._test_wait_multiplecCs|�tj�dS)N)r6�osr4)rrrr�	test_wait`szOSEINTRTest.test_wait�wait3zrequires wait3()cCs|�dd��dS)NcSs
t�d�S)Nr)r7r9rrrr�<lambda>e�z(OSEINTRTest.test_wait3.<locals>.<lambda>)r6)rrrr�
test_wait3cszOSEINTRTest.test_wait3cCs|��}||j�|��dS)N)r.�pidr4)rr5rrrr�_test_wait_singlegs
zOSEINTRTest._test_wait_singlecCs|�dd��dS)NcSst�|d�S)Nr)r7�waitpid)r=rrrr:nr;z*OSEINTRTest.test_waitpid.<locals>.<lambda>)r>)rrrr�test_waitpidmszOSEINTRTest.test_waitpid�wait4zrequires wait4()cCs|�dd��dS)NcSst�|d�S)Nr)r7rA)r=rrrr:rr;z(OSEINTRTest.test_wait4.<locals>.<lambda>)r>)rrrr�
test_wait4pszOSEINTRTest.test_wait4cCs�t��\}}|�tj|�dddg}d�dddd|d	|jdd
ddd
f
�}|j|t|�|gd�}t|��Ft�|�x$|D]}|�	|t�
|t|���q|W|�	|��d�WdQRXdS)Nshellosworldsspam�
zimport os, sys, time�zwr = int(sys.argv[1])z
datas = %rzsleep_time = %rzfor data in datas:z$    # let the parent block on read()z    time.sleep(sleep_time)z    os.write(wr, data))�pass_fdsr)
r7�pipe�
addCleanup�close�joinr*r"�strr�assertEqual�read�lenr4)r�rd�wr�datasr-r�datarrr�	test_readts(



zOSEINTRTest.test_readcCs�t��\}}|�tj|�dtj}d�dddd|jdtjddd	d
dddd
ddddddf�}|j|t	|�|gd�}t
|��Rt�|�d}x,|t|�kr�|t�|t
|�|d��7}q�W|�|��d�WdQRXdS)N�xrCzimport io, os, sys, timerDzrd = int(sys.argv[1])zsleep_time = %rzdata = b"x" * %szdata_len = len(data)z!# let the parent block on write()ztime.sleep(sleep_time)zread_data = io.BytesIO()z+while len(read_data.getvalue()) < data_len:z%    chunk = os.read(rd, 2 * data_len)z    read_data.write(chunk)zvalue = read_data.getvalue()zif value != data:z0    raise Exception("read error: %s vs %s bytes"z-                    % (len(value), data_len)))rEr)r7rFrGrHrZ
PIPE_MAX_SIZErIr*r"rJrrM�write�
memoryviewrKr4)rrNrOrQr-r�writtenrrr�
test_write�s<


 zOSEINTRTest.test_writeN)r&r'r(r)r.r6r8�unittest�
skipUnlessrr7r<r>r@rBrRrWrrrrr,Os	r,c@s�eZdZdZe�eed�d�dd��Zdd�Z	e�eejd�d	�d
d��Z
dd
�Zdd�Zdd�Z
e�eejd�d�dd��Zdd�Ze�dd�e�eed�d�dd���Zdd�Ze�ejd kd!�d"d#��Zd$d%�Ze�ejd kd!�d&d'��Zd(S))�SocketEINTRTestz$ EINTR tests for the socket module. �
socketpairzneeds socketpair()c	Cs�t��\}}|�|j�dddg}d�ddddt|j�d	t|j�d
|d|jddd
ddddddf�}|�	�}|j
|t|�|gd�}t|��B|��x"|D]}|�
|||t|���q�W|�
|��d�WdQRXdS)NrS�y�zrCzimport os, socket, sys, timerDzfd = int(sys.argv[1])zfamily = %szsock_type = %sz
datas = %rzsleep_time = %rz)wr = socket.fromfd(fd, family, sock_type)zos.close(fd)zwith wr:z    for data in datas:z(        # let the parent block on recv()z        time.sleep(sleep_time)z        wr.sendall(data))rEr)�socketr[rGrHrI�int�family�typer*�filenor"rJrrKrMr4)	rZ	recv_funcrNrOrPr-�fdrrQrrr�
_test_recv�s6


zSocketEINTRTest._test_recvcCs|�tjj�dS)N)rdr^Zrecv)rrrr�	test_recv�szSocketEINTRTest.test_recv�recvmsgzneeds recvmsg()cCs|�dd��dS)NcSs|�|�dS)Nr)rf)�sockrQrrrr:�r;z.SocketEINTRTest.test_recvmsg.<locals>.<lambda>)rd)rrrr�test_recvmsg�szSocketEINTRTest.test_recvmsgc
Cst��\}}|�|j�dtjd}d�ddddt|j�dt|j	�d	|j
d
tjddddd
dddddddddddddf�}|��}|j|t
|�|gd�}t|��b|��d}x>|t|�kr�||t|�|d��}	||	dkr�t|�n|	7}q�W|�|��d�WdQRXdS)Nsxyzr/rCzimport os, socket, sys, timerDzfd = int(sys.argv[1])zfamily = %szsock_type = %szsleep_time = %rzdata = b"xyz" * %szdata_len = len(data)z)rd = socket.fromfd(fd, family, sock_type)zos.close(fd)zwith rd:z$    # let the parent block on send()z    time.sleep(sleep_time)z'    received_data = bytearray(data_len)z	    n = 0z    while n < data_len:z8        n += rd.recv_into(memoryview(received_data)[n:])zif received_data != data:z0    raise Exception("recv error: %s vs %s bytes"z5                    % (len(received_data), data_len)))rEr)r^r[rGrHrZ
SOCK_MAX_SIZErIr_r`rar*rbr"rJrrMrUrKr4)
rZ	send_funcrNrOrQr-rcrrVZsentrrr�
_test_send�sJ
zSocketEINTRTest._test_sendcCs|�tjj�dS)N)rir^�send)rrrr�	test_sendszSocketEINTRTest.test_sendcCs|�tjj�dS)N)rir^Zsendall)rrrr�test_sendallszSocketEINTRTest.test_sendall�sendmsgzneeds sendmsg()cCs|�dd��dS)NcSs|�|g�S)N)rm)rgrQrrrr:r;z.SocketEINTRTest.test_sendmsg.<locals>.<lambda>)ri)rrrr�test_sendmsgszSocketEINTRTest.test_sendmsgcCs�t�tjtj�}|�|j�|�tjdf�|��d}|�	�d�
dddtjd|d|jdd	d
ddf
�}|�|�}t
|��*|��\}}|��|�|��d�WdQRXdS)
Nrr
rCzimport socket, timerDz	host = %rz	port = %szsleep_time = %rz# let parent block on accept()ztime.sleep(sleep_time)z,with socket.create_connection((host, port)):z    time.sleep(sleep_time))r^ZAF_INETZSOCK_STREAMrGrHZbindrZHOSTZgetsocknameZlistenrIr*r"rZacceptrKr4)rrgZportr-rZclient_sockr1rrr�test_accepts*

zSocketEINTRTest.test_accept�
r/�mkfifozneeds mkfifo()cCs�tj}t�|�yt�|�Wn0tk
rN}z|�d|�Wdd}~XYnX|�tj|�d�ddd|d|j	dddd|f	�}|�
|�}t|��||�|�|�
�d	�WdQRXdS)
Nzos.mkfifo(): %srCzimport os, timerDz	path = %azsleep_time = %rz# let the parent blockztime.sleep(sleep_time)r)r�TESTFN�unlinkr7rq�PermissionErrorZskipTestrGrIr*r"rrKr4)rZdo_open_close_readerZdo_open_close_writer�filename�er-rrrr�
_test_open>s*
 

zSocketEINTRTest._test_opencCst|d�}|��dS)N�w)�openrH)r�path�fprrr�python_open]s
zSocketEINTRTest.python_open�darwinz+hangs under macOS; see bpo-25234, bpo-35363cCs|�d|j�dS)Nzfp = open(path, 'r')
fp.close())rwr|)rrrr�	test_openaszSocketEINTRTest.test_opencCst�|tj�}t�|�dS)N)r7ry�O_WRONLYrH)rrzrcrrr�os_opengszSocketEINTRTest.os_opencCs|�d|j�dS)Nz,fd = os.open(path, os.O_RDONLY)
os.close(fd))rwr�)rrrr�test_os_openkszSocketEINTRTest.test_os_openN)r&r'r(r)rXrYrr^rdrerhrirkrlrnrorZrequires_freebsd_versionr7rwr|�skipIfr�platformr~r�r�rrrrrZ�s"$.
rZc@seZdZdZdd�ZdS)�
TimeEINTRTestz" EINTR tests for the time module. cCs:t��}t�|j�|��t��|}|�||j�dS)N)�time�	monotonic�sleepr*r�assertGreaterEqual)r�t0�dtrrr�
test_sleepvs
zTimeEINTRTest.test_sleepN)r&r'r(r)r�rrrrr�rsr��pthread_sigmaskzneed signal.pthread_sigmask()c@sLeZdZdZdd�Ze�eed�d�dd��Z	e�eed�d�d	d
��Z
dS)�SignalEINTRTestz$ EINTR tests for the signal module. c
	Cs�tj}t��}t�|dd��}|�tj||�d�ddt��dt|�d|jdd	f�}t�tj	|g�}|�tjtj
|g�t��}|�
|�}t|��||�t��|}	WdQRX|�|��d
�dS)NcWsdS)Nr)r$rrrr:�r;z/SignalEINTRTest.check_sigwait.<locals>.<lambda>rCzimport os, timezpid = %szsignum = %szsleep_time = %rztime.sleep(sleep_time)zos.kill(pid, signum)r)r�SIGUSR1r7�getpidrGrIr_r*r��	SIG_BLOCK�SIG_UNBLOCKr�r�r"rrKr4)
rr5r
r=Zold_handlerr-Zold_maskr�rr�rrr�
check_sigwait�s&



zSignalEINTRTest.check_sigwait�sigwaitinfozneed signal.sigwaitinfo()cCsdd�}|�|�dS)NcSst�|g�dS)N)rr�)r
rrrr5�sz3SignalEINTRTest.test_sigwaitinfo.<locals>.wait_func)r�)rr5rrr�test_sigwaitinfo�sz SignalEINTRTest.test_sigwaitinfo�sigtimedwaitcCsdd�}|�|�dS)NcSst�|gd�dS)Ng^@)rr�)r
rrrr5�sz4SignalEINTRTest.test_sigtimedwait.<locals>.wait_func)r�)rr5rrr�test_sigtimedwait�sz!SignalEINTRTest.test_sigtimedwaitN)r&r'r(r)r�rXrYrrr�r�rrrrr�~sr�c@s�eZdZdZdd�Ze�ejdkd�e�	e
ed�d�dd	���Ze�	e
ed
�d�dd
��Z
e�	e
ed�d�dd��Ze�	e
ed�d�dd��ZdS)�SelectEINTRTestz$ EINTR tests for the select module. cCs@t��}t�ggg|j�t��|}|��|�||j�dS)N)r�r��selectr*rr�)rr�r�rrr�test_select�s
zSelectEINTRTest.test_selectr}z(poll may fail on macOS; see issue #28087�pollzneed select.pollcCsFt��}t��}|�|jd�t��|}|��|�||j�dS)Ng@�@)r�r�r�r�r*rr�)r�pollerr�r�rrr�	test_poll�szSelectEINTRTest.test_poll�epollzneed select.epollcCsNt��}|�|j�t��}|�|j�t��|}|��|�	||j�dS)N)
r�r�rGrHr�r�r�r*rr�)rr�r�r�rrr�
test_epoll�szSelectEINTRTest.test_epoll�kqueuezneed select.kqueuecCsRt��}|�|j�t��}|�dd|j�t��|}|��|�	||j�dS)Nr
)
r�r�rGrHr�r�Zcontrolr*rr�)rr�r�r�rrr�test_kqueue�szSelectEINTRTest.test_kqueue�devpollzneed select.devpollcCsRt��}|�|j�t��}|�|jd�t��|}|��|�	||j�dS)Ng@�@)
r�r�rGrHr�r�r�r*rr�)rr�r�r�rrr�test_devpoll�szSelectEINTRTest.test_devpollN)r&r'r(r)r�rXr�rr�rYrr�r�r�r�r�rrrrr��s
r�c@s8eZdZdd�Ze�e��dkd�dd��Zdd�Z	d	S)
�
FNTLEINTRTestc
Cs|�tjtj�d�ddtjd|d|jf�}t��}|�|�}t	|���t
tjd���}xht��|}|dkr~td|��y,||tj
tjB�||tj�t�d	�Wq^tk
r�PYq^Xq^W||tj
�t��|}|�||j�|��WdQRX|��WdQRXdS)
NrCzimport fcntl, timezwith open('%s', 'wb') as f:z   fcntl.%s(f, fcntl.LOCK_EX)z   time.sleep(%s)�wbgN@z failed to sync child in %.1f secg{�G�z�?)rGrrsrrrIr*r�r�r"rry�	Exception�fcntlZLOCK_EXZLOCK_NBZLOCK_UNr��BlockingIOErrorr�rr4)rZ	lock_funcZ	lock_namer-Z
start_timer�fr�rrr�_lock�s2

zFNTLEINTRTest._lockZAIXzAIX returns PermissionErrorcCs|�tjd�dS)N�lockf)r�r�r�)rrrr�
test_lockfszFNTLEINTRTest.test_lockfcCs|�tjd�dS)N�flock)r�r�r�)rrrr�
test_flockszFNTLEINTRTest.test_flockN)
r&r'r(r�rXr�r��systemr�r�rrrrr��s r��__main__)r)�
contextlibrr�r7r�r�rr^r"rr�rXZtestr�contextmanagerrrYrZTestCaser	r,rZr�r�r�r�r&�mainrrrr�<module>	s@
)h:
/8)