HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/test_asyncio/__pycache__/test_pep492.cpython-37.opt-2.pyc
B

��g��@s�ddlZddlZddlZddlmZddlmZddlZddlmZ	Gdd�d�Z
Gdd�de	j�ZGd	d
�d
e�Z
Gdd�de�ZGd
d�de�Zedkr�e��dS)�N)�support)�mock)�utilsc@s.eZdZdd�Zd
dd�Zdd�Zdd	�ZdS)�FakeCorocCsdS)N�)�self�valuerr�9/usr/local/lib/python3.7/test/test_asyncio/test_pep492.py�sendsz
FakeCoro.sendNcCsdS)Nr)r�typ�val�tbrrr	�throwszFakeCoro.throwcCsdS)Nr)rrrr	�closeszFakeCoro.closeccs
dVdS)Nr)rrrr	�	__await__szFakeCoro.__await__)NN)�__name__�
__module__�__qualname__r
rrrrrrr	rs
rcseZdZ�fdd�Z�ZS)�BaseTestcsHt���t��|_t��|j_t��|j_d|jjj	_
|�|j�dS)Nr)�super�setUp�asyncioZ
BaseEventLoop�looprZMockZ_process_eventsZ	_selectorZselectZreturn_valueZset_event_loop)r)�	__class__rr	rs

zBaseTest.setUp)rrrr�
__classcell__rr)rr	rsrc@seZdZdd�Zdd�ZdS)�	LockTestscsptj�jd�tj�jd�tj�jd�tj�jd�g}�fdd�}x*|D]"}�j�||����|���qFWdS)N)rc
�s�tjd�jd�IdH��|���|4IdH�D}��|d���|���tjd�jd�IdH��|���WdQIdHRX��|���dS)Ng{�G�z�?)r)r�sleepr�assertFalse�locked�assertIs�
assertTrue)�lock�_lock)rrr	�test2sz7LockTests.test_context_manager_async_with.<locals>.test)	r�Lockr�	Condition�	Semaphore�BoundedSemaphore�run_until_completerr)r�
primitivesr#�	primitiver)rr	�test_context_manager_async_with*s

