File: //usr/local/lib/python3.10/test/test_asyncio/__pycache__/test_streams.cpython-310.opt-1.pyc
o
�i� � @ s� d Z ddlZddlZddlZddlZddlZddlZddlZddlZddlm Z ddl
mZ zddlZW n e
y? dZY nw ddlZddlmZ dd� ZG dd � d ej�Zed
krae�� dS dS )zTests for streams.py.� N)�mock)�
socket_helper)�utilsc C s t �d � d S �N)�asyncioZset_event_loop_policy� r r �;/usr/local/lib/python3.10/test/test_asyncio/test_streams.py�tearDownModule s r c s� e Zd ZdZ� fdd�Z� fdd�Zdd� Zdd � Zej d
d� �Z
dd
� Ze�
edu d�dd� �Zej e�
edu d�dd� ��Zdd� Zdd� Zej dd� �Zdd� Zdd� Zdd� Zd d!� Zd"d#� Zd$d%� Zd&d'� Zd(d)� Zd*d+� Zd,d-� Zd.d/� Zd0d1� Zd2d3� Z d4d5� Z!d6d7� Z"d8d9� Z#d:d;� Z$d<d=� Z%d>d?� Z&d@dA� Z'dBdC� Z(dDdE� Z)dFdG� Z*dHdI� Z+dJdK� Z,dLdM� Z-dNdO� Z.dPdQ� Z/dRdS� Z0dTdU� Z1dVdW� Z2dXdY� Z3dZd[� Z4ej d\d]� �Z5e�
e6j7d^kd_�d`da� �Z8dbdc� Z9ddde� Z:dfdg� Z;dhdi� Z<djdk� Z=dldm� Z>dndo� Z?dpdq� Z@drds� ZAdtdu� ZBdvdw� ZCdxdy� ZDdzd{� ZEd|d}� ZFd~d� ZGd�d�� ZHd�d�� ZId�d�� ZJd�d�� ZKd�d�� ZLd�d�� ZMd�d�� ZN� ZOS )��StreamTestss line1
line2
line3
c s$ t � �� t�� | _| �| j� d S r )�super�setUpr Znew_event_loop�loop�set_event_loop��self�� __class__r r r s
zStreamTests.setUpc s, t �| j� | j�� t�� t� �� d S r )�
test_utils�run_brieflyr
�close�gcZcollectr �tearDownr r r r r # s
zStreamTests.tearDownc s� g � | j �� fdd�� | j �|�\}}|�d� |�� }| j �|�}| �|d� |�� }| j �|�}| �|�d�� |� � | �� g � d S )Nc �
� � |�S r ��append�r
�ctx��messagesr r �<lambda>- �
z7StreamTests._basetest_open_connection.<locals>.<lambda>� GET / HTTP/1.0
� HTTP/1.0 200 OK
�
Test message)
r
�set_exception_handler�run_until_complete�write�readline�assertEqual�read�
assertTrue�endswithr �r �open_connection_fut�reader�writer�f�datar r r �_basetest_open_connection+ s
z%StreamTests._basetest_open_connectionc C �D t �� �}tj|j� }| �|� W d � d S 1 sw Y d S r )r �run_test_serverr �open_connection�addressr2 �r �httpdZconn_futr r r �test_open_connection9 �
"�z StreamTests.test_open_connectionc C �D t �� �}t�|j�}| �|� W d � d S 1 sw Y d S r )r �run_test_unix_serverr �open_unix_connectionr6 r2 r7 r r r �test_open_unix_connection>