HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //usr/local/lib/python3.7/test/__pycache__/test_signal.cpython-37.pyc
B

��gȠ�@s�ddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZddlm
Z
mZyddlZWnek
r�dZYnXGdd�de	j�Ze	�ejdkd�Gdd	�d	e	j��Ze	�ejdkd
�Gdd�de	j��ZGd
d�de	j�Ze	�ejdkd�Gdd�de	j��Ze	�eed�d�Gdd�de	j��Ze	�ejdkd�Gdd�de	j��Ze	�ejdkd�Gdd�de	j��ZGdd�de	j�ZGdd�de	j�Zdd�Z e!dk�r�e	�"�dS) �N)�support)�assert_python_ok�spawn_pythonc@seZdZdd�ZdS)�GenericTestscCs�x�tt�D]�}tt|�}|dkr0|�|tj�q
|dkrH|�|tj�q
|�d�rl|�d�sl|�|tj�q
|�d�r
|�|tj�|�t	j
d�q
WdS)N>�SIG_DFL�SIG_IGN>�SIG_UNBLOCK�SIG_SETMASK�	SIG_BLOCKZSIGZSIG_ZCTRL_�win32)�dir�signal�getattr�assertIsInstance�HandlersZSigmasks�
startswithZSignals�assertEqual�sys�platform)�self�name�sig�r�,/usr/local/lib/python3.7/test/test_signal.py�
test_enumss

zGenericTests.test_enumsN)�__name__�
__module__�__qualname__rrrrrrsrrzNot valid on Windowsc@s4eZdZdd�Zdd�Zdd�Zdd�Zd	d
�ZdS)�
PosixTestscGsdS)Nr)r�argsrrr�trivial_signal_handler%sz!PosixTests.trivial_signal_handlercCs(|�ttjd�|�ttjd|j�dS)Ni�)�assertRaises�
ValueErrorr
�	getsignalr )rrrr�,test_out_of_range_signal_number_raises_error(sz7PosixTests.test_out_of_range_signal_number_raises_errorcCs|�ttjtjd�dS)N)r!�	TypeErrorr
�SIGUSR1)rrrr�0test_setting_signal_handler_to_none_raises_error.s
z;PosixTests.test_setting_signal_handler_to_none_raises_errorcCsZt�tj|j�}|�|tj�|�t�tj�|j�t�tj|�|�t�tj�|�dS)N)r
�SIGHUPr rrrr#)rZhuprrr�test_getsignal2szPosixTests.test_getsignalcCs&tj�t�}tj�|d�}t|�dS)Nzsignalinterproctester.py)�os�path�dirname�__file__�joinr)rr,Zscriptrrr�test_interprocess_signal;sz#PosixTests.test_interprocess_signalN)rrrr r$r'r)r/rrrrr#s
	rzWindows specificc@seZdZdd�ZdS)�WindowsSignalTestsc	Cs�dd�}t�}xTtjtjtjtjtjtjtjfD]0}t�	|�dk	r0t�|t�||��|�
|�q0W|�|�|�t
��t�d|�WdQRX|�t
��t�d|�WdQRXdS)NcSsdS)Nr)�x�yrrr�<lambda>E�z3WindowsSignalTests.test_issue9324.<locals>.<lambda>����)�setr
�SIGABRTZSIGBREAK�SIGFPE�SIGILL�SIGINT�SIGSEGV�SIGTERMr#�add�
assertTruer!r")r�handler�checkedrrrr�test_issue9324Cs
z!WindowsSignalTests.test_issue9324N)rrrrBrrrrr0Asr0c@sNeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Ze�	e
jdkd�d
d��ZdS)�
WakeupFDTestsc	CsL|�t��tjtjd�WdQRX|�t��t�tjd�WdQRXdS)N)�signumF)r!r%r
�
set_wakeup_fdr;)rrrr�test_invalid_call[szWakeupFDTests.test_invalid_callcCs t��}|�ttftj|�dS)N)rZmake_bad_fdr!r"�OSErrorr
rE)r�fdrrr�test_invalid_fdds
zWakeupFDTests.test_invalid_fdcCs0t��}|��}|��|�ttftj|�dS)N)�socket�fileno�closer!r"rGr
rE)rZsockrHrrr�test_invalid_socketis