z)LockTests.test_context_manager_async_withcsptj�jd�tj�jd�tj�jd�tj�jd�g}�fdd�}x*|D]"}�j�||����|���qFWdS)N)rc
�s�tjd�jd�IdH��|�����t��h|IdH�D}��|d���|���tjd�jd�IdH��|���WdQRX��|���WdQRXdS)Ng{�G�z�?)r)	rrrrrZassertWarns�DeprecationWarningrr )r!r")rrr	r#Hsz7LockTests.test_context_manager_with_await.<locals>.test)	rr$rr%r&r'r(rr)rr)r#r*r)rr	�test_context_manager_with_await@s
z)LockTests.test_context_manager_with_awaitN)rrrr+r-rrrr	r(src@seZdZdd�ZdS)�StreamReaderTestscsTd}tj|jd����|�����fdd�}|j�|��}|�|dddg�dS)Nsline1
line2
line3)rc	�s>g}x4�2y3dH}Wn
tk
s.X|�|�q
YW|S)N)�StopAsyncIteration�append)�data�line)�streamrr	�readerasz/StreamReaderTests.test_readline.<locals>.readersline1
sline2
sline3)r�StreamReaderrZ	feed_dataZfeed_eofr(�assertEqual)rZDATAr4r1r)r3r	�
test_readlineZs
zStreamReaderTests.test_readlineN)rrrr7rrrr	r.Xsr.c@sLeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dS)�CoroutineTestscCsDdd�}|�}z|�t�|��Wd|��X|�t�t���dS)Nc�sdS)Nrrrrr	�foon�z,CoroutineTests.test_iscoroutine.<locals>.foo)r rZiscoroutinerr)rr9�frrr	�test_iscoroutinems
zCoroutineTests.test_iscoroutinecCsdd�}|�t�|��dS)Nc�sdS)Nrrrrr	r9yr:z4CoroutineTests.test_iscoroutinefunction.<locals>.foo)r rZiscoroutinefunction)rr9rrr	�test_iscoroutinefunctionxsz'CoroutineTests.test_iscoroutinefunctioncsDGdd�d��tj�fdd��}|�}|�|�d�d�|��dS)Nc@seZdZdd�ZdS)zCCoroutineTests.test_function_returning_awaitable.<locals>.AwaitablecSsdS)N)�spamr)rrrr	r~szMCoroutineTests.test_function_returning_awaitable.<locals>.Awaitable.__await__N)rrrrrrrr	�	Awaitable}sr?cs��S)Nrr)r?rr	�func�sz>CoroutineTests.test_function_returning_awaitable.<locals>.funcr>)r�	coroutiner6r
r)rr@�coror)r?r	�!test_function_returning_awaitable|s
z0CoroutineTests.test_function_returning_awaitablecsXdd���fdd�}|j�|��}|�|d�|j�d�|j�|��}|�|d�dS)Nc�sdS)Nr>rrrrr	�bar�sz5CoroutineTests.test_async_def_coroutines.<locals>.barc�s��IdHS)Nrr)rDrr	r9�sz5CoroutineTests.test_async_def_coroutines.<locals>.foor>T)rr(r6�	set_debug)rr9r1r)rDr	�test_async_def_coroutines�sz(CoroutineTests.test_async_def_coroutinescsJ�fdd�}��t��d��j�d��j�|����t��d�dS)Nc�s��t��dk�dS)Nr)r �sys�#get_coroutine_origin_tracking_depthr)rrr	�start�szOCoroutineTests.test_debug_mode_manages_coroutine_origin_tracking.<locals>.startrT)r6rGrHrrEr()rrIr)rr	�1test_debug_mode_manages_coroutine_origin_tracking�s
z@CoroutineTests.test_debug_mode_manages_coroutine_origin_trackingcsFdd��tj�fdd�����fdd�}�j�|��}��|d�dS)NcssdEdHdS)Nrr>rrrrr	�gen�s
z0CoroutineTests.test_types_coroutine.<locals>.gencs��S)Nrr)rKrr	r@�sz1CoroutineTests.test_types_coroutine.<locals>.funcc�s��}��|tj�|IdHS)N)ZassertIsInstance�types�_GeneratorWrapper)�wrapper)r@rrr	rB�sz1CoroutineTests.test_types_coroutine.<locals>.coror>)rLrArr(r6)rrBr1r)r@rKrr	�test_types_coroutine�s
z#CoroutineTests.test_types_coroutinecs4d���fdd�����fdd�}�j�|��dS)Nc�s0�jdd�}z��|djjd�Wdd}XdS)N�)�limitrr9)Z	get_stackr6�f_code�co_name)r;)�Trrr	r9�sz1CoroutineTests.test_task_print_stack.<locals>.fooc�s tj���jd���IdHdS)N)r)rZ
ensure_futurerr)rTr9rrr	�runner�sz4CoroutineTests.test_task_print_stack.<locals>.runner)rr()rrUr)rTr9rr	�test_task_print_stack�sz$CoroutineTests.test_task_print_stackc	sR�fdd����fdd�}�j�d��jtdd���j�|��WdQRXdS)Nc�stjd�jd�IdHdS)Ng�������?)r)rrrr)rrr	�afunc�sz/CoroutineTests.test_double_await.<locals>.afuncc�sJ��}tj|�jd�}z$tjd�jd�IdH|IdHWd|��XdS)N)rr)rZTaskrrZcancel)rB�t)rWrrr	rU�sz0CoroutineTests.test_double_await.<locals>.runnerTz"coroutine is being awaited already)�msg)rrEZassertRaises�RuntimeErrorr()rrUr)rWrr	�test_double_await�s	
z CoroutineTests.test_double_awaitN)rrrr<r=rCrFrJrOrVr[rrrr	r8ks
	r8�__main__)rGrLZunittestr#rrrZtest.test_asynciorZ
test_utilsrZTestCaserrr.r8r�mainrrrr	�<module>s0m