HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.10/test/test_asyncio/__pycache__/test_buffered_proto.cpython-310.pyc
o

�i!	�@s�ddlZddlZddlmZdd�ZGdd�dej�ZGdd�dej�Z	Gd	d
�d
e	ej
�Ze�e
ed�d�Gd
d�de	ej
��ZedkrKe��dSdS)�N)�
functionalcCst�d�dS�N)�asyncioZset_event_loop_policy�rr�B/usr/local/lib/python3.10/test/test_asyncio/test_buffered_proto.py�tearDownModulesrc@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�ReceiveStuffProtocCs||_||_dSr)�cb�con_lost_fut)�selfr	r
rrr�__init__s
zReceiveStuffProto.__init__cCstd�|_|jS)N�d)�	bytearray�buffer)r�sizehintrrr�
get_buffers
zReceiveStuffProto.get_buffercCs|�|jd|��dSr)r	r)r�nbytesrrr�buffer_updatedsz ReceiveStuffProto.buffer_updatedcCs(|dur|j�d�dS|j�|�dSr)r
�
set_result�
set_exception)r�excrrr�connection_lostsz!ReceiveStuffProto.connection_lostN)�__name__�
__module__�__qualname__rrrrrrrrrs
rc@seZdZdd�Zdd�ZdS)�BaseTestBufferedProtocolcCst�r)�NotImplementedError�rrrr�new_loop sz!BaseTestBufferedProtocol.new_loopcszdd���fdd�}�fdd�}�j�t�|dd��}|jd��}�j�t�||�d	��|���j�|���dS)
Ns	12345678+ic�sV�d����fdd���j����jj��fdd�g|�R�IdH\�}�IdHdS)N�cs"�|7���kr��d�dSdS)N�1)�write)�buf)�NOISE�data�trrr�on_buf*s�z^BaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.client.<locals>.on_bufcs
t���Sr)rr)�
conn_lost_futr&rr�<lambda>3s
z`BaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.client.<locals>.<lambda>)�loopZ
create_future�create_connection)�addrZpr�r#r)r'r$r&r%r�client's�
��zNBaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.clientc�s6�|���|�d�IdH|��|��IdHdS)N�)r!Zreadexactly�close�wait_closed)�reader�writer)r#rr�on_server_client7s
�
zXBaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.on_server_clientz	127.0.0.1r�)	r)Zrun_until_completerZstart_serverZsockets�getsockname�wait_forr/r0)rr-r3Zsrvr+rr,r�%test_buffered_proto_create_connection#s���z>BaseTestBufferedProtocol.test_buffered_proto_create_connectionN)rrrrr7rrrrrsrc@�eZdZdd�ZdS)�BufferedProtocolSelectorTestscC�t��Sr)rZSelectorEventLooprrrrrL�z&BufferedProtocolSelectorTests.new_loopN�rrrrrrrrr9Isr9�ProactorEventLoopzWindows onlyc@r8)�BufferedProtocolProactorTestscCr:r)rr=rrrrrTr;z&BufferedProtocolProactorTests.new_loopNr<rrrrr>Psr>�__main__)rZunittestZtest.test_asynciorZ
func_testsrZBufferedProtocolrZFunctionalTestCaseMixinrZTestCaser9Z
skipUnless�hasattrr>r�mainrrrr�<module>s +���