File: //usr/local/lib/python3.7/test/test_asyncio/__pycache__/functional.cpython-37.opt-1.pyc
B
��gt � @ s� d dl Z d dlZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZG dd� d�Z G dd� d�Z
G dd� dej�ZG dd � d e�Z
G d
d� de�ZdS )� Nc @ s� e Zd Zdd� Zdd�dd�Zdd� Zd d
� Zdd� Zej d
dddd�dd�Z
ej dfdd�Zdd� Zdd� Z
ejdd� �Zdd� Zd
S )�FunctionalTestCaseMixinc C s t �� S )N)�asyncioZnew_event_loop)�self� r �8/usr/local/lib/python3.7/test/test_asyncio/functional.py�new_loop s z FunctionalTestCaseMixin.new_loopg{�G�z�?)�delayc C s | j �tj|| j d�� d S )N)�loop)r Zrun_until_completer Zsleep)r r r r r �run_loop_briefly s z(FunctionalTestCaseMixin.run_loop_brieflyc C s | j �|� | j�|� d S )N)�._FunctionalTestCaseMixin__unhandled_exceptions�appendr Zdefault_exception_handler)r r �contextr r r �loop_exception_handler s z.FunctionalTestCaseMixin.loop_exception_handlerc C sB | � � | _t�d � | j�| j� g | _tjj| _ dd� tj_d S )Nc S s d S )Nr r r r r �<lambda>! � z/FunctionalTestCaseMixin.setUp.<locals>.<lambda>)
r r r �set_event_loopZset_exception_handlerr r �events�_get_running_loop�_old_get_running_loop)r r r r �setUp s
zFunctionalTestCaseMixin.setUpc C sT z2| j �� | jr0td� t�| j� | �d� W d | jtj_ t�
d � d | _ X d S )Nz2Unexpected calls to loop.call_exception_handler():z1unexpected calls to loop.call_exception_handler())r �closer �print�pprint�failr r r r r )r r r r �tearDown# s
z FunctionalTestCaseMixin.tearDownN� � �
)�family�addr�timeout�backlog�max_clientsc
C s� |d kr<t td�r8|tjkr8t�� �}|j}W d Q R X nd}t�|tj�}|d krZtd��|dkrjtd��|�|� y|� |� |�
|� W n. tk
r� } z|�� | �W d d } ~ X Y nX t
| ||||�S )N�AF_UNIX)z 127.0.0.1r ztimeout is requiredr z#only blocking sockets are supported)�hasattr�socketr# �tempfileZNamedTemporaryFile�name�SOCK_STREAM�RuntimeError�
settimeoutZbindZlisten�OSErrorr �TestThreadedServer)
r Zserver_progr r r r! r"