HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/eintrdata/__pycache__/eintr_tester.cpython-37.opt-2.pyc
B

��gG�@s�ddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlm
Z
ejdd��Ze�eed�d�Gdd�dej��Ze�eed�d�Gd	d
�d
e��Ze�eed�d�Gdd�de��Ze�eed�d�Gd
d�de��Ze�eed�d�e�eed�d�Gdd�de���Ze�eed�d�Gdd�de��ZGdd�de�Zedk�r|e��dS)�N)�supportc	cs6|�(y
|VWn|���YnXWdQRXdS)N)�kill)�proc�r�7/usr/local/lib/python3.7/test/eintrdata/eintr_tester.py�
kill_on_errors
r�	setitimerzrequires setitimer()c@sDeZdZdZdZdZdd�Zdd�Zedd��Z	d	d
�Z
dd�Zd
S)�
EINTRBaseTestg�������?g�������?cCs|jd7_dS)N�)�signals)�self�signum�framerrr�
sighandler1szEINTRBaseTest.sighandlercCsLd|_t�tj|j�|_t�tj|j|j�t	t
d�rHt
jddtj
d�dS)Nr�dump_traceback_lateriXT)�exit�file)r�signal�SIGALRMr�orig_handlerr�ITIMER_REAL�signal_delay�
signal_period�hasattr�faulthandlerr�sys�
__stderr__)rrrr�setUp4s
zEINTRBaseTest.setUpcCst�tjdd�dS)Nr)rrrrrrr�
stop_alarm@szEINTRBaseTest.stop_alarmcCs.|��t�tj|j�ttd�r*t��dS)N�cancel_dump_traceback_later)rrrrrrr)rrrr�tearDownDs
zEINTRBaseTest.tearDowncOstjdf|}tj|f|�S)Nz-c)r�
executable�
subprocess�Popen)r�args�kwZcmd_argsrrrr"JszEINTRBaseTest.subprocessN)�__name__�
__module__�__qualname__rr�
sleep_timerr�staticmethodrr r"rrrrr	%sr	c@sxeZdZdd�Zdd�Zdd�Ze�ee	d�d�d	d
��Z
dd�Zd
d�Ze�ee	d�d�dd��Z
dd�Zdd�ZdS)�OSEINTRTestcCsd|j}|�|�S)Nzimport time; time.sleep(%r))r)r")r�coderrr�new_sleep_processSs
zOSEINTRTest.new_sleep_processcsLd}�fdd�t|�D�}xt|�D]
}|�q$Wx|D]}|��q8WdS)N�csg|]}����qSr)r-)�.0�_)rrr�
<listcomp>Ysz3OSEINTRTest._test_wait_multiple.<locals>.<listcomp>)�range�wait)r�	wait_funcZnumZ	processesr0rr)rr�_test_wait_multipleWs

