HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.10/test/__pycache__/test_signal.cpython-310.pyc
o

�i0��@s@ddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlm
Z
ddlmZddlmZmZzddlZWneyUdZYnwGdd�dej�Ze�ejdkd�Gd	d
�d
ej��Ze�ejdkd�Gdd
�d
ej��ZGdd�dej�Ze�ejdkd�Gdd�dej��Ze�eed�d�Gdd�dej��Ze�ejdkd�e�eed�d�Gdd�dej���Z e�ejdkd�e�eed�o�eed�d�Gdd�dej���Z!Gdd �d ej�Z"Gd!d"�d"ej�Z#Gd#d$�d$ej�Z$Gd%d&�d&ej�Z%d'd(�Z&e'd)k�re�(�dSdS)*�N)�support)�	os_helper)�assert_python_ok�spawn_pythonc@seZdZdd�Zdd�ZdS)�GenericTestscCs�tt�D]D}tt|�}|dvr|�|tj�q|dvr#|�|tj�q|�d�r5|�d�s5|�|tj�q|�d�rH|�|tj�|�t	j
d�qdS)N>�SIG_DFL�SIG_IGN>�SIG_SETMASK�	SIG_BLOCK�SIG_UNBLOCKZSIGZSIG_ZCTRL_�win32)�dir�signal�getattr�assertIsInstance�Handlers�Sigmasks�
startswith�Signals�assertEqual�sys�platform)�self�name�sig�r�-/usr/local/lib/python3.10/test/test_signal.py�
test_enumss

��zGenericTests.test_enumscCs>tt�D]}tt|�}t�|�rt�|�s|�|jd�qdS)Nr)r
rr�inspectZ	isroutineZ	isbuiltinr�
__module__)rr�valuerrr�test_functions_module_attr%s
��z'GenericTests.test_functions_module_attrN)�__name__r�__qualname__rr!rrrrrs
rrzNot valid on Windowsc@sZeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
�ej
d�dd��ZdS)�
PosixTestscG�dS�Nr�r�argsrrr�trivial_signal_handler0sz!PosixTests.trivial_signal_handlercCs8|�ttjd�|�ttjd|j�|�ttjd�dS)Ni�)�assertRaises�
ValueErrorr�	getsignalr)�	strsignal�rrrr�,test_out_of_range_signal_number_raises_error3s
�z7PosixTests.test_out_of_range_signal_number_raises_errorcCs|�ttjtjd�dSr&)r*�	TypeErrorr�SIGUSR1r.rrr�0test_setting_signal_handler_to_none_raises_error;s
�z;PosixTests.test_setting_signal_handler_to_none_raises_errorcCsZt�tj|j�}|�|tj�|�t�tj�|j�t�tj|�|�t�tj�|�dSr&)r�SIGHUPr)rrrr,)rZhuprrr�test_getsignal?s�zPosixTests.test_getsignalcCs@|�dt�tj��|�dt�tj��|�dt�tj��dS)NZ	InterruptZ
TerminatedZHangup)�assertInrr-�SIGINT�SIGTERMr3r.rrr�test_strsignalGszPosixTests.test_strsignalcCs&tj�t�}tj�|d�}t|�dS)Nzsignalinterproctester.py)�os�path�dirname�__file__�joinr)rr;Zscriptrrr�test_interprocess_signalMsz#PosixTests.test_interprocess_signalcCsdt��}|�|t�|�tjj|�|�tjj|�|�d|�|�tj	|�|�
t|�tj	�dS�Nr)r�
valid_signalsr�setr5rr6�SIGALRM�assertNotIn�NSIG�
assertLess�len�r�srrr�test_valid_signalsR�zPosixTests.test_valid_signals�sys.executable required.cCs<tjtjddgtjd�}|�d|j�|�|jt	j
�dS)z+KeyboardInterrupt triggers exit via SIGINT.�-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)��stderr�KeyboardInterruptN)�
subprocess�runr�
executable�PIPEr5rNr�
returncoderr6)r�processrrr�!test_keyboard_interrupt_exit_code[s��z,PosixTests.test_keyboard_interrupt_exit_codeN)r"rr#r)r/r2r4r8r>rI�unittest�
skipUnlessrrRrVrrrrr$.s	r$zWindows specificc@s2eZdZdd�Zdd�Ze�ejd�dd��Z	dS)	�WindowsSignalTestscCsdt��}|�|t�|�t|�d�|�tjj|�|�	d|�|�	tj
|�|�t|�tj
�dS)N�r)rr@rrAZassertGreaterEqualrFr5rr6rCrDrErGrrrrIorJz%WindowsSignalTests.test_valid_signalscCs�dd�}t�}tjtjtjtjtjtjtjfD]}t�	|�dur/t�|t�||��|�
|�q|�|�|�t
��t�d|�Wd�n1sKwY|�t
��t�d|�Wd�dS1sgwYdS)NcSr%r&r)�x�yrrr�<lambda>zsz3WindowsSignalTests.test_issue9324.<locals>.<lambda>����)rAr�SIGABRTZSIGBREAK�SIGFPE�SIGILLr6�SIGSEGVr7r,�add�
assertTruer*r+)r�handler�checkedrrrr�test_issue9324xs"�
�
�"�z!WindowsSignalTests.test_issue9324rKcCs<tjtjddgtjd�}|�d|j�d}|�|j|�dS)z?KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.rLzraise KeyboardInterruptrMrOl:N)	rPrQrrRrSr5rNrrT)rrUZSTATUS_CONTROL_C_EXITrrrrV�s
�z4WindowsSignalTests.test_keyboard_interrupt_exit_codeN)
r"rr#rIrhrWrXrrRrVrrrrrYls
	rYc@sNeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Ze�	e
