File: //usr/local/lib/python3.10/test/test_asyncio/__pycache__/functional.cpython-310.opt-1.pyc
o
�i5 � @ s� d dl Z d dlZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl m
Z
G dd� d�ZG dd� d�ZG dd� dej
�ZG d d
� d
e�ZG dd� de�ZdS )
� N)�supportc @ s� e Zd Zdd� Zdd�dd�Zdd� Zd d
� Zdd� Zej d
e
jddd�dd�Zej e
jfdd�Z
dd� Zdd� Zejdd� �Zdd� Zd
S )�FunctionalTestCaseMixinc C s t �� S �N)�asyncioZnew_event_loop��self� r �9/usr/local/lib/python3.10/test/test_asyncio/functional.py�new_loop s z FunctionalTestCaseMixin.new_loopg{�G�z�?)�delayc C s | j �t�|�� d S r )�loopZrun_until_completer �sleep)r r r r r �run_loop_briefly s z(FunctionalTestCaseMixin.run_loop_brieflyc C s | j �|� | j�|� d S r )�._FunctionalTestCaseMixin__unhandled_exceptions�appendr Zdefault_exception_handler)r r �contextr r r �loop_exception_handler s z.FunctionalTestCaseMixin.loop_exception_handlerc C s, | � � | _t�d � | j�| j� g | _d S r )r
r r �set_event_loopZset_exception_handlerr r r r r r �setUp s
zFunctionalTestCaseMixin.setUpc C sn z-| j �� | jr#td� t�| j� | �d� W t�d � d | _ d S W t�d � d | _ d S t�d � d | _ w )Nz2Unexpected calls to loop.call_exception_handler():z1unexpected calls to loop.call_exception_handler())r �closer �print�pprint�failr r r r r r �tearDown s
�
�z FunctionalTestCaseMixin.tearDownN� �
)�family�addr�timeout�backlog�max_clientsc C s� |d u r(t td�r&|tjkr&t�� �}|j}W d � n1 s w Y nd}tj|||d�}|d u r8td��|dkr@td��|�|� t | ||||�S )N�AF_UNIX)z 127.0.0.1r )r r �timeout is requiredr �#only blocking sockets are supported)
�hasattr�socketr! �tempfile�NamedTemporaryFile�name�
create_server�RuntimeError�
settimeout�TestThreadedServer) r Zserver_progr r r r r �tmp�sockr r r �
tcp_server- s
��
�z"FunctionalTestCaseMixin.tcp_serverc C sF t � |t j�}|d u rtd��|dkrtd��|�|� t| |||�S )Nr"