HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.10/test/test_asyncio/__pycache__/test_windows_events.cpython-310.pyc
o

�i�*�@sddlZddlZddlZddlZddlZddlZddlZddlmZejdkr,e�	d��ddl
Z
ddlZddlZddlm
Z
ddlmZdd�ZGd	d
�d
ej�ZGdd�dej�ZGd
d�dej�ZGdd�dej�ZGdd�dej�Zedkre��dSdS)�N)�mock�win32zWindows only)�windows_events)�utilscCst�d�dS�N)�asyncio�set_event_loop_policy�r	r	�B/usr/local/lib/python3.10/test/test_asyncio/test_windows_events.py�tearDownModulesrc@s$eZdZdd�Zdd�Zdd�ZdS)�
UpperProtocCs
g|_dSr)�buf��selfr	r	r
�__init__�
zUpperProto.__init__cCs
||_dSr)�trans)rrr	r	r
�connection_maderzUpperProto.connection_madecCs>|j�|�d|vr|j�d�|j����|j��dSdS)N�
�)r
�appendr�write�join�upper�close)r�datar	r	r
�
data_received s
�zUpperProto.data_receivedN)�__name__�
__module__�__qualname__rrrr	r	r	r
rsrc@�eZdZdd�ZdS)�ProactorLoopCtrlCc	Cszdd�}tj|d�}t��}z"z|�|j�|��|�d�Wn	ty)YnwW|�	|�n|�	|�w|�
�dS)NcSst�d�t�tj�dS)N皙�����?)�time�sleep�signal�raise_signal�SIGINTr	r	r	r
�SIGINT_after_delay+s
z9ProactorLoopCtrlC.test_ctrl_c.<locals>.SIGINT_after_delay��targetz%should not fall through 'run_forever')�	threading�ThreadrZget_event_loopZ	call_soon�start�run_foreverZfail�KeyboardInterrupt�
close_loopr)rr(�thread�loopr	r	r
�test_ctrl_c)s��zProactorLoopCtrlC.test_ctrl_cN)rrrr3r	r	r	r
r!'sr!c@r )�ProactorMultithreadingcsDd�dd����fdd�}tj|d�}|��|��|���dS)NFc�s�t�d�IdHdS)Nr)rr$r	r	r	r
�coroAs�zAProactorMultithreading.test_run_from_nonmain_thread.<locals>.corocs$t��}|����|��d�dS)NT)rZnew_event_loop�run_until_completer�r2�r5�finishedr	r
�funcDszAProactorMultithreading.test_run_from_nonmain_thread.<locals>.funcr))r+r,r-r�
assertTrue)rr:r1r	r8r
�test_run_from_nonmain_thread>sz3ProactorMultithreading.test_run_from_nonmain_threadN)rrrr<r	r	r	r
r4=sr4csteZdZ�fdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Z�ZS)�
ProactorTestscs$t���t��|_|�|j�dSr)�super�setUpr�ProactorEventLoopr2Zset_event_loopr��	__class__r	r
r?Ts

zProactorTests.setUpcCsht��\}}|j�|t���}tj|j�|d�|jd�}|��|j�	|�|�
|��d�|��dS)N�dr7r)�socket�
socketpairr2Z_make_socket_transportr�ProtocolZ
ensure_futureZ	sock_recvrr6�assertEqual�result)r�a�br�fr	r	r
�
test_closeYszProactorTests.test_closecCsVdt��}t�|�}|�t��
t�|�Wd�n1s wY|��dS)Nz\\.\pipe\test_double_bind-%s)�os�getpidr�
PipeServer�assertRaises�PermissionErrorr)r�ADDRESSZserver1r	r	r
�test_double_bindbs
�zProactorTests.test_double_bindcC� |j�|���}|�|d�dS�N�done)r2r6�
_test_piperG�r�resr	r	r
�	test_pipei�zProactorTests.test_pipec�s��dt��}|�t��|j�tj|�IdHWd�n1s"wY|j�t	|�IdH\}|�
|tj�g}t
d�D]5}tj|jd�}tj||jd��|j��fdd�|�IdH\}}|�
|tj�|��|�|�||f�q?t|�D]\}\}}	|	�d�|����qyt|�D]\}\}}	|��IdH}
|�|
d�|����|	��q�|��|�t��|j�tj|�IdHWd�dS1s�wYdS)	Nz\\.\pipe\_test_pipe-%s�r7cs�Srr	r	��protocolr	r
�<lambda>~�z*ProactorTests._test_pipe.<locals>.<lambda>z	lower-{}
z	LOWER-{}
rV)rMrNrP�FileNotFoundErrorr2Zcreate_pipe_connectionrrF�start_serving_piper�assertIsInstancerrO�range�StreamReaderZStreamReaderProtocolZ	TransportrGr�	enumerater�format�encode�readliner)rrR�serverZclients�iZ
stream_readerr�proto�r�w�responser	r]r
rWmsL������
�
��zProactorTests._test_pipec	Cs�t�}tj|_tjjtd|d��>}|jj�	d�}|j�
|�}|��|�t
j��|j�|�Wd�n1s9wYWd�dSWd�dS1sQwYdS)N�ConnectPipe)Zside_effectZpipe_address)�OSError�_overlapped�ERROR_PIPE_BUSY�winerrorrZpatch�objectr2�	_proactorZconnect_pipe�create_task�cancelrPr�CancelledErrorr6)r�exc�connectr5Ztaskr	r	r
�test_connect_pipe_cancel�s
���"�z&ProactorTests.test_connect_pipe_cancelcCst�dddd�}|�tj|�|jj�|d�}|j��}|j�	|�}|j��|}|�
|d�|�|���|�
d|koAdkn|�t�|�|jj�|d�}|j��}|j�	|�}|j��|}|�
|d�|�
|���|�
d|ko}dkn|�|��dS)	NTFg�?g�������?g�������?�
rg333333�?)rr�CreateEvent�
addCleanup�_winapi�CloseHandler2rv�wait_for_handler#r6rG�assertFalserHr;ZSetEventrx)r�event�futr-rV�elapsedr	r	r
�test_wait_for_handle�s$
 

 z"ProactorTests.test_wait_for_handlecCs�t�dddd�}|�tj|�|jj�|d�}|��|j�	�}|�
tj��|j�
|�Wd�n1s7wY|j�	�|}|�d|koNdkn|�|jj�|�}|��|��dS)NTFr}rr")rrr~rr�r�r2rvr�rxr#rPrryr6r;)rr�r�r-r�r	r	r
�test_wait_for_handle_cancel�s
� z)ProactorTests.test_wait_for_handle_cancelcCsrt��|j_|j�ddd��}|j��|j��|j��|j��|j�|�|�|j�|�	|jjj
�dS)NcSsdSrr	r	r	r	r
r_�r`z;ProactorTests.test_read_self_pipe_restart.<locals>.<lambda>)rZMockr2Zcall_exception_handlerZrun_in_executor�stopr.r6r0r�Zcalled)rrKr	r	r
�test_read_self_pipe_restart�s



	z)ProactorTests.test_read_self_pipe_restartcCs�|jj}tjtjd�}d}|�t��|�||�Wd�n1s#wY|�t��|j|d|d�Wd�n1s@wY|��dS)N)�typesabc)�addr)	r2rvrD�
SOCK_DGRAMrP�	TypeErrorr{�sendtor)rZproactor�sockZbad_addressr	r	r
� test_address_argument_type_error�s��z.ProactorTests.test_address_argument_type_errorcCrTrU)r2r6�_test_client_pipe_statrGrXr	r	r
�test_client_pipe_stat�r[z#ProactorTests.test_client_pipe_statc�s�dt����fdd�}|�t��|�IdHWd�n1s#wY|j�tj��IdH\}|�|t	j
�g�|j��fdd��td�D]}|j�
|��IdHqK|�t��d��|��|�t��|�IdHWd�dS1s|wYdS)	Nz!\\.\pipe\test_client_pipe_stat-%sc
�s��t���}z4z
t�t����Wnty*}z
|jtjkr �WYd}~n
d}~wwWt�|�dSWt�|�dSt�|�wr)rrrpr�r�rqrtrs)�h�e)rRr	r
�probes�
�����z3ProactorTests._test_client_pipe_stat.<locals>.probecs
��|�Sr)r)�_r)�errorsr	r
r_s
z6ProactorTests._test_client_pipe_stat.<locals>.<lambda>r\rrV)rMrNrPrar2rbrrFrcrrOZset_exception_handlerrdrwrG�lenr)rr�rjrkr	)rRr�r
r�s(��
��z$ProactorTests._test_client_pipe_stat)rrrr?rLrSrZrWr|r�r�r�r�r�r��
__classcell__r	r	rAr
r=Rs	&
"r=c@seZdZdd�Zdd�ZdS)�WinPolicyTestsc	�L�fdd�}t��}zt�t���t�|��Wt�|�dSt�|�w)Nc�����t��tj�dSr)rcr�get_running_loopZSelectorEventLoopr	rr	r
�main)�
��z5WinPolicyTests.test_selector_win_policy.<locals>.main)r�get_event_loop_policyrZWindowsSelectorEventLoopPolicy�run�rr�Z
old_policyr	rr
�test_selector_win_policy(��z'WinPolicyTests.test_selector_win_policyc	r�)Nc�r�r)rcrr�r@r	rr	r
r�7r�z5WinPolicyTests.test_proactor_win_policy.<locals>.main)rr�rZWindowsProactorEventLoopPolicyr�r�r	rr
�test_proactor_win_policy6r�z'WinPolicyTests.test_proactor_win_policyN)rrrr�r�r	r	r	r
r�&sr��__main__)rMr%rD�sysr#r+Zunittestr�platformZSkipTestrrr�rrZtest.test_asynciorZ
test_utilsrrFrZTestCaser!r4r=r�rr�r	r	r	r
�<module>s2

U