zOSEINTRTest._test_wait_multiplecCs|�tj�dS)N)r5�osr3)rrrr�	test_wait`szOSEINTRTest.test_wait�wait3zrequires wait3()cCs|�dd��dS)NcSs
t�d�S)Nr)r6r8rrrr�<lambda>e�z(OSEINTRTest.test_wait3.<locals>.<lambda>)r5)rrrr�
test_wait3cszOSEINTRTest.test_wait3cCs|��}||j�|��dS)N)r-�pidr3)rr4rrrr�_test_wait_singlegs
zOSEINTRTest._test_wait_singlecCs|�dd��dS)NcSst�|d�S)Nr)r6�waitpid)r<rrrr9nr:z*OSEINTRTest.test_waitpid.<locals>.<lambda>)r=)rrrr�test_waitpidmszOSEINTRTest.test_waitpid�wait4zrequires wait4()cCs|�dd��dS)NcSst�|d�S)Nr)r6r@)r<rrrr9rr:z(OSEINTRTest.test_wait4.<locals>.<lambda>)r=)rrrr�
test_wait4pszOSEINTRTest.test_wait4cCs�t��\}}|�tj|�dddg}d�dddd|d	|jdd
ddd
f
�}|j|t|�|gd�}t|��Ft�|�x$|D]}|�	|t�
|t|���q|W|�	|��d�WdQRXdS)Nshellosworldsspam�
zimport os, sys, time�zwr = int(sys.argv[1])z
datas = %rzsleep_time = %rzfor data in datas:z$    # let the parent block on read()z    time.sleep(sleep_time)z    os.write(wr, data))�pass_fdsr)
r6�pipe�
addCleanup�close�joinr)r"�strr�assertEqual�read�lenr3)r�rd�wr�datasr,r�datarrr�	test_readts(



zOSEINTRTest.test_readcCs�t��\}}|�tj|�dtj}d�dddd|jdtjddd	d
dddd
ddddddf�}|j|t	|�|gd�}t
|��Rt�|�d}x,|t|�kr�|t�|t
|�|d��7}q�W|�|��d�WdQRXdS)N�xrBzimport io, os, sys, timerCzrd = int(sys.argv[1])zsleep_time = %rzdata = b"x" * %szdata_len = len(data)z!# let the parent block on write()ztime.sleep(sleep_time)zread_data = io.BytesIO()z+while len(read_data.getvalue()) < data_len:z%    chunk = os.read(rd, 2 * data_len)z    read_data.write(chunk)zvalue = read_data.getvalue()zif value != data:z0    raise Exception("read error: %s vs %s bytes"z-                    % (len(value), data_len)))rDr)r6rErFrGrZ
PIPE_MAX_SIZErHr)r"rIrrL�write�
memoryviewrJr3)rrMrNrPr,r�writtenrrr�
test_write�s<


 zOSEINTRTest.test_writeN)r&r'r(r-r5r7�unittest�
skipUnlessrr6r;r=r?rArQrVrrrrr+Os	r+c@s�eZdZe�eed�d�dd��Zdd�Ze�eejd�d�d	d
��Z	dd�Z
d
d�Zdd�Ze�eejd�d�dd��Z
dd�Ze�dd�e�eed�d�dd���Zdd�Ze�ejdkd �d!d"��Zd#d$�Ze�ejdkd �d%d&��Zd'S)(�SocketEINTRTest�
socketpairzneeds socketpair()c	Cs�t��\}}|�|j�dddg}d�ddddt|j�d	t|j�d
|d|jddd
ddddddf�}|�	�}|j
|t|�|gd�}t|��B|��x"|D]}|�
|||t|���q�W|�
|��d�WdQRXdS)NrR�y�zrBzimport os, socket, sys, timerCzfd = int(sys.argv[1])zfamily = %szsock_type = %sz
datas = %rzsleep_time = %rz)wr = socket.fromfd(fd, family, sock_type)zos.close(fd)zwith wr:z    for data in datas:z(        # let the parent block on recv()z        time.sleep(sleep_time)z        wr.sendall(data))rDr)�socketrZrFrGrH�int�family�typer)�filenor"rIrrJrLr3)	rZ	recv_funcrMrNrOr,�fdrrPrrr�
_test_recv�s6


zSocketEINTRTest._test_recvcCs|�tjj�dS)N)rcr]Zrecv)rrrr�	test_recv�szSocketEINTRTest.test_recv�recvmsgzneeds recvmsg()cCs|�dd��dS)NcSs|�|�dS)Nr)re)�sockrPrrrr9�r:z.SocketEINTRTest.test_recvmsg.<locals>.<lambda>)rc)rrrr�test_recvmsg�szSocketEINTRTest.test_recvmsgc
Cst��\}}|�|j�dtjd}d�ddddt|j�dt|j	�d	|j
d
tjddddd
dddddddddddddf�}|��}|j|t
|�|gd�}t|��b|��d}x>|t|�kr�||t|�|d��}	||	dkr�t|�n|	7}q�W|�|��d�WdQRXdS)Nsxyzr.rBzimport os, socket, sys, timerCzfd = int(sys.argv[1])zfamily = %szsock_type = %szsleep_time = %rzdata = b"xyz" * %szdata_len = len(data)z)rd = socket.fromfd(fd, family, sock_type)zos.close(fd)zwith rd:z$    # let the parent block on send()z    time.sleep(sleep_time)z'    received_data = bytearray(data_len)z	    n = 0z    while n < data_len:z8        n += rd.recv_into(memoryview(received_data)[n:])zif received_data != data:z0    raise Exception("recv error: %s vs %s bytes"z5                    % (len(received_data), data_len)))rDr)r]rZrFrGrZ
SOCK_MAX_SIZErHr^r_r`r)rar"rIrrLrTrJr3)
rZ	send_funcrMrNrPr,rbrrUZsentrrr�
_test_send�sJ
zSocketEINTRTest._test_sendcCs|�tjj�dS)N)rhr]�send)rrrr�	test_sendszSocketEINTRTest.test_sendcCs|�tjj�dS)N)rhr]Zsendall)rrrr�test_sendallszSocketEINTRTest.test_sendall�sendmsgzneeds sendmsg()cCs|�dd��dS)NcSs|�|g�S)N)rl)rfrPrrrr9r:z.SocketEINTRTest.test_sendmsg.<locals>.<lambda>)rh)rrrr�test_sendmsgszSocketEINTRTest.test_sendmsgcCs�t�tjtj�}|�|j�|�tjdf�|��d}|�	�d�
dddtjd|d|jdd	d
ddf
�}|�|�}t
|��*|��\}}|��|�|��d�WdQRXdS)
Nrr
rBzimport socket, timerCz	host = %rz	port = %szsleep_time = %rz# let parent block on accept()ztime.sleep(sleep_time)z,with socket.create_connection((host, port)):z    time.sleep(sleep_time))r]ZAF_INETZSOCK_STREAMrFrGZbindrZHOSTZgetsocknameZlistenrHr)r"rZacceptrJr3)rrfZportr,rZclient_sockr0rrr�test_accepts*