jdkd�d
d��ZdS)�
WakeupFDTestscCsv|�t��tjtjd�Wd�n1swY|�t��t�tjd�Wd�dS1s4wYdS)N)�signumF)r*r0r�
set_wakeup_fdr6r.rrr�test_invalid_call�s�"�zWakeupFDTests.test_invalid_callcCs t��}|�ttftj|�dSr&)rZmake_bad_fdr*r+�OSErrorrrk)r�fdrrr�test_invalid_fd�s
�zWakeupFDTests.test_invalid_fdcCs0t��}|��}|��|�ttftj|�dSr&)�socket�fileno�closer*r+rmrrk)r�sockrnrrr�test_invalid_socket�s
�z!WakeupFDTests.test_invalid_socketcCs�t��\}}|�tj|�|�tj|�t��\}}|�tj|�|�tj|�ttd�r9t�|d�t�|d�t�|�|�t�|�|�|�t�d�|�|�t�d�d�dS)N�set_blockingFr^)	r9�pipe�
addCleanuprr�hasattrrurrkr)rZr1Zw1Zr2Zw2rrr�test_set_wakeup_fd_result�s

z'WakeupFDTests.test_set_wakeup_fd_resultcCs�t��}|�|j�|�d�|��}t��}|�|j�|�d�|��}t�|�|�t�|�|�|�t�d�|�|�t�d�d�dS)NFr^)rprwrr�setblockingrqrrkr)rZsock1�fd1Zsock2�fd2rrr� test_set_wakeup_fd_socket_result�s


z.WakeupFDTests.test_set_wakeup_fd_socket_resultrztests specific to POSIXcCs�t��\}}|�tj|�|�tj|�t�|d�|�t��
}t�|�Wd�n1s/wY|�	t
|j�d|�t�|d�t�|�t�d�dS)NTz&the fd %s must be in non-blocking modeFr^)r9rvrwrrrur*r+rrkr�str�	exception)rZrfdZwfd�cmrrr�test_set_wakeup_fd_blocking�s��
z)WakeupFDTests.test_set_wakeup_fd_blockingN)
r"rr#rlrortryr}rW�skipIfrrr�rrrrri�s	ric@steZdZe�edud�dd�dd��Ze�edud�dd��Zd	d
�Zdd�Z	d
d�Z
e�ee
d�d�dd��ZdS)�WakeupSignalTestsN�need _testcapiT��orderedcGs&d�ttt|��||�}td|�dS)Naif 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        rL)�format�tuple�map�intr)rZ	test_bodyr�Zsignals�coderrr�check_wakeup�s �"zWakeupSignalTests.check_wakeupc	Cs|d}t��\}}z#zt�|d�Wn	tyYnw|�d�Wt�|�t�|�nt�|�t�|�wtd|�dS)Na&if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        �xz9OS doesn't report write() error on the read end of a piperL)r9rv�writerm�skipTestrrr)rr��r�wrrr�test_wakeup_write_errors"�
�

