HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/test_asyncio/__pycache__/functional.cpython-37.opt-1.pyc
B

��gt�@s�ddlZddlZddlZddlZddlZddlZddlZddlZddlZGdd�d�Z	Gdd�d�Z
Gdd�dej�ZGdd	�d	e�Z
Gd
d�de�ZdS)�Nc@s�eZdZdd�Zdd�dd�Zdd�Zd	d
�Zdd�Zej	d
dddd�dd�Z
ej	dfdd�Zdd�Zdd�Z
ejdd��Zdd�Zd
S)�FunctionalTestCaseMixincCst��S)N)�asyncioZnew_event_loop)�self�r�8/usr/local/lib/python3.7/test/test_asyncio/functional.py�new_loopsz FunctionalTestCaseMixin.new_loopg{�G�z�?)�delaycCs|j�tj||jd��dS)N)�loop)r	Zrun_until_completerZsleep)rrrrr�run_loop_brieflysz(FunctionalTestCaseMixin.run_loop_brieflycCs|j�|�|j�|�dS)N)�._FunctionalTestCaseMixin__unhandled_exceptions�appendr	Zdefault_exception_handler)rr	�contextrrr�loop_exception_handlersz.FunctionalTestCaseMixin.loop_exception_handlercCsB|��|_t�d�|j�|j�g|_tjj|_	dd�tj_dS)NcSsdS)Nrrrrr�<lambda>!�z/FunctionalTestCaseMixin.setUp.<locals>.<lambda>)
rr	r�set_event_loopZset_exception_handlerrr�events�_get_running_loop�_old_get_running_loop)rrrr�setUps


zFunctionalTestCaseMixin.setUpc	CsTz2|j��|jr0td�t�|j�|�d�Wd|jtj_	t�
d�d|_XdS)Nz2Unexpected calls to loop.call_exception_handler():z1unexpected calls to loop.call_exception_handler())r	�closer�print�pprint�failrrrrr)rrrr�tearDown#s


z FunctionalTestCaseMixin.tearDownN���
)�family�addr�timeout�backlog�max_clientsc

Cs�|dkr<ttd�r8|tjkr8t���}|j}WdQRXnd}t�|tj�}|dkrZtd��|dkrjtd��|�|�y|�	|�|�
|�Wn.tk
r�}	z|��|	�Wdd}	~	XYnXt
|||||�S)N�AF_UNIX)z	127.0.0.1rztimeout is requiredrz#only blocking sockets are supported)�hasattr�socketr#�tempfileZNamedTemporaryFile�name�SOCK_STREAM�RuntimeError�
settimeoutZbindZlisten�OSErrorr�TestThreadedServer)
rZserver_progrrr r!r"Ztmp�sock�exrrr�
tcp_server1s&


z"FunctionalTestCaseMixin.tcp_servercCsFt�|tj�}|dkrtd��|dkr.td��|�|�t||||�S)Nztimeout is requiredrz#only blocking sockets are supported)r%r(r)r*�TestThreadedClient)rZclient_progrr r-rrr�
tcp_clientQs
z"FunctionalTestCaseMixin.tcp_clientcOs$ttd�st�|j|dtji|��S)Nr#r)r$r%�NotImplementedErrorr/r#)r�args�kwargsrrr�unix_server`s
z#FunctionalTestCaseMixin.unix_servercOs$ttd�st�|j|dtji|��S)Nr#r)r$r%r2r1r#)rr3r4rrr�unix_clientes
z#FunctionalTestCaseMixin.unix_clientccsXt���F}tj�|d�}z
|VWdyt�|�Wntk
rFYnXXWdQRXdS)Nr-)r&ZTemporaryDirectory�os�path�join�unlinkr+)rZtd�fnrrr�unix_sock_namejs