z!WakeupFDTests.test_invalid_socketcCs�t��\}}|�tj|�|�tj|�t��\}}|�tj|�|�tj|�ttd�rrt�|d�t�|d�t�|�|�t�|�|�|�t�d�|�|�t�d�d�dS)N�set_blockingFr5)	r*�pipe�
addCleanuprL�hasattrrNr
rEr)rZr1Zw1Zr2Zw2rrr�test_set_wakeup_fd_resultps

z'WakeupFDTests.test_set_wakeup_fd_resultcCs�t��}|�|j�|�d�|��}t��}|�|j�|�d�|��}t�|�|�t�|�|�|�t�d�|�|�t�d�d�dS)NFr5)rJrPrLZsetblockingrKr
rEr)rZsock1Zfd1Zsock2Zfd2rrr� test_set_wakeup_fd_socket_result�s


z.WakeupFDTests.test_set_wakeup_fd_socket_resultrztests specific to POSIXc	Cs�t��\}}|�tj|�|�tj|�t�|d�|�t��}t�|�WdQRX|�	t
|j�d|�t�|d�t�|�t�d�dS)NTz&the fd %s must be in non-blocking modeFr5)r*rOrPrLrNr!r"r
rEr�strZ	exception)rZrfdZwfd�cmrrr�test_set_wakeup_fd_blocking�s

z)WakeupFDTests.test_set_wakeup_fd_blockingN)
rrrrFrIrMrRrS�unittest�skipIfrrrVrrrrrCYs	rCc@steZdZe�edkd�dd�dd��Ze�edkd�dd��Zd	d
�Zdd�Z	d
d�Z
e�ee
d�d�dd��ZdS)�WakeupSignalTestsNzneed _testcapiT)�orderedcGs&d�ttt|��||�}td|�dS)Naif 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        z-c)�format�tuple�map�intr)rZ	test_bodyrZZsignals�coderrr�check_wakeup�s#zWakeupSignalTests.check_wakeupc	Csjd}t��\}}z4yt�|d�Wntk
r6YnX|�d�Wdt�|�t�|�Xtd|�dS)Na)if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                _testcapi.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        �xz9OS doesn't report write() error on the read end of a pipez-c)r*rO�writerG�skipTestrLr)rr_�r�wrrr�test_wakeup_write_error�s&
z)WakeupSignalTests.test_wakeup_write_errorcCs|�dtj�dS)Na�def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        )r`r
�SIGALRM)rrrr�test_wakeup_fd_earlysz&WakeupSignalTests.test_wakeup_fd_earlycCs|�dtj�dS)Na`def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        )r`r
rg)rrrr�test_wakeup_fd_during%sz'WakeupSignalTests.test_wakeup_fd_duringcCs|�dtjtj�dS)Nz�def test():
            import _testcapi
            signal.signal(signal.SIGUSR1, handler)
            _testcapi.raise_signal(signal.SIGUSR1)
            _testcapi.raise_signal(signal.SIGALRM)
        )r`r