zSocketEINTRTest.test_accept�
r.�mkfifozneeds mkfifo()cCs�tj}t�|�yt�|�Wn0tk
rN}z|�d|�Wdd}~XYnX|�tj|�d�ddd|d|j	dddd|f	�}|�
|�}t|��||�|�|�
�d	�WdQRXdS)
Nzos.mkfifo(): %srBzimport os, timerCz	path = %azsleep_time = %rz# let the parent blockztime.sleep(sleep_time)r)r�TESTFN�unlinkr6rp�PermissionErrorZskipTestrFrHr)r"rrJr3)rZdo_open_close_readerZdo_open_close_writer�filename�er,rrrr�
_test_open>s*
 

zSocketEINTRTest._test_opencCst|d�}|��dS)N�w)�openrG)r�path�fprrr�python_open]s
zSocketEINTRTest.python_open�darwinz+hangs under macOS; see bpo-25234, bpo-35363cCs|�d|j�dS)Nzfp = open(path, 'r')
fp.close())rvr{)rrrr�	test_openaszSocketEINTRTest.test_opencCst�|tj�}t�|�dS)N)r6rx�O_WRONLYrG)rryrbrrr�os_opengszSocketEINTRTest.os_opencCs|�d|j�dS)Nz,fd = os.open(path, os.O_RDONLY)
os.close(fd))rvr)rrrr�test_os_openkszSocketEINTRTest.test_os_openN)r&r'r(rWrXrr]rcrdrgrhrjrkrmrnrZrequires_freebsd_versionr6rvr{�skipIfr�platformr}rr�rrrrrY�s $.
rYc@seZdZdd�ZdS)�
TimeEINTRTestcCs:t��}t�|j�|��t��|}|�||j�dS)N)�time�	monotonic�sleepr)r�assertGreaterEqual)r�t0�dtrrr�
test_sleepvs
zTimeEINTRTest.test_sleepN)r&r'r(r�rrrrr�rsr��pthread_sigmaskzneed signal.pthread_sigmask()c@sHeZdZdd�Ze�eed�d�dd��Ze�eed�d�dd	��Z	d
S)�SignalEINTRTestc
	Cs�tj}t��}t�|dd��}|�tj||�d�ddt��dt|�d|jdd	f�}t�tj	|g�}|�tjtj
|g�t��}|�
|�}t|��||�t��|}	WdQRX|�|��d
�dS)NcWsdS)Nr)r$rrrr9�r:z/SignalEINTRTest.check_sigwait.<locals>.<lambda>rBzimport os, timezpid = %szsignum = %szsleep_time = %rztime.sleep(sleep_time)zos.kill(pid, signum)r)r�SIGUSR1r6�getpidrFrHr^r)r��	SIG_BLOCK�SIG_UNBLOCKr�r�r"rrJr3)
rr4r
r<Zold_handlerr,Zold_maskr�rr�rrr�
check_sigwait�s&



