HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/test_asyncio/__pycache__/test_windows_events.cpython-37.pyc
B

��gT�@s�ddlZddlZddlZddlZddlmZejdkr@e�d��ddlZddlZddl	Z	ddl	m
Z
ddlmZ
Gdd�de	j�ZGd	d
�d
e
j�ZGdd�de
j�Zed
kr�e��dS)�N)�mock�win32zWindows only)�windows_events)�utilsc@s$eZdZdd�Zdd�Zdd�ZdS)�
UpperProtocCs
g|_dS)N)�buf)�self�r	�A/usr/local/lib/python3.7/test/test_asyncio/test_windows_events.py�__init__szUpperProto.__init__cCs
||_dS)N)�trans)rrr	r	r
�connection_madeszUpperProto.connection_madecCs:|j�|�d|kr6|j�d�|j����|j��dS)N�
�)r�appendr�write�join�upper�close)r�datar	r	r
�
data_receivedszUpperProto.data_receivedN)�__name__�
__module__�__qualname__rr
rr	r	r	r
rsrcsTeZdZ�fdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
�ZS)�
ProactorTestscs$t���t��|_|�|j�dS)N)�super�setUp�asyncio�ProactorEventLoop�loopZset_event_loop)r)�	__class__r	r
r"s

zProactorTests.setUpcCsht��\}}|j�|t���}tj|j�|d�|jd�}|��|j�	|�|�
|��d�|��dS)N�d)rr)�socketZ
socketpairrZ_make_socket_transportr�ProtocolZ
ensure_futureZ	sock_recvr�run_until_complete�assertEqual�result)r�a�br�fr	r	r
�
test_close'szProactorTests.test_closec	CsBdt��}t�|�}|�t��t�|�WdQRX|��dS)Nz\\.\pipe\test_double_bind-%s)�os�getpidr�
PipeServer�assertRaises�PermissionErrorr)r�ADDRESSZserver1r	r	r
�test_double_bind0s

zProactorTests.test_double_bindcCs |j�|���}|�|d�dS)N�done)rr$�
_test_piper%)rZresr	r	r
�	test_pipe7szProactorTests.test_pipec	�s�dt��}|�t��|j�tj|�IdHWdQRX|j�t	|�IdH\}|�
|tj�g}xvt
d�D]j}tj|jd�}tj||jd��|j��fdd�|�IdH\}}|�
|tj�|��|�|�||f�qjWx,t|�D] \}\}}	|	�d�|����q�WxFt|�D]:\}\}}	|��IdH}
|�|
d�|����|	���qW|��|�t��|j�tj|�IdHWdQRXdS)	Nz\\.\pipe\_test_pipe-%s�)rcs�S)Nr	r	)�protocolr	r
�<lambda>Lrz*ProactorTests._test_pipe.<locals>.<lambda>z	lower-{}
z	LOWER-{}
r2)r+r,r.�FileNotFoundErrorrZcreate_pipe_connectionrr#Zstart_serving_piper�assertIsInstancerr-�range�StreamReaderZStreamReaderProtocolZ	Transportr%r�	enumerater�format�encode�readliner)rr0ZserverZclients�iZ
stream_readerr�proto�r�wZresponser	)r6r
r3;s8
zProactorTests._test_pipec
Csvt�}tj|_tjjtd|d��L}|jj�	d�}|j�
|�}|��|�t
j��|j�|�WdQRXWdQRXdS)NZConnectPipe)Zside_effectZpipe_address)�OSError�_overlappedZERROR_PIPE_BUSYZwinerrorrZpatch�objectr�	_proactorZconnect_pipeZcreate_task�cancelr.r�CancelledErrorr$)r�excZconnect�coroZtaskr	r	r
�test_connect_pipe_cancelas

z&ProactorTests.test_connect_pipe_cancelcCst�dddd�}|�tj|�|jj�|d�}|j��}|j�	|�}|j��|}|�
|d�|�|���|�
d|ko�dkn|�t�|�|jj�|d�}|j��}|j�	|�}|j��|}|�
|d�|�
|���|�
d|ko�dkn|�|��dS)	NTFg�?g�������?g�������?�
rg333333�?)rE�CreateEvent�
addCleanup�_winapi�CloseHandlerrG�wait_for_handle�timer$r%ZassertFalser&�
assertTrueZSetEventrH)r�event�fut�startr2�elapsedr	r	r
�test_wait_for_handlens$
 

 z"ProactorTests.test_wait_for_handlec	Cs�t�dddd�}|�tj|�|jj�|d�}|��|j�	�}|�
tj��|j�
|�WdQRX|j�	�|}|�d|ko�dkn|�|jj�|�}|��|��dS)NTFrMrg�������?)rErNrOrPrQrrGrRrHrSr.rrIr$rT)rrUrVrWrXr	r	r
�test_wait_for_handle_cancel�s
 z)ProactorTests.test_wait_for_handle_cancel)rrrrr*r1r4r3rLrYrZ�
__classcell__r	r	)r r
r s	&
"rc@seZdZdd�Zdd�ZdS)�WinPolicyTestsc	sD�fdd�}t��}zt�t���t�|��Wdt�|�XdS)Nc�s��t��tj�dS)N)r9r�get_running_loopZSelectorEventLoopr	)rr	r
�main�sz5WinPolicyTests.test_selector_win_policy.<locals>.main)r�get_event_loop_policy�set_event_loop_policyZWindowsSelectorEventLoopPolicy�run)rr^�
old_policyr	)rr
�test_selector_win_policy�s
z'WinPolicyTests.test_selector_win_policyc	sD�fdd�}t��}zt�t���t�|��Wdt�|�XdS)Nc�s��t��tj�dS)N)r9rr]rr	)rr	r
r^�sz5WinPolicyTests.test_proactor_win_policy.<locals>.main)rr_r`ZWindowsProactorEventLoopPolicyra)rr^rbr	)rr
�test_proactor_win_policy�s
z'WinPolicyTests.test_proactor_win_policyN)rrrrcrdr	r	r	r
r\�sr\�__main__)r+r"�sysZunittestr�platformZSkipTestrErPrrZtest.test_asynciorZ
test_utilsr#rZTestCaserr\rr^r	r	r	r
�<module>s"