�z)WakeupSignalTests.test_wakeup_write_errorcC�|�dtj�dS)Na�def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        �r�rrBr.rrr�test_wakeup_fd_earlyFs�z&WakeupSignalTests.test_wakeup_fd_earlycCr�)Na`def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r�r.rrr�test_wakeup_fd_duringhs�z'WakeupSignalTests.test_wakeup_fd_duringcCs|�dtjtj�dS)Nz�def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )r�rr1rBr.rrr�test_signum�s�zWakeupSignalTests.test_signum�pthread_sigmask�need signal.pthread_sigmask()cCs|jdtjtjdd�dS)Na�def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Fr�)r�rr1�SIGUSR2r.rrr�test_pending�s

�zWakeupSignalTests.test_pending)r"rr#rWr��	_testcapir�r�r�r�r�rXrxrr�rrrrr��s&
3"�r��
socketpairzneed socket.socketpairc@sTeZdZe�edud�dd��Ze�edud�dd��Ze�edud�dd��ZdS)	�WakeupSocketSignalTestsNr�cC�d}td|�dS)Na�if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        rL�r�rr�rrr�test_socket�sz#WakeupSocketSignalTests.test_socketcC�.tjdkrd}nd}dj|d�}td|�dS)N�nt�sendr�a+if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        ��actionrL�r9rr�r�rr�r�rrr�test_send_error�s
!�"z'WakeupSocketSignalTests.test_send_errorcCr�)Nr�r�r�aJ
if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        r�rLr�r�rrr�test_warn_on_full_buffer�s
g�hz0WakeupSocketSignalTests.test_warn_on_full_buffer)	r"rr#rWr�r�r�r�r�rrrrr��s
#
*r��siginterruptzneeds signal.siginterrupt()c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�SiginterruptTestc	Cs�d|f}td|��F}z|j��}|jtjd�\}}Wntjy0|��YWd�dSw||}|�	�}|dvrEt
d||f��|dkWd�S1sSwYdS)	z�Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        a�if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        rL)�timeoutNF)��zChild error (exit code %s): %rr�)r�stdout�readline�communicater�
SHORT_TIMEOUTrP�TimeoutExpired�kill�wait�	Exception)rZ	interruptr�rUZ
first_liner�rN�exitcoderrr�readpipe_interruptedgs(#�$
��$�z%SiginterruptTest.readpipe_interruptedcCs|�d�}|�|�dSr&�r�re�rZinterruptedrrr�test_without_siginterrupt��
z*SiginterruptTest.test_without_siginterruptcC�|�d�}|�|�dS�NTr�r�rrr�test_siginterrupt_on�r�z%SiginterruptTest.test_siginterrupt_oncCr�)NF)r�ZassertFalser�rrr�test_siginterrupt_off�r�z&SiginterruptTest.test_siginterrupt_offN)r"rr#r�r�r�r�rrrrr�cs
<r��	getitimer�	setitimerz/needs signal.getitimer() and signal.setitimer()c@sneZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
�ej
dvd�dd��Zdd�Zdd�ZdS)�
ItimerTestcCs(d|_d|_d|_t�tj|j�|_dS)NFr)�hndl_called�
hndl_count�itimerrrB�sig_alrm�	old_alarmr.rrr�setUp�szItimerTest.setUpcCs0t�tj|j�|jdurt�|jd�dSdSr?)rrBr�r�r�r.rrr�tearDown�s
�zItimerTest.tearDowncGs
d|_dSr�)r�r'rrrr��s
zItimerTest.sig_alrmcGsDd|_|jdkr
t�d��|jdkrt�tjd�|jd7_dS)NTr�z.setitimer didn't disable ITIMER_VIRTUAL timer.r�)r�r�r�ItimerErrorr��ITIMER_VIRTUALr'rrr�
sig_vtalrm�s


zItimerTest.sig_vtalrmcGsd|_t�tjd�dS)NTr)r�rr��ITIMER_PROFr'rrr�sig_prof�szItimerTest.sig_profcCs|�tjtjdd�dS)Nr^r)r*rr�r��ITIMER_REALr.rrr�test_itimer_exc�szItimerTest.test_itimer_exccCs0tj|_t�|jd�t��|�|jd�dS)Ng�?T)rr�r�r��pauserr�r.rrr�test_itimer_real�szItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.cCs�tj|_t�tj|j�t�|jdd�t��}t��|dkr7tddd�}t�	|j�dkr/n
t��|dks |�
d�|�t�	|j�d�|�|jd	�dS)
Ng333333�?皙�����?�N@�90�2	铖���r��8timeout: likely cause: machine too slow or load too highT)
rr�r��	SIGVTALRMr�r��time�	monotonic�powr�r�rr��rZ
start_time�_rrr�test_itimer_virtual�s�
zItimerTest.test_itimer_virtualcCs�tj|_t�tj|j�t�|jdd�t��}t��|dkr7tddd�}t�	|j�dkr/n
t��|dks |�
d�|�t�	|j�d�|�|jd�dS)	Nr�r�r�r�r�r�r�T)
rr�r��SIGPROFr�r�r�r�r�r�r�rr�r�rrr�test_itimer_profs�
zItimerTest.test_itimer_profcCs2tj|_t�|jd�t�d�|�|jd�dS)N���ư>r�T)rr�r�r�r��sleeprr�r.rrr�test_setitimer_tinys
zItimerTest.test_setitimer_tinyN)r"rr#r�r�r�r�r�r�r�rWr�rrr�r�r�rrrrr��s
	�
r�c@s�eZdZdZe�eed�d�dd��Ze�eed�d�e�eed�d�dd	���Z	e�eed
�d�dd
��Z
e�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�dd��Z
e�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�d d!��Ze�eed�d�e�eed�d�d"d#���Ze�eed�d�d$d%��Ze�eed�d�d&d'��Ze�eed�d�d(d)��Ze�eed
�d�d*d+��Zd,S)-�PendingSignalsTestsz[
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    �
sigpendingzneed signal.sigpending()cCs|�t��t��dSr&)rrr�rAr.rrr�test_sigpending_empty&sz)PendingSignalsTests.test_sigpending_emptyr�r�cCr�)Na
if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rLr�r�rrr�test_sigpending+sz#PendingSignalsTests.test_sigpending�pthread_killzneed signal.pthread_kill()cCr�)Na�if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rLr�r�rrr�test_pthread_killJsz%PendingSignalsTests.test_pthread_killcCsd|��|f}td|�dS)zo
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        awif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        rLN)�stripr)rZblocked�testr�rrr�wait_helpercs
 �%zPendingSignalsTests.wait_helper�sigwaitzneed signal.sigwait()cC�|�tjd�dS)Na 
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        �r�rrBr.rrr�test_sigwait��z PendingSignalsTests.test_sigwait�sigwaitinfozneed signal.sigwaitinfo()cCr�)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        r�r.rrr�test_sigwaitinfo�r�z$PendingSignalsTests.test_sigwaitinfo�sigtimedwaitzneed signal.sigtimedwait()cCr�)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r�r.rrr�test_sigtimedwait�r�z%PendingSignalsTests.test_sigtimedwaitcCr�)Nz�
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r�r.rrr�test_sigtimedwait_poll�sz*PendingSignalsTests.test_sigtimedwait_pollcCr�)Nz�
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        r�r.rrr�test_sigtimedwait_timeout�r�z-PendingSignalsTests.test_sigtimedwait_timeoutcCstj}|�ttj|gd�dS)Ng�)rrBr*r+r�)rrjrrr�"test_sigtimedwait_negative_timeout�sz6PendingSignalsTests.test_sigtimedwait_negative_timeoutcCstdd�dS)NrLa�if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        r�r.rrr�test_sigwait_thread�s	z'PendingSignalsTests.test_sigwait_threadcCs�|�ttj�|�ttjd�|�ttjddd�|�ttjdg�|�t��t�tjtjg�Wd�n1s;wY|�t��t�tjdg�Wd�n1sXwY|�t��t�tjdd>g�Wd�dS1sxwYdS)Nr�r�r�i�ri�)r*r0rr�rmr+r
rDr.rrr�test_pthread_sigmask_arguments�s��"�z2PendingSignalsTests.test_pthread_sigmask_argumentscCsJt�tjt���}|�tjtj|�t�tjt���}|�|t���dSr&)rr�r
r@rwr	rZassertLessEqualrGrrr�"test_pthread_sigmask_valid_signalssz6PendingSignalsTests.test_pthread_sigmask_valid_signalscCr�)Na-	if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        rLr�r�rrr�test_pthread_sigmask	sHz(PendingSignalsTests.test_pthread_sigmaskcCs`d}td|��}|��\}}|��}|dkrtd||f��Wd�dS1s)wYdS)Na7if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        rLr�zChild error (exit code %s): %s)rr�r�r�)rr�rUr�rNr�rrr�test_pthread_kill_main_threadVs
��"�z1PendingSignalsTests.test_pthread_kill_main_threadN)r"rr#�__doc__rWrXrxrr�r�r�r�r�r�r�r�r�r�rrrrrrrrrr�!s��
���
�
,�

�
	�
	�
�
�
���
�
�
K�r�c@sveZdZdZdd�Zdd�Zdd�Ze�e	e
d�d	�d
d��Ze�e	e
d�d	�dd
��Ze�e	e
d�d�dd��Z
dS)�
StressTestz�
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    cCs t�||�}|�tj||�dSr&)rrw)rrjrf�old_handlerrrr�setsigwszStressTest.setsigcs�d�g�d
��fdd�	}|�tjtjd�|�tj|�|�t���kr0t�d�t���ks%�fdd�t	t��d�D�}t
�|�}tj
rNtd	|f�|S)N�cs0t���kr��t���t�tjd�dSdS)Nr�)rF�appendr��perf_counterrr�r��rj�frame��N�timesrrrfs�z5StressTest.measure_itimer_resolution.<locals>.handlerrg����MbP?cs g|]}�|d�|�qS)r�r)�.0�i)rrr�
<listcomp>�s z8StressTest.measure_itimer_resolution.<locals>.<listcomp>r�z,detected median itimer() resolution: %.6f s.�NN)rwrr�r�rrBrFr�r��range�
statisticsZmedianr�verbose�print)rrfZ	durationsZmedrrr�measure_itimer_resolution{s
�
z$StressTest.measure_itimer_resolutioncCs4|��}|dkr
dS|dkrdS|�d|f�dS)Ng-C��6?i'g{�G�z�?�dz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))rr�)rZresorrr�decide_itimer_count�s
�zStressTest.decide_itimer_countr�ztest needs setitimer()cs,|��}g�dd�}d
�fdd�	}|�tj|�|�tj|�|�tj|�d}t��tj	}||kr�t
�t
��tj�|d7}t
��|kr]t��|kr]t�d�t
��|kr]t��|ksLt
�t
��tj�|d7}t
��|kr�t��|kr�t�d�t
��|kr�t��|ksv||ks3|�t
��|d	�dS)z;
        This test uses dependent signal handlers.
        cSst�tjdt��d�dS)Nr���h㈵��>)rr�r��randomrrrr�
first_handler�sz@StressTest.test_stress_delivery_dependent.<locals>.first_handlerNc���|�dSr&�r
r�Zsigsrr�second_handler��zAStressTest.test_stress_delivery_dependent.<locals>.second_handlerrr�r�Some signals were lostr)rrrr�r1rBr�r�rr�r9r��getpidrFr�r)rrrr"�
expected_sigs�deadlinerr!r�test_stress_delivery_dependent�s,
�
��z)StressTest.test_stress_delivery_dependentcs�|��}g��fdd�}|�tj|�|�tj|�d}t��tj}||krbt�	tj
dt��d�t�
t��tj�|d7}t��|kr^t��|kr^t�d�t��|kr^t��|ksM||ks'|�t��|d�dS)	z>
        This test uses simultaneous signal handlers.
        crr&r rr!rrrf�r#z=StressTest.test_stress_delivery_simultaneous.<locals>.handlerrr�rr�r$N)rrrr1rBr�r�rr�r�r�rr9r�r%rFr�r)rrrfr&r'rr!r�!test_stress_delivery_simultaneous�s 
��
z,StressTest.test_stress_delivery_simultaneousr1ztest needs SIGUSR1cs&tj�d�d�d��fdd�����fdd�}���fdd�}t����}|�tj�|�tj|d	�}zXd}t���3}|��|�d
�|��|j	durf|�
|j	jt�|�
d�d�d
�t|j	j��d
}Wd�n1spwY|s}|��d�|����Wd
�|��dSd
�|��w)NrFcs�d7�dS�Nr�rr)�num_received_signalsrr�custom_handler�szAStressTest.test_stress_modifying_handlers.<locals>.custom_handlercs"�st����d7��rdSdSr*)r�raise_signalr)�do_stop�num_sent_signalsrjrr�set_interrupts�s
�zAStressTest.test_stress_modifying_handlers.<locals>.set_interruptscsB�dkrtd�D]}�tjfD]}t��|�qq�dksdSdS)Nri N)rrr)rrf)r,r/rjrr�cycle_handlerss��zAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlers)�targetTzSignal �dz ignored due to race condition)rr1rw�	threading�ThreadrZcatch_unraisable_exception�startr=Z
unraisabler�	exc_valuermr5r~Z
assertGreaterrE)rr0r1r�tZignoredr�r)r,r.r+r/rjr�test_stress_modifying_handlers�sB


����
z)StressTest.test_stress_modifying_handlersN)r"rr#rrrrrWrXrxrr(r)r9rrrrrps"�
,�
�rc@s>eZdZdd�Ze�ejdkd�dd��Zdd�Z	d	d
�Z
dS)�RaiseSignalTestcCs<|�t��t�tj�Wd�dS1swYdSr&)r*�KeyboardInterruptrr-r6r.rrr�test_sigint-s"�zRaiseSignalTest.test_sigintrzWindows specific testc
CsVzd}t�|�|�d�WdSty*}z|jtjkrn�WYd}~dSd}~ww)Nr�z#OSError (Invalid argument) expected)rr-Zfailrm�errno�EINVAL)rr3�errr�test_invalid_argument1s
���z%RaiseSignalTest.test_invalid_argumentcsJd��fdd�}t�tj|�}|�tjtj|�t�tj�|���dS)NFcsd�dSr�r)�a�b�Zis_okrrrf?sz-RaiseSignalTest.test_handler.<locals>.handler)rr6rwr-re)rrfZ
old_signalrrCr�test_handler=szRaiseSignalTest.test_handlercCs$d}td|�\}}}|�d|�dS)Nz�if 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        rLs/OSError: Signal 2 ignored due to race condition)rr5)rr��rc�out�errrrr�test__thread_interrupt_mainHs	z+RaiseSignalTest.test__thread_interrupt_mainN)r"rr#r<rWr�rrr@rDrHrrrrr:+s
r:c@s&eZdZe�eed�d�dd��ZdS)�PidfdSignalTest�pidfd_send_signalzpidfd support not built incCs |�t��}t�dtj�Wd�n1swY|jjtjkr)|�d�n|jjtj	kr5|�d�|�
|jjtj�t�
dt����tj�}|�tj|�|�td��t�|tjt�d�Wd�n1slwY|�t��t�|tj�Wd�dS1s�wYdS)Nrzkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)r*rmrrJr6rr=ZENOSYSr��EPERMr�EBADFr9�openr%�O_DIRECTORYrwrrZassertRaisesRegexr0�objectr;)rr�Zmy_pidfdrrr�test_pidfd_send_signalZs �
�"�z&PidfdSignalTest.test_pidfd_send_signalN)r"rr#rWrXrxrrPrrrrrIXs�rIcCst��dSr&)r�
reap_childrenrrrr�tearDownModulemsrR�__main__))r=rr9rrrprrPrr4r�rWr�rZtest.supportrZtest.support.script_helperrrr��ImportErrorZTestCaserr�rr$rXrYrir�rxr�r�r�r�rr:rIrRr"�mainrrrr�<module>sf�=/M6DT�eQ<-