z&FunctionalTestCaseMixin.unix_sock_namec	Cs z|j��Wd|�|�XdS)N)r	�stopr)rr.rrr�_abort_socket_testvsz*FunctionalTestCaseMixin._abort_socket_test)�__name__�
__module__�__qualname__rr
rrrr%ZAF_INETr/r1r5r6�
contextlib�contextmanagerr<r>rrrrrs 
rc@s<eZdZdd�Zdd�Zddd�dd	�Zd
d�Zdd
�ZdS)�TestSocketWrappercCs
||_dS)N)�_TestSocketWrapper__sock)rr-rrr�__init__�szTestSocketWrapper.__init__cCs@d}x6t|�|kr:|�|t|��}|dkr0t�||7}qW|S)Nr)�lenZrecv�ConnectionAbortedError)r�n�buf�datarrr�recv_all�szTestSocketWrapper.recv_allFN)�server_side�server_hostnamecCsT|j|j||dd�}z(y|��Wn|���YnXWd|j��X||_dS)NF)rMrNZdo_handshake_on_connect)Zwrap_socketrEZdo_handshaker)rZssl_contextrMrNZssl_sockrrr�	start_tls�szTestSocketWrapper.start_tlscCst|j|�S)N)�getattrrE)rr'rrr�__getattr__�szTestSocketWrapper.__getattr__cCsd�t|�j|j�S)Nz	<{} {!r}>)�format�typer?rE)rrrr�__repr__�szTestSocketWrapper.__repr__)r?r@rArFrLrOrQrTrrrrrD�s
rDc@s$eZdZdd�Zdd�Zdd�ZdS)�SocketThreadcCsd|_|��dS)NF)�_activer9)rrrrr=�szSocketThread.stopcCs|��|S)N)�start)rrrr�	__enter__�szSocketThread.__enter__cGs|��dS)N)r=)r�excrrr�__exit__�szSocketThread.__exit__N)r?r@rAr=rXrZrrrrrU�srUc@seZdZdd�Zdd�ZdS)r0cCs:tj�|ddd�d|_||_||_d|_||_||_dS)Nztest-clientT)	�	threading�ThreadrF�daemon�_timeout�_sockrV�_prog�_test)r�testr-�progr rrrrF�szTestThreadedClient.__init__c
CsHy|�t|j��Wn.tk
rB}z|j�|�Wdd}~XYnXdS)N)r`rDr_�	Exceptionrar>)rr.rrr�run�szTestThreadedClient.runN)r?r@rArFrerrrrr0�s
r0csHeZdZdd�Z�fdd�Zdd�Zdd�Zd	d
�Zedd��Z	�Z
S)
r,cCshtj�|ddd�d|_d|_d|_||_||_||_d|_	||_
t��\|_
|_|j
�d�||_dS)Nztest-serverTrF)r[r\rFr]�_clientsZ_finished_clients�_max_clientsr^r_rVr`r%Z
socketpair�_s1�_s2�setblockingra)rrbr-rcr r"rrrrF�szTestThreadedServer.__init__csPz>|jr<|j��dkr<y|j�d�Wntk
r:YnXWdt���XdS)N���sstop)ri�fileno�sendr+�superr=)r)�	__class__rrr=�s
zTestThreadedServer.stopc	CsFz*|j�|j�d�|��WdQRXWd|j��|j��XdS)Nr)r_rj�_runrhrri)rrrrre�s
zTestThreadedServer.runcCs �x|j�r|j|jkrdSt�|j|jggg|j�\}}}|j|krJdS|j|kry|j��\}}Wn6tk
r|wYqt	j
k
r�|js�dS�YqX|jd7_|�|j�y|�|�|�WdQRXWqt
k
�r}z d|_z�Wd|j�|�XWdd}~XYqXqWdS)NrF)rVrfrg�selectr_rhr^Zaccept�BlockingIOErrorr%r r*�_handle_clientrdrar>)r�r�w�xZconnrr.rrrrp�s4

zTestThreadedServer._runcCs|�t|��dS)N)r`rD)rr-rrrrssz!TestThreadedServer._handle_clientcCs
|j��S)N)r_Zgetsockname)rrrrrszTestThreadedServer.addr)r?r@rArFr=rerprs�propertyr�
__classcell__rr)rorr,�s
	"r,)rZasyncio.eventsrBr7rrqr%r&r[rrDr\rUr0r,rrrr�<module>sv(