HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/test_asyncio/__pycache__/test_buffered_proto.cpython-37.pyc
B

��g��@s�ddlZddlZddlmZGdd�dej�ZGdd�dej�ZGdd�deej	�Z
e�eed	�d
�Gdd�deej	��Z
ed
kr�e��dS)�N)�
functionalc@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�ReceiveStuffProtocCs||_||_dS)N)�cb�con_lost_fut)�selfrr�r�A/usr/local/lib/python3.7/test/test_asyncio/test_buffered_proto.py�__init__szReceiveStuffProto.__init__cCstd�|_|jS)N�d)�	bytearray�buffer)r�sizehintrrr�
get_buffers
zReceiveStuffProto.get_buffercCs|�|jd|��dS)N)rr)r�nbytesrrr�buffer_updatedsz ReceiveStuffProto.buffer_updatedcCs&|dkr|j�d�n|j�|�dS)N)rZ
set_resultZ
set_exception)r�excrrr�connection_lostsz!ReceiveStuffProto.connection_lostN)�__name__�
__module__�__qualname__r	rrrrrrrrsrc@seZdZdd�Zdd�ZdS)�BaseTestBufferedProtocolcCst�dS)N)�NotImplementedError)rrrr�new_loopsz!BaseTestBufferedProtocol.new_loopcs�dd���fdd�}�fdd�}�j�t�|dd��}|jd��}�j�tj||�d	�jd
��|���j�|���dS)Ns	12345678+ic�sRd����fdd���j����jj��fdd�f|��IdH\�}�IdHdS)N�cs�|7���kr��d�dS)N�1)�write)�buf)�NOISE�data�trrr�on_buf&sz^BaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.client.<locals>.on_bufcs
t���S)N)rr)�
conn_lost_futr rr�<lambda>/rz`BaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.client.<locals>.<lambda>)�loopZ
create_futureZcreate_connection)�addrZpr)rr)r!rr rr�client#s
 zNBaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.clientc�s4|���|�d�IdH|��|��IdHdS)N�)rZreadexactly�close�wait_closed)�reader�writer)rrr�on_server_client3s
zXBaseTestBufferedProtocol.test_buffered_proto_create_connection.<locals>.on_server_clientz	127.0.0.1r�)r#)	r#Zrun_until_complete�asyncioZstart_serverZsocketsZgetsocknameZwait_forr'r()rr%r+Zsrvr$r)rrr�%test_buffered_proto_create_connectionsz>BaseTestBufferedProtocol.test_buffered_proto_create_connectionN)rrrrr.rrrrrsrc@seZdZdd�ZdS)�BufferedProtocolSelectorTestscCst��S)N)r-ZSelectorEventLoop)rrrrrHsz&BufferedProtocolSelectorTests.new_loopN)rrrrrrrrr/Esr/�ProactorEventLoopzWindows onlyc@seZdZdd�ZdS)�BufferedProtocolProactorTestscCst��S)N)r-r0)rrrrrPsz&BufferedProtocolProactorTests.new_loopN)rrrrrrrrr1Lsr1�__main__)r-ZunittestZtest.test_asynciorZ
func_testsZBufferedProtocolrZFunctionalTestCaseMixinrZTestCaser/Z
skipUnless�hasattrr1r�mainrrrr�<module>s+