zSignalEINTRTest.check_sigwait�sigwaitinfozneed signal.sigwaitinfo()cCsdd�}|�|�dS)NcSst�|g�dS)N)rr�)r
rrrr4�sz3SignalEINTRTest.test_sigwaitinfo.<locals>.wait_func)r�)rr4rrr�test_sigwaitinfo�sz SignalEINTRTest.test_sigwaitinfo�sigtimedwaitcCsdd�}|�|�dS)NcSst�|gd�dS)Ng^@)rr�)r
rrrr4�sz4SignalEINTRTest.test_sigtimedwait.<locals>.wait_func)r�)rr4rrr�test_sigtimedwait�sz!SignalEINTRTest.test_sigtimedwaitN)
r&r'r(r�rWrXrrr�r�rrrrr�~s
r�c@s�eZdZdd�Ze�ejdkd�e�e	e
d�d�dd���Ze�e	e
d	�d
�dd��Ze�e	e
d
�d�dd��Z
e�e	e
d�d�dd��ZdS)�SelectEINTRTestcCs@t��}t�ggg|j�t��|}|��|�||j�dS)N)r�r��selectr)rr�)rr�r�rrr�test_select�s
zSelectEINTRTest.test_selectr|z(poll may fail on macOS; see issue #28087�pollzneed select.pollcCsFt��}t��}|�|jd�t��|}|��|�||j�dS)Ng@�@)r�r�r�r�r)rr�)r�pollerr�r�rrr�	test_poll�szSelectEINTRTest.test_poll�epollzneed select.epollcCsNt��}|�|j�t��}|�|j�t��|}|��|�	||j�dS)N)
r�r�rFrGr�r�r�r)rr�)rr�r�r�rrr�
test_epoll�szSelectEINTRTest.test_epoll�kqueuezneed select.kqueuecCsRt��}|�|j�t��}|�dd|j�t��|}|��|�	||j�dS)Nr
)
r�r�rFrGr�r�Zcontrolr)rr�)rr�r�r�rrr�test_kqueue�szSelectEINTRTest.test_kqueue�devpollzneed select.devpollcCsRt��}|�|j�t��}|�|jd�t��|}|��|�	||j�dS)Ng@�@)
r�r�rFrGr�r�r�r)rr�)rr�r�r�rrr�test_devpoll�szSelectEINTRTest.test_devpollN)r&r'r(r�rWr�rr�rXrr�r�r�r�r�rrrrr��s
r�c@s8eZdZdd�Ze�e��dkd�dd��Zdd�Z	d	S)
�
FNTLEINTRTestc
Cs|�tjtj�d�ddtjd|d|jf�}t��}|�|�}t	|���t
tjd���}xht��|}|dkr~td|��y,||tj
tjB�||tj�t�d	�Wq^tk
r�PYq^Xq^W||tj
�t��|}|�||j�|��WdQRX|��WdQRXdS)
NrBzimport fcntl, timezwith open('%s', 'wb') as f:z   fcntl.%s(f, fcntl.LOCK_EX)z   time.sleep(%s)�wbgN@z failed to sync child in %.1f secg{�G�z�?)rFrrrrqrHr)r�r�r"rrx�	Exception�fcntlZLOCK_EXZLOCK_NBZLOCK_UNr��BlockingIOErrorr�rr3)rZ	lock_funcZ	lock_namer,Z
start_timer�fr�rrr�_lock�s2

zFNTLEINTRTest._lockZAIXzAIX returns PermissionErrorcCs|�tjd�dS)N�lockf)r�r�r�)rrrr�
test_lockfszFNTLEINTRTest.test_lockfcCs|�tjd�dS)N�flock)r�r�r�)rrrr�
test_flockszFNTLEINTRTest.test_flockN)
r&r'r(r�rWr�r��systemr�r�rrrrr��s r��__main__)�
contextlibrr�r6r�r�rr]r"rr�rWZtestr�contextmanagerrrXrZTestCaser	r+rYr�r�r�r�r&�mainrrrr�<module>s>
)h:
/8)