r&rg)rrrr�test_signumCszWakeupSignalTests.test_signum�pthread_sigmaskzneed signal.pthread_sigmask()cCs|jdtjtjdd�dS)Na�def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            _testcapi.raise_signal(signum1)
            _testcapi.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        F)rZ)r`r
r&�SIGUSR2)rrrr�test_pendingKszWakeupSignalTests.test_pending)rrrrWrX�	_testcapir`rfrhrirj�
skipUnlessrQr
rmrrrrrY�s&4"rYZ
socketpairzneed socket.socketpairc@sTeZdZe�edkd�dd��Ze�edkd�dd��Ze�edkd�dd��ZdS)	�WakeupSocketSignalTestsNzneed _testcapicCsd}td|�dS)Na�if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        _testcapi.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        z-c)r)rr_rrr�test_socket`s z#WakeupSocketSignalTests.test_socketcCs.tjdkrd}nd}dj|d�}td|�dS)N�nt�sendrba.if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        )�actionz-c)r*rr[r)rrtr_rrr�test_send_error�s

"z'WakeupSocketSignalTests.test_send_errorcCs.tjdkrd}nd}dj|d�}td|�dS)Nrrrsrba�if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        # Start with large chunk size to reduce the
        # number of send needed to fill the buffer.
        written = 0
        for chunk_size in (2 ** 16, 2 ** 8, 1):
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, socket.timeout):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        )rtz-c)r*rr[r)rrtr_rrr�test_warn_on_full_buffer�s

dz0WakeupSocketSignalTests.test_warn_on_full_buffer)	rrrrWrXrnrqrurvrrrrrp]s$+rpc@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�SiginterruptTestc	Cs�d|f}td|��t}y|j��}|jdd�\}}Wntjk
rR|��dSX||}|��}|dkr|td||f��|dkSWd	QRXd	S)
z�Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        a�if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        z-cg@)ZtimeoutF)��zChild error (exit code %s): %rryN)	r�stdout�readline�communicate�
subprocessZTimeoutExpired�kill�wait�	Exception)rZ	interruptr_�processZ
first_linerz�stderr�exitcoderrr�readpipe_interrupted s*

z%SiginterruptTest.readpipe_interruptedcCs|�d�}|�|�dS)N)r�r?)r�interruptedrrr�test_without_siginterrupt\s
z*SiginterruptTest.test_without_siginterruptcCs|�d�}|�|�dS)NT)r�r?)rr�rrr�test_siginterrupt_oncs
z%SiginterruptTest.test_siginterrupt_oncCs|�d�}|�|�dS)NF)r�ZassertFalse)rr�rrr�test_siginterrupt_offjs
z&SiginterruptTest.test_siginterrupt_offN)rrrr�r�r�r�rrrrrws<rwc@sneZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
�ej
dkd�dd��Zdd�Zdd�ZdS)�
ItimerTestcCs(d|_d|_d|_t�tj|j�|_dS)NFr)�hndl_called�
hndl_count�itimerr
rg�sig_alrm�	old_alarm)rrrr�setUptszItimerTest.setUpcCs,t�tj|j�|jdk	r(t�|jd�dS)Nr)r
rgr�r��	setitimer)rrrr�tearDownzs
zItimerTest.tearDowncGs
d|_dS)NT)r�)rrrrrr��szItimerTest.sig_alrmcGsFd|_|jdkrt�d��n|jdkr4t�tjd�|jd7_dS)NTryz.setitimer didn't disable ITIMER_VIRTUAL timer.r�)r�r�r
�ItimerErrorr��ITIMER_VIRTUAL)rrrrr�
sig_vtalrm�s

zItimerTest.sig_vtalrmcGsd|_t�tjd�dS)NTr)r�r
r��ITIMER_PROF)rrrrr�sig_prof�szItimerTest.sig_profcCs|�tjtjdd�dS)Nr5r)r!r
r�r�)rrrr�test_itimer_exc�szItimerTest.test_itimer_exccCs0tj|_t�|jd�t��|�|jd�dS)Ng�?T)r
�ITIMER_REALr�r��pauserr�)rrrr�test_itimer_real�szItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.cCs�tj|_t�tj|j�t�|jdd�t��}x<t��|dkrbtddd�}t�	|j�dkr2Pq2W|�
d�|�t�	|j�d�|�|jd	�dS)
Ng333333�?g�������?gN@i90i2	i���)ggz8timeout: likely cause: machine too slow or load too highT)
r
r�r��	SIGVTALRMr�r��time�	monotonic�pow�	getitimerrcrr�)r�
start_time�_rrr�test_itimer_virtual�s
zItimerTest.test_itimer_virtualcCs�tj|_t�tj|j�t�|jdd�t��}x<t��|dkrbtddd�}t�	|j�dkr2Pq2W|�
d�|�t�	|j�d�|�|jd�dS)	Ng�������?gN@i90i2	i���)ggz8timeout: likely cause: machine too slow or load too highT)
r
r�r��SIGPROFr�r�r�r�r�r�rcrr�)rr�r�rrr�test_itimer_prof�s
zItimerTest.test_itimer_profcCs2tj|_t�|jd�t�d�|�|jd�dS)Ng���ư>r�T)r
r�r�r�r��sleeprr�)rrrr�test_setitimer_tiny�s
zItimerTest.test_setitimer_tinyN)rrrr�r�r�r�r�r�r�rWrXrrr�r�r�rrrrr�rs
	r�c@s�eZdZdZe�eed�d�dd��Ze�eed�d�e�eed�d�dd	���Z	e�eed
�d�dd
��Z
e�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�dd��Z
e�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�d d!��Ze�eed�d�e�eed�d�d"d#���Ze�eed�d�d$d%��Ze�eed�d�d&d'��Ze�eed
�d�d(d)��Zd*S)+�PendingSignalsTestsz[
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    �
sigpendingzneed signal.sigpending()cCs|�t��t��dS)N)rr
r�r7)rrrr�test_sigpending_empty�sz)PendingSignalsTests.test_sigpending_emptyrkzneed signal.pthread_sigmask()cCsd}td|�dS)Na
if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        z-c)r)rr_rrr�test_sigpending�sz#PendingSignalsTests.test_sigpending�pthread_killzneed signal.pthread_kill()cCsd}td|�dS)Na�if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        z-c)r)rr_rrr�test_pthread_killsz%PendingSignalsTests.test_pthread_killcCsd|��|f}td|�dS)zo
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        awif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        z-cN)�stripr)rZblocked�testr_rrr�wait_helpers'zPendingSignalsTests.wait_helper�sigwaitzneed signal.sigwait()cCs|�tjd�dS)Na 
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        )r�r
rg)rrrr�test_sigwaitHsz PendingSignalsTests.test_sigwait�sigwaitinfozneed signal.sigwaitinfo()cCs|�tjd�dS)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        )r�r
rg)rrrr�test_sigwaitinfoTsz$PendingSignalsTests.test_sigwaitinfo�sigtimedwaitzneed signal.sigtimedwait()cCs|�tjd�dS)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        )r�r
rg)rrrr�test_sigtimedwait_sz%PendingSignalsTests.test_sigtimedwaitcCs|�tjd�dS)Nz�
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        )r�r
rg)rrrr�test_sigtimedwait_polljsz*PendingSignalsTests.test_sigtimedwait_pollcCs|�tjd�dS)Nz�
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        )r�r
rg)rrrr�test_sigtimedwait_timeoutwsz-PendingSignalsTests.test_sigtimedwait_timeoutcCstj}|�ttj|gd�dS)Ng�)r
rgr!r"r�)rrDrrr�"test_sigtimedwait_negative_timeout�sz6PendingSignalsTests.test_sigtimedwait_negative_timeoutcCstdd�dS)Nz-ca�if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        )r)rrrr�test_sigwait_thread�s	z'PendingSignalsTests.test_sigwait_threadcCsH|�ttj�|�ttjd�|�ttjddd�|�ttjdg�dS)Nr�rxryi�)r!r%r
rkrG)rrrr�test_pthread_sigmask_arguments�sz2PendingSignalsTests.test_pthread_sigmask_argumentscCsd}td|�dS)Na-	if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        z-c)r)rr_rrr�test_pthread_sigmask�sJz(PendingSignalsTests.test_pthread_sigmaskc	CsJd}td|��2}|��\}}|��}|dkr<td||f��WdQRXdS)Na7if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        z-cryzChild error (exit code %s): %s)rr|rr�)rr_r�rzr�r�rrr�test_pthread_kill_main_thread�sz1PendingSignalsTests.test_pthread_kill_main_threadN)rrr�__doc__rWrorQr
r�r�r�r�r�r�r�r�r�r�r�r�r�r�rrrrr��sB-

	Lr�c@s\eZdZdZdd�Zdd�Zdd�Ze�e	e
d�d	�d
d��Ze�e	e
d�d	�dd
��ZdS)�
StressTestz�
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    cCs t�||�}|�tj||�dS)N)r
rP)rrDr@Zold_handlerrrr�setsigszStressTest.setsigcs�d�g�d
��fdd�	}|�tjtjd�|�tj|�|�xt���krXt�d�q@W�fdd�t	t��d�D�}t
�|�}tj
r�td	|f�|S)N�cs,t���kr(��t���t�tjd�dS)Ng���ư>)�len�appendr�Zperf_counterr
r�r�)rD�frame)�N�timesrrr@'sz5StressTest.measure_itimer_resolution.<locals>.handlerrg����MbP?cs g|]}�|d�|�qS)r�r)�.0�i)r�rr�
<listcomp>5sz8StressTest.measure_itimer_resolution.<locals>.<listcomp>r�z,detected median itimer() resolution: %.6f s.)NN)rPr
r�r�r�rgr�r�r��range�
statisticsZmedianr�verbose�print)rr@Z	durationsZmedr)r�r�r�measure_itimer_resolution#s
z$StressTest.measure_itimer_resolutioncCs4|��}|dkrdS|dkr dS|�d|f�dS)Ng-C��6?i'g{�G�z�?�dz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r�rc)rZresorrr�decide_itimer_count;szStressTest.decide_itimer_countr�ztest needs setitimer()cs|��}g�dd�}d�fdd�	}|�tj|�|�tj|�|�tj|�d}t��d}x�||kr�t�	t�
�tj�|d7}x&t��|kr�t��|kr�t�d	�q�Wt�	t�
�tj�|d7}x&t��|kr�t��|kr�t�d	�q�Wq^W|�
t��|d
�dS)z;
        This test uses dependent signal handlers.
        cSst�tjdt��d�dS)Ng���ư>g�h㈵��>)r
r�r��random)rDr�rrr�
first_handlerRsz@StressTest.test_stress_delivery_dependent.<locals>.first_handlerNcs��|�dS)N)r�)rDr�)�sigsrr�second_handlerZszAStressTest.test_stress_delivery_dependent.<locals>.second_handlerrg.@r�g�h㈵��>zSome signals were lost)NN)r�r�r
r�r&rgr�r�r*r~�getpidr�r�r)rr�r�r��
expected_sigs�deadliner)r�r�test_stress_delivery_dependentIs&
z)StressTest.test_stress_delivery_dependentcs�|��}g��fdd�}|�tj|�|�tj|�d}t��d}xh||kr�t�tjdt	�	�d�t
�t
��tj�|d7}x&t
��|kr�t��|kr�t�d�q�WqFW|�t
��|d�d	S)
z>
        This test uses simultaneous signal handlers.
        cs��|�dS)N)r�)rDr�)r�rrr@�sz=StressTest.test_stress_delivery_simultaneous.<locals>.handlerrg.@g���ư>g�h㈵��>rxzSome signals were lostN)r�r�r
r&rgr�r�r�r�r�r*r~r�r�r�r)rr�r@r�r�r)r�r�!test_stress_delivery_simultaneousws
z,StressTest.test_stress_delivery_simultaneousN)
rrrr�r�r�r�rWrorQr
r�r�rrrrr�s-r�cCst��dS)N)rZ
reap_childrenrrrr�tearDownModule�sr��__main__)#r*r�r
rJr�r}rZ	threadingr�rWr�rZtest.support.script_helperrrrn�ImportErrorZTestCaserrXrrror0rCrYrQrprwr�r�r�r�r�mainrrrr�<module>sN


M
7
@
T
eB