File: //usr/local/lib/python3.7/test/__pycache__/test_file_eintr.cpython-37.opt-1.pyc
B

��gf*�@sddlZddlZddlZddlZddlZddlZddlZddlZddlZe�	ej
dkd�Gdd�d��ZGdd�deej�Z
Gdd	�d	eej�ZGd
d�de�ZGdd
�d
eej�ZGdd�deej�ZGdd�de�ZGdd�deej�ZGdd�deej�Zedk�re��dS)�N�posixztests requires a posix system.c@sReZdZdd�Zdd�Zdd�Zdd	d
�Zdd�Zd
Zdd�Z	dd�Z
dd�ZdS)�TestFileIOSignalInterruptcCs
d|_dS)N)�_process)�self�r�0/usr/local/lib/python3.7/test/test_file_eintr.py�setUpszTestFileIOSignalInterrupt.setUpcCs<|jr8|j��dkr8y|j��Wntk
r6YnXdS)N)r�poll�kill�OSError)rrrr�tearDowns
z"TestFileIOSignalInterrupt.tearDowncCs
d|jS)z�Returns the infile = ... line of code for the reader process.

        Subclasses should override this to test different IO objects.
        z=import %s as io ;infile = io.FileIO(sys.stdin.fileno(), "rb"))�modname)rrrr�_generate_infile_setup_code$sz5TestFileIOSignalInterrupt._generate_infile_setup_code�TcCs~|j��dkr<t�d�y|j��Wntk
r:YnX|r^|j��\}}||7}||7}|�d||��|��f�dS)a;A common way to cleanup and fail with useful debug output.

        Kills the process if it is still running, collects remaining output
        and fails the test with an error message including the output.

        Args:
            why: Text to go after "Error from IO process" in the message.
            stdout, stderr: standard output and error from the process so
                far to include in the error message.
            communicate: bool, when True we call communicate() on the process
                after killing it to gather additional output.
        Ng�������?z/Error from IO process %s:
STDOUT:
%sSTDERR:
%s
)	rr	�timeZsleepZ	terminater�communicate�fail�decode)rZwhy�stdout�stderrrZ
stdout_endZ
stderr_endrrr�fail_with_process_info-s
z0TestFileIOSignalInterrupt.fail_with_process_infocCs6|��}tjtjddd|dd|ddgtjtjtjd�|_|jj�t	d��}|dkrl|j
d	|d
�|jj�|�d}g}xV|s�t
�
|jjgddd
�\}}}|j�tj�|d7}|dkr�|j��|�d�q�W|jj��}|dkr�|j
d|d
�|jjdd�\}	}
|jj�r2|j
d|jj|	|
dd�dS)anGeneric buffered read method test harness to validate EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
        z-uz-czXimport signal, sys ;signal.signal(signal.SIGINT, lambda s, f: sys.stderr.write("$\n")) ;z ;z"sys.stderr.write("Worm Sign!\n") ;zinfile.close())�stdinrrsWorm Sign!
zwhile awaiting a sign)rrrg�������?���z,reader process failed to handle our signals.s$
zwhile awaiting signal�
)�inputzexited rc=%dF)rN)r�
subprocess�Popen�sys�
executable�PIPErr�read�lenrr�write�selectZsend_signal�signal�SIGINTr
r�readliner�
returncode)r�
data_to_write�read_and_verify_codeZinfile_setup_codeZ	worm_signZsignals_sentZrlist�_Zsignal_linerrrrr�
_test_readingHs<



z'TestFileIOSignalInterrupt._test_readingz�got = infile.{read_method_name}() ;expected = {expected!r} ;assert got == expected, ("{read_method_name} returned wrong data.\n""got data %r\nexpected %r" % (got, expected))cCs|jd|jjddd�d�dS)z1readline() must handle signals and not lose data.s
hello, world!r'shello, world!
)�read_method_name�expected)r)r*N)r,�_READING_CODE_TEMPLATE�format)rrrr�
test_readline�s
z'TestFileIOSignalInterrupt.test_readlinecCs"|jd|jjdddgd�d�dS)z2readlines() must handle signals and not lose data.shello
world!�	readlinesshello
sworld!
)r-r.)r)r*N)r,r/r0)rrrr�test_readlines�s
z(TestFileIOSignalInterrupt.test_readlinescCs8|jd|jjddd�d�|jd|jjddd�d�dS)z0readall() must handle signals and not lose data.shello
world!�readalls
hello
world!
)r-r.)r)r*r!N)r,r/r0)rrrr�test_readall�sz&TestFileIOSignalInterrupt.test_readallN)rrT)�__name__�
__module__�__qualname__rrrrr,r/r1r3r5rrrrrs	
Irc@seZdZdZdS)�CTestFileIOSignalInterrupt�_ioN)r6r7r8r
rrrrr9�sr9c@seZdZdZdS)�PyTestFileIOSignalInterrupt�_pyioN)r6r7r8r
rrrrr;�sr;c@seZdZdd�Zdd�ZdS)�TestBufferedIOSignalInterruptcCs
d|jS)z?Returns the infile = ... line of code to make a BufferedReader.ziimport %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;assert isinstance(infile, io.BufferedReader))r
)rrrrr�sz9TestBufferedIOSignalInterrupt._generate_infile_setup_codecCs|jd|jjddd�d�dS)z<BufferedReader.read() must handle signals and not lose data.shello
world!r!s
hello
world!
)r-r.)r)r*N)r,r/r0)rrrrr5�s
z*TestBufferedIOSignalInterrupt.test_readallN)r6r7r8rr5rrrrr=�sr=c@seZdZdZdS)�CTestBufferedIOSignalInterruptr:N)r6r7r8r
rrrrr>�sr>c@seZdZdZdS)�PyTestBufferedIOSignalInterruptr<N)r6r7r8r
rrrrr?�sr?c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�TestTextIOSignalInterruptcCs
d|jS)z>Returns the infile = ... line of code to make a TextIOWrapper.zvimport %s as io ;infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;assert isinstance(infile, io.TextIOWrapper))r
)rrrrr�sz5TestTextIOSignalInterrupt._generate_infile_setup_codecCs|jd|jjddd�d�dS)z1readline() must handle signals and not lose data.s
hello, world!r'zhello, world!
)r-r.)r)r*N)r,r/r0)rrrrr1�s
z'TestTextIOSignalInterrupt.test_readlinecCs"|jd|jjdddgd�d�dS)z2readlines() must handle signals and not lose data.s
hello
world!r2zhello
zworld!
)r-r.)r)r*N)r,r/r0)rrrrr3�s
z(TestTextIOSignalInterrupt.test_readlinescCs|jd|jjddd�d�dS)z-read() must handle signals and not lose data.shello
world!r!z
hello
world!
)r-r.)r)r*N)r,r/r0)rrrrr5�s
z&TestTextIOSignalInterrupt.test_readallN)r6r7r8rr1r3r5rrrrr@�sr@c@seZdZdZdS)�CTestTextIOSignalInterruptr:N)r6r7r8r
rrrrrA�srAc@seZdZdZdS)�PyTestTextIOSignalInterruptr<N)r6r7r8r
rrrrrB�srB�__main__)�osr$r%rrrZunittestr:r<Z
skipUnless�namerZTestCaser9r;r=r>r?r@rArBr6�mainrrrr�<module>s(
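
The docstrings embedded in the bytecode above describe a harness that feeds data to a child reader process, interrupts it with a signal while it is blocked in a read, and then checks that no data was lost. The following is a minimal sketch of that idea, not the original test code: the names `CHILD_CODE` and `run_eintr_check` are hypothetical, the `ready` marker stands in for the "Worm Sign!" handshake, and the sketch assumes a POSIX system with Python 3.5 or later, where interrupted reads are retried automatically.

```python
import signal
import subprocess
import sys
import time

# Hypothetical child program: install a SIGINT handler that reports on
# stderr, announce readiness, then block in readline() on stdin.
CHILD_CODE = (
    "import signal, sys\n"
    "signal.signal(signal.SIGINT, lambda s, f: sys.stderr.write('$\\n'))\n"
    "infile = open(sys.stdin.fileno(), 'rb')\n"
    "sys.stderr.write('ready\\n')\n"
    "got = infile.readline()\n"
    "assert got == b'hello, world!\\n', got\n"
    "infile.close()\n"
)


def run_eintr_check():
    """Interrupt a blocked readline() in a child and verify it loses no data.

    Returns (returncode, signal_line), where signal_line is what the child's
    signal handler wrote to stderr.
    """
    proc = subprocess.Popen(
        [sys.executable, "-u", "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Wait until the child has installed its handler.
    proc.stderr.readline()
    # Send a partial line (no newline yet) so the child stays blocked.
    proc.stdin.write(b"hello, world!")
    proc.stdin.flush()
    time.sleep(0.2)  # Give the child a moment to enter the blocking read.
    proc.send_signal(signal.SIGINT)
    # The handler's output confirms the signal was handled during the read.
    signal_line = proc.stderr.readline()
    # Finish the line and close stdin; the child then validates its input.
    proc.communicate(input=b"\n")
    return proc.returncode, signal_line


if __name__ == "__main__":
    print(run_eintr_check())
```

A return code of 0 means the child's own assertion passed, i.e. the interrupted `readline()` still returned the complete line.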