o
    HXîh0Ã  ã                   @   s@  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZ d dlmZ d dlmZmZ zd dlZW n eyU   dZY nw G dd„ dejƒZe ejdkd¡G d	d
„ d
ejƒƒZe ejdkd¡G dd„ dejƒƒZG dd„ dejƒZe ejdkd¡G dd„ dejƒƒZe eedƒd¡G dd„ dejƒƒZe ejdkd¡e eedƒd¡G dd„ dejƒƒƒZ e ejdkd¡e eedƒoÞeedƒd¡G dd„ dejƒƒƒZ!G dd „ d ejƒZ"G d!d"„ d"ejƒZ#G d#d$„ d$ejƒZ$G d%d&„ d&ejƒZ%d'd(„ Z&e'd)kre (¡  dS dS )*é    N)Úsupport)Ú	os_helper)Úassert_python_okÚspawn_pythonc                   @   s   e Zd Zdd„ Zdd„ ZdS )ÚGenericTestsc                 C   s–   t tƒD ]D}tt|ƒ}|dv r|  |tj¡ q|dv r#|  |tj¡ q| d¡r5| d¡s5|  |tj¡ q| d¡rH|  |tj¡ |  t	j
d¡ qd S )N>   ÚSIG_DFLÚSIG_IGN>   ÚSIG_UNBLOCKÚSIG_SETMASKÚ	SIG_BLOCKZSIGZSIG_ZCTRL_Úwin32)ÚdirÚsignalÚgetattrÚassertIsInstanceÚHandlersÚSigmasksÚ
startswithÚSignalsÚassertEqualÚsysÚplatform)ÚselfÚnameÚsig© r   ú:/opt/python-3.10.19/usr/lib/python3.10/test/test_signal.pyÚ
test_enums   s   

€özGenericTests.test_enumsc                 C   s>   t tƒD ]}tt|ƒ}t |¡rt |¡s|  |jd¡ qd S )Nr   )r   r   r   ÚinspectZ	isroutineZ	isbuiltinr   Ú
__module__)r   r   Úvaluer   r   r   Útest_functions_module_attr%   s   
€ýz'GenericTests.test_functions_module_attrN)Ú__name__r   Ú__qualname__r   r!   r   r   r   r   r      s    r   r   zNot valid on Windowsc                   @   sZ   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	e
 ejd¡dd„ ƒZdS )Ú
PosixTestsc                 G   ó   d S ©Nr   ©r   Úargsr   r   r   Útrivial_signal_handler0   s   z!PosixTests.trivial_signal_handlerc                 C   s8   |   ttjd¡ |   ttjd| j¡ |   ttjd¡ d S )Ni’  )ÚassertRaisesÚ
ValueErrorr   Ú	getsignalr)   Ú	strsignal©r   r   r   r   Ú,test_out_of_range_signal_number_raises_error3   s
   ÿz7PosixTests.test_out_of_range_signal_number_raises_errorc                 C   s   |   ttjtjd ¡ d S r&   )r*   Ú	TypeErrorr   ÚSIGUSR1r.   r   r   r   Ú0test_setting_signal_handler_to_none_raises_error;   s   
ÿz;PosixTests.test_setting_signal_handler_to_none_raises_errorc                 C   sZ   t   t j| j¡}|  |t j¡ |  t  t j¡| j¡ t   t j|¡ |  t  t j¡|¡ d S r&   )r   ÚSIGHUPr)   r   r   r   r,   )r   Zhupr   r   r   Útest_getsignal?   s   ÿzPosixTests.test_getsignalc                 C   s@   |   dt tj¡¡ |   dt tj¡¡ |   dt tj¡¡ d S )NZ	InterruptZ
TerminatedZHangup)ÚassertInr   r-   ÚSIGINTÚSIGTERMr3   r.   r   r   r   Útest_strsignalG   s   zPosixTests.test_strsignalc                 C   s&   t j t¡}t j |d¡}t|ƒ d S )Nzsignalinterproctester.py)ÚosÚpathÚdirnameÚ__file__Újoinr   )r   r;   Zscriptr   r   r   Útest_interprocess_signalM   s   z#PosixTests.test_interprocess_signalc                 C   sd   t  ¡ }|  |t¡ |  t jj|¡ |  t jj|¡ |  d|¡ |  t j	|¡ |  
t|ƒt j	¡ d S ©Nr   )r   Úvalid_signalsr   Úsetr5   r   r6   ÚSIGALRMÚassertNotInÚNSIGÚ
assertLessÚlen©r   Úsr   r   r   Útest_valid_signalsR   ó   zPosixTests.test_valid_signalsúsys.executable required.c                 C   s>   	 t jtjddgt jd}|  d|j¡ |  |jt	j
 ¡ d S )Nú-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)©Ústderró   KeyboardInterrupt)Ú
subprocessÚrunr   Ú
executableÚPIPEr5   rN   r   Ú
returncoder   r6   )r   Úprocessr   r   r   Ú!test_keyboard_interrupt_exit_code[   s   ÿûz,PosixTests.test_keyboard_interrupt_exit_codeN)r"   r   r#   r)   r/   r2   r4   r8   r>   rI   ÚunittestÚ
skipUnlessr   rR   rV   r   r   r   r   r$   .   s    	r$   zWindows specificc                   @   s2   e Zd Zdd„ Zdd„ Ze ejd¡dd„ ƒZ	dS )	ÚWindowsSignalTestsc                 C   sd   t  ¡ }|  |t¡ |  t|ƒd¡ |  t jj|¡ |  	d|¡ |  	t j
|¡ |  t|ƒt j
¡ d S )Né   r   )r   r@   r   rA   ZassertGreaterEqualrF   r5   r   r6   rC   rD   rE   rG   r   r   r   rI   o   rJ   z%WindowsSignalTests.test_valid_signalsc                 C   sÜ   dd„ }t ƒ }tjtjtjtjtjtjtjfD ]}t 	|¡d ur/t |t ||¡¡ | 
|¡ q|  |¡ |  t¡ t d|¡ W d   ƒ n1 sKw   Y  |  t¡ t d|¡ W d   ƒ d S 1 sgw   Y  d S )Nc                 S   r%   r&   r   )ÚxÚyr   r   r   Ú<lambda>z   s    z3WindowsSignalTests.test_issue9324.<locals>.<lambda>éÿÿÿÿé   )rA   r   ÚSIGABRTZSIGBREAKÚSIGFPEÚSIGILLr6   ÚSIGSEGVr7   r,   ÚaddÚ
assertTruer*   r+   )r   ÚhandlerÚcheckedr   r   r   r   Útest_issue9324x   s"   þ
€
ÿ"ÿz!WindowsSignalTests.test_issue9324rK   c                 C   s>   	 t jtjddgt jd}|  d|j¡ d}|  |j|¡ d S )NrL   zraise KeyboardInterruptrM   rO   l   :   )	rP   rQ   r   rR   rS   r5   rN   r   rT   )r   rU   ZSTATUS_CONTROL_C_EXITr   r   r   rV      s   
þz4WindowsSignalTests.test_keyboard_interrupt_exit_codeN)
r"   r   r#   rI   rh   rW   rX   r   rR   rV   r   r   r   r   rY   l   s
    	rY   c                   @   sN   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Ze 	e
jdkd¡dd„ ƒZdS )ÚWakeupFDTestsc                 C   sv   |   t¡ tjtjd W d   ƒ n1 sw   Y  |   t¡ t tjd¡ W d   ƒ d S 1 s4w   Y  d S )N)ÚsignumF)r*   r0   r   Úset_wakeup_fdr6   r.   r   r   r   Útest_invalid_callž   s   ÿ"ÿzWakeupFDTests.test_invalid_callc                 C   s    t  ¡ }|  ttftj|¡ d S r&   )r   Zmake_bad_fdr*   r+   ÚOSErrorr   rk   )r   Úfdr   r   r   Útest_invalid_fd§   s   
ÿzWakeupFDTests.test_invalid_fdc                 C   s0   t   ¡ }| ¡ }| ¡  |  ttftj|¡ d S r&   )ÚsocketÚfilenoÚcloser*   r+   rm   r   rk   )r   Úsockrn   r   r   r   Útest_invalid_socket¬   s   
ÿz!WakeupFDTests.test_invalid_socketc                 C   s¶   t  ¡ \}}|  t j|¡ |  t j|¡ t  ¡ \}}|  t j|¡ |  t j|¡ tt dƒr9t  |d¡ t  |d¡ t |¡ |  t |¡|¡ |  t d¡|¡ |  t d¡d¡ d S )NÚset_blockingFr^   )	r9   ÚpipeÚ
addCleanuprr   Úhasattrru   r   rk   r   )r   Zr1Zw1Zr2Zw2r   r   r   Útest_set_wakeup_fd_result³   s   

z'WakeupFDTests.test_set_wakeup_fd_resultc                 C   s   t   ¡ }|  |j¡ | d¡ | ¡ }t   ¡ }|  |j¡ | d¡ | ¡ }t |¡ |  t |¡|¡ |  t d¡|¡ |  t d¡d¡ d S )NFr^   )rp   rw   rr   Úsetblockingrq   r   rk   r   )r   Zsock1Úfd1Zsock2Úfd2r   r   r   Ú test_set_wakeup_fd_socket_resultÄ   s   


z.WakeupFDTests.test_set_wakeup_fd_socket_resultr   ztests specific to POSIXc                 C   s¢   t  ¡ \}}|  t j|¡ |  t j|¡ t  |d¡ |  t¡}t |¡ W d   ƒ n1 s/w   Y  |  	t
|jƒd| ¡ t  |d¡ t |¡ t d¡ d S )NTz&the fd %s must be in non-blocking modeFr^   )r9   rv   rw   rr   ru   r*   r+   r   rk   r   ÚstrÚ	exception)r   ZrfdZwfdÚcmr   r   r   Útest_set_wakeup_fd_blockingÖ   s   ÿÿ
z)WakeupFDTests.test_set_wakeup_fd_blockingN)r"   r   r#   rl   ro   rt   ry   r}   rW   ÚskipIfr   r   r   r   r   r   r   ri   œ   s    	ri   c                   @   st   e Zd Ze edu d¡ddœdd„ƒZe edu d¡dd„ ƒZd	d
„ Zdd„ Z	dd„ Z
e eedƒd¡dd„ ƒZdS )ÚWakeupSignalTestsNúneed _testcapiT©Úorderedc                G   s&   d  ttt|ƒƒ||¡}td|ƒ d S )Na  if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        rL   )ÚformatÚtupleÚmapÚintr   )r   Z	test_bodyr†   ZsignalsÚcoder   r   r   Úcheck_wakeupë   s    à"zWakeupSignalTests.check_wakeupc              	   C   s|   d}t  ¡ \}}z#zt  |d¡ W n	 ty   Y nw |  d¡ W t  |¡ t  |¡ nt  |¡ t  |¡ w td|ƒ d S )Na&  if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        ó   xz9OS doesn't report write() error on the read end of a piperL   )r9   rv   Úwriterm   ÚskipTestrr   r   )r   r‹   ÚrÚwr   r   r   Útest_wakeup_write_error  s   "ÿ
€

ÿz)WakeupSignalTests.test_wakeup_write_errorc                 C   ó   |   dtj¡ d S )Na·  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        ©rŒ   r   rB   r.   r   r   r   Útest_wakeup_fd_earlyF  s   áz&WakeupSignalTests.test_wakeup_fd_earlyc                 C   r“   )Na`  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r”   r.   r   r   r   Útest_wakeup_fd_duringh  s   åz'WakeupSignalTests.test_wakeup_fd_duringc                 C   s   |   dtjtj¡ d S )Nz§def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )rŒ   r   r1   rB   r.   r   r   r   Útest_signum†  s   üzWakeupSignalTests.test_signumÚpthread_sigmaskúneed signal.pthread_sigmask()c                 C   s   | j dtjtjdd d S )Naæ  def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Fr…   )rŒ   r   r1   ÚSIGUSR2r.   r   r   r   Útest_pending  s   

ôzWakeupSignalTests.test_pending)r"   r   r#   rW   r‚   Ú	_testcapirŒ   r’   r•   r–   r—   rX   rx   r   r›   r   r   r   r   rƒ   é   s    &
3"ÿrƒ   Ú
socketpairzneed socket.socketpairc                   @   sT   e Zd Ze edu d¡dd„ ƒZe edu d¡dd„ ƒZe edu d¡dd„ ƒZdS )	ÚWakeupSocketSignalTestsNr„   c                 C   ó   d}t d|ƒ d S )Na´  if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        rL   ©r   ©r   r‹   r   r   r   Útest_socket¢  s   z#WakeupSocketSignalTests.test_socketc                 C   ó.   t jdkrd}nd}dj|d}td|ƒ d S )NÚntÚsendrŽ   a+  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        ©ÚactionrL   ©r9   r   r‡   r   ©r   r§   r‹   r   r   r   Útest_send_errorÆ  s   
!ß"z'WakeupSocketSignalTests.test_send_errorc                 C   r£   )Nr¤   r¥   rŽ   aJ  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        r¦   rL   r¨   r©   r   r   r   Útest_warn_on_full_bufferñ  s   
g™hz0WakeupSocketSignalTests.test_warn_on_full_buffer)	r"   r   r#   rW   r‚   rœ   r¢   rª   r«   r   r   r   r   rž   Ÿ  s    
#
*rž   Úsiginterruptzneeds signal.siginterrupt()c                   @   s,   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	S )
ÚSiginterruptTestc              	   C   s¶   	 d|f }t d|ƒF}z|j ¡ }|jtjd\}}W n tjy1   | ¡  Y W d   ƒ dS w || }| 	¡ }|dvrFt
d||f ƒ‚|dkW  d   ƒ S 1 sTw   Y  d S )Naò  if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        rL   )ÚtimeoutF)é   é   zChild error (exit code %s): %rr°   )r   ÚstdoutÚreadlineÚcommunicater   ÚSHORT_TIMEOUTrP   ÚTimeoutExpiredÚkillÚwaitÚ	Exception)r   Z	interruptr‹   rU   Z
first_liner±   rN   Úexitcoder   r   r   Úreadpipe_interruptedg  s*   #Ý$
øÿ$ñz%SiginterruptTest.readpipe_interruptedc                 C   s   |   d ¡}|  |¡ d S r&   ©rº   re   ©r   Zinterruptedr   r   r   Útest_without_siginterrupt£  ó   
z*SiginterruptTest.test_without_siginterruptc                 C   ó   |   d¡}|  |¡ d S ©NTr»   r¼   r   r   r   Útest_siginterrupt_onª  r¾   z%SiginterruptTest.test_siginterrupt_onc                 C   r¿   )NF)rº   ZassertFalser¼   r   r   r   Útest_siginterrupt_off±  r¾   z&SiginterruptTest.test_siginterrupt_offN)r"   r   r#   rº   r½   rÁ   rÂ   r   r   r   r   r­   c  s
    <r­   Ú	getitimerÚ	setitimerz/needs signal.getitimer() and signal.setitimer()c                   @   sn   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	e
 ejdv d¡dd„ ƒZdd„ Zdd„ ZdS )Ú
ItimerTestc                 C   s(   d| _ d| _d | _t tj| j¡| _d S )NFr   )Úhndl_calledÚ
hndl_countÚitimerr   rB   Úsig_alrmÚ	old_alarmr.   r   r   r   ÚsetUp½  s   zItimerTest.setUpc                 C   s0   t   t j| j¡ | jd urt  | jd¡ d S d S r?   )r   rB   rÊ   rÈ   rÄ   r.   r   r   r   ÚtearDownÃ  s   
þzItimerTest.tearDownc                 G   s
   d| _ d S rÀ   )rÆ   r'   r   r   r   rÉ   É  s   
zItimerTest.sig_alrmc                 G   sD   d| _ | jdkrt d¡‚| jdkrt tjd¡ |  jd7  _d S )NTr°   z.setitimer didn't disable ITIMER_VIRTUAL timer.r   é   )rÆ   rÇ   r   ÚItimerErrorrÄ   ÚITIMER_VIRTUALr'   r   r   r   Ú
sig_vtalrmÌ  s   


zItimerTest.sig_vtalrmc                 G   s   d| _ t tjd¡ d S )NTr   )rÆ   r   rÄ   ÚITIMER_PROFr'   r   r   r   Úsig_profÙ  s   zItimerTest.sig_profc                 C   s   |   tjtjdd¡ d S )Nr^   r   )r*   r   rÎ   rÄ   ÚITIMER_REALr.   r   r   r   Útest_itimer_excÝ  s   zItimerTest.test_itimer_excc                 C   s0   t j| _t  | jd¡ t  ¡  |  | jd¡ d S )Ng      ð?T)r   rÓ   rÈ   rÄ   Úpauser   rÆ   r.   r   r   r   Útest_itimer_realæ  s   zItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.c                 C   sž   t j| _t   t j| j¡ t  | jdd¡ t ¡ }t ¡ | dk r7tdddƒ}t  	| j¡dkr/nt ¡ | dk s |  
d¡ |  t  	| j¡d¡ |  | jd	¡ d S )
Ng333333Ó?çš™™™™™É?ç      N@é90  é2	 é“–˜ ©ç        rÝ   ú8timeout: likely cause: machine too slow or load too highT)r   rÏ   rÈ   Ú	SIGVTALRMrÐ   rÄ   ÚtimeÚ	monotonicÚpowrÃ   r   r   rÆ   ©r   Z
start_timeÚ_r   r   r   Útest_itimer_virtualí  s   ü
zItimerTest.test_itimer_virtualc                 C   sž   t j| _t   t j| j¡ t  | jdd¡ t ¡ }t ¡ | dk r7tdddƒ}t  	| j¡dkr/nt ¡ | dk s |  
d¡ |  t  	| j¡d¡ |  | jd¡ d S )	Nr×   rØ   rÙ   rÚ   rÛ   rÜ   rÞ   T)r   rÑ   rÈ   ÚSIGPROFrÒ   rÄ   rà   rá   râ   rÃ   r   r   rÆ   rã   r   r   r   Útest_itimer_prof  s   ü
zItimerTest.test_itimer_profc                 C   s2   t j| _t  | jd¡ t d¡ |  | jd¡ d S )Nçíµ ÷Æ°>rÍ   T)r   rÓ   rÈ   rÄ   rà   Úsleepr   rÆ   r.   r   r   r   Útest_setitimer_tiny  s   
zItimerTest.test_setitimer_tinyN)r"   r   r#   rË   rÌ   rÉ   rÐ   rÒ   rÔ   rÖ   rW   r‚   r   r   rå   rç   rê   r   r   r   r   rÅ   ¹  s    	ÿ
rÅ   c                   @   s¸  e Zd Z	 e eedƒd¡dd„ ƒZe eedƒd¡e eedƒd¡dd„ ƒƒZe eed	ƒd
¡dd„ ƒZ	e eedƒd¡dd„ ƒZ
e eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd „ ƒZe eedƒd¡e eedƒd¡d!d"„ ƒƒZe eedƒd¡d#d$„ ƒZe eedƒd¡d%d&„ ƒZe eedƒd¡d'd(„ ƒZe eed	ƒd
¡d)d*„ ƒZd+S ),ÚPendingSignalsTestsÚ
sigpendingzneed signal.sigpending()c                 C   s   |   t ¡ tƒ ¡ d S r&   )r   r   rì   rA   r.   r   r   r   Útest_sigpending_empty&  s   z)PendingSignalsTests.test_sigpending_emptyr˜   r™   c                 C   rŸ   )Na  if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rL   r    r¡   r   r   r   Útest_sigpending+  s   z#PendingSignalsTests.test_sigpendingÚpthread_killzneed signal.pthread_kill()c                 C   rŸ   )Naâ  if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rL   r    r¡   r   r   r   Útest_pthread_killJ  s   z%PendingSignalsTests.test_pthread_killc                 C   s    	 d|  ¡ |f }td|ƒ d S )Naw  if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        rL   )Ústripr   )r   ZblockedÚtestr‹   r   r   r   Úwait_helperc  s
   
 à%zPendingSignalsTests.wait_helperÚsigwaitzneed signal.sigwait()c                 C   ó   |   tjd¡ d S )Na   
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ©ró   r   rB   r.   r   r   r   Útest_sigwait‘  ó   z PendingSignalsTests.test_sigwaitÚsigwaitinfozneed signal.sigwaitinfo()c                 C   rõ   )Nz×
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        rö   r.   r   r   r   Útest_sigwaitinfo  rø   z$PendingSignalsTests.test_sigwaitinfoÚsigtimedwaitzneed signal.sigtimedwait()c                 C   rõ   )Nzá
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        rö   r.   r   r   r   Útest_sigtimedwait¨  rø   z%PendingSignalsTests.test_sigtimedwaitc                 C   rõ   )Nzþ
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        rö   r.   r   r   r   Útest_sigtimedwait_poll³  s   z*PendingSignalsTests.test_sigtimedwait_pollc                 C   rõ   )Nz¿
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        rö   r.   r   r   r   Útest_sigtimedwait_timeoutÀ  rø   z-PendingSignalsTests.test_sigtimedwait_timeoutc                 C   s   t j}|  tt j|gd¡ d S )Ng      ð¿)r   rB   r*   r+   rû   )r   rj   r   r   r   Ú"test_sigtimedwait_negative_timeoutÊ  s   z6PendingSignalsTests.test_sigtimedwait_negative_timeoutc                 C   s   t ddƒ d S )NrL   a­  if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        r    r.   r   r   r   Útest_sigwait_threadÐ  s   	z'PendingSignalsTests.test_sigwait_threadc                 C   sþ   |   ttj¡ |   ttjd¡ |   ttjddd¡ |   ttjdg ¡ |   t¡ t tjtjg¡ W d   ƒ n1 s;w   Y  |   t¡ t tjdg¡ W d   ƒ n1 sXw   Y  |   t¡ t tjdd> g¡ W d   ƒ d S 1 sxw   Y  d S )NrÍ   r¯   r°   i¤  r   iè  )r*   r0   r   r˜   rm   r+   r   rD   r.   r   r   r   Útest_pthread_sigmask_argumentsò  s   ÿÿ"ÿz2PendingSignalsTests.test_pthread_sigmask_argumentsc                 C   sJ   t  t jt  ¡ ¡}|  t jt j|¡ t  t jt  ¡ ¡}|  |t  ¡ ¡ d S r&   )r   r˜   r   r@   rw   r
   r	   ZassertLessEqualrG   r   r   r   Ú"test_pthread_sigmask_valid_signals   s   z6PendingSignalsTests.test_pthread_sigmask_valid_signalsc                 C   rŸ   )Na-	  if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        rL   r    r¡   r   r   r   Útest_pthread_sigmask	  s   Hz(PendingSignalsTests.test_pthread_sigmaskc                 C   s`   d}t d|ƒ}| ¡ \}}| ¡ }|dkrtd||f ƒ‚W d   ƒ d S 1 s)w   Y  d S )Na7  if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        rL   r°   zChild error (exit code %s): %s)r   r³   r·   r¸   )r   r‹   rU   r±   rN   r¹   r   r   r   Útest_pthread_kill_main_threadV  s   ÿÿ"ýz1PendingSignalsTests.test_pthread_kill_main_threadN)r"   r   r#   rW   rX   rx   r   rí   rî   rð   ró   r÷   rú   rü   rý   rþ   rÿ   r   r  r  r  r  r   r   r   r   rë   !  sˆ    ÿ
ÿÿÿ
ÿ
,ÿ

ÿ
	ÿ
	ÿ
ÿ
ÿ
ÿÿÿ
ÿ
ÿ
Kÿrë   c                   @   st   e Zd Z	 dd„ Zdd„ Zdd„ Ze ee	dƒd¡d	d
„ ƒZ
e ee	dƒd¡dd„ ƒZe ee	dƒd¡dd„ ƒZdS )Ú
StressTestc                 C   s    t   ||¡}|  t j ||¡ d S r&   )r   rw   )r   rj   rf   Úold_handlerr   r   r   Úsetsigw  s   zStressTest.setsigc                    s    d‰ g ‰d
‡ ‡fdd„	}|   tjtjd¡ |  tj|¡ |ƒ  tˆƒˆ k r0t d¡ tˆƒˆ k s%‡fdd„t	tˆƒd ƒD ƒ}t
 |¡}tjrNtd	|f ƒ |S )Né   c                    s0   t ˆƒˆ k rˆ t ¡ ¡ t tjd¡ d S d S )Nrè   )rF   Úappendrà   Úperf_counterr   rÄ   rÓ   ©rj   Úframe©ÚNÚtimesr   r   rf     s   ûz5StressTest.measure_itimer_resolution.<locals>.handlerr   gü©ñÒMbP?c                    s    g | ]}ˆ |d   ˆ |  ‘qS )rÍ   r   )Ú.0Úi)r  r   r   Ú
<listcomp>  s     z8StressTest.measure_itimer_resolution.<locals>.<listcomp>rÍ   z,detected median itimer() resolution: %.6f s.©NN)rw   r   rÄ   rÓ   r  rB   rF   rà   ré   ÚrangeÚ
statisticsZmedianr   ÚverboseÚprint)r   rf   Z	durationsZmedr   r  r   Úmeasure_itimer_resolution{  s   
ÿ
z$StressTest.measure_itimer_resolutionc                 C   s4   |   ¡ }|dkr
dS |dkrdS |  d|f ¡ d S )Ng-Cëâ6?i'  g{®Gáz„?éd   z^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r  r   )r   Zresor   r   r   Údecide_itimer_count“  s   
þzStressTest.decide_itimer_countrÄ   ztest needs setitimer()c                    s.  	 |   ¡ }g ‰ dd„ }d	‡ fdd„	}|  tj|¡ |  tj|¡ |  tj|¡ d}t ¡ tj	 }||k rŒt
 t
 ¡ tj¡ |d7 }tˆ ƒ|k r^t ¡ |k r^t d¡ tˆ ƒ|k r^t ¡ |k sMt
 t
 ¡ tj¡ |d7 }tˆ ƒ|k rˆt ¡ |k rˆt d¡ tˆ ƒ|k rˆt ¡ |k sw||k s4|  tˆ ƒ|d¡ d S )
Nc                 S   s   t  t jdt ¡ d  ¡ d S )Nrè   çñhãˆµøä>)r   rÄ   rÓ   Úrandomr  r   r   r   Úfirst_handlerª  s   z@StressTest.test_stress_delivery_dependent.<locals>.first_handlerc                    ó   ˆ   | ¡ d S r&   ©r	  r  ©Zsigsr   r   Úsecond_handler²  ó   zAStressTest.test_stress_delivery_dependent.<locals>.second_handlerr   rÍ   r  úSome signals were lostr  )r  r  r   ræ   r1   rB   rà   rá   r   r´   r9   r¶   ÚgetpidrF   ré   r   )r   r  r  r!  Úexpected_sigsÚdeadliner   r   r   Útest_stress_delivery_dependent¡  s.   
ÿ
ÿ÷z)StressTest.test_stress_delivery_dependentc                    sÜ   	 |   ¡ }g ‰ ‡ fdd„}|  tj|¡ |  tj|¡ d}t ¡ tj }||k rct 	tj
dt ¡ d  ¡ t t ¡ tj¡ |d7 }tˆ ƒ|k r_t ¡ |k r_t d¡ tˆ ƒ|k r_t ¡ |k sN||k s(|  tˆ ƒ|d¡ d S )Nc                    r  r&   r  r  r   r   r   rf   Ø  r"  z=StressTest.test_stress_delivery_simultaneous.<locals>.handlerr   rè   r  r¯   r#  )r  r  r   r1   rB   rà   rá   r   r´   rÄ   rÓ   r  r9   r¶   r$  rF   ré   r   )r   r  rf   r%  r&  r   r   r   Ú!test_stress_delivery_simultaneousÏ  s"   
ÿøz,StressTest.test_stress_delivery_simultaneousr1   ztest needs SIGUSR1c                    s&  t j‰d‰d‰d‰‡fdd„‰ ‡‡‡fdd„}‡ ‡‡fdd„}t   ˆˆ ¡}|  t j ˆ|¡ tj|d	}zXd}t ¡ 3}| ¡  |ƒ  d
‰| ¡  |j	d urf|  
|j	jt¡ |  dˆd›dt|j	jƒ¡ d
}W d   ƒ n1 spw   Y  |s}|  ˆd¡ |  ˆˆ¡ W d
‰| ¡  d S d
‰| ¡  w )Nr   Fc                    s   ˆ d7 ‰ d S ©NrÍ   r   r  )Únum_received_signalsr   r   Úcustom_handlerù  s   zAStressTest.test_stress_modifying_handlers.<locals>.custom_handlerc                      s"   ˆ st  ˆ¡ ˆd7 ‰ˆ rd S d S r)  )r   Úraise_signalr   )Údo_stopÚnum_sent_signalsrj   r   r   Úset_interruptsý  s   
þzAStressTest.test_stress_modifying_handlers.<locals>.set_interruptsc                     sB   ˆdk rt dƒD ]} ˆ tjfD ]}t ˆ|¡ qqˆdk sd S d S )Nr  i N  )r  r   r   )r  rf   )r+  r.  rj   r   r   Úcycle_handlers  s   ÿýzAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlers)ÚtargetTzSignal Údz ignored due to race condition)r   r1   rw   Ú	threadingÚThreadr   Zcatch_unraisable_exceptionÚstartr=   Z
unraisabler   Ú	exc_valuerm   r5   r~   ZassertGreaterrE   )r   r/  r0  r  ÚtZignoredr€   r   )r+  r-  r*  r.  rj   r   Útest_stress_modifying_handlersð  sB   


þ€òÿ
z)StressTest.test_stress_modifying_handlersN)r"   r   r#   r  r  r  rW   rX   rx   r   r'  r(  r8  r   r   r   r   r  p  s"    ÿ
,ÿ
ÿr  c                   @   s>   e Zd Zdd„ Ze ejdkd¡dd„ ƒZdd„ Z	d	d
„ Z
dS )ÚRaiseSignalTestc                 C   s<   |   t¡ t tj¡ W d   ƒ d S 1 sw   Y  d S r&   )r*   ÚKeyboardInterruptr   r,  r6   r.   r   r   r   Útest_sigint-  s   "ÿzRaiseSignalTest.test_sigintr   zWindows specific testc              
   C   sV   zd}t  |¡ |  d¡ W d S  ty* } z|jtjkrn‚ W Y d }~d S d }~ww )NrÍ   z#OSError (Invalid argument) expected)r   r,  Zfailrm   ÚerrnoÚEINVAL)r   r3   Úer   r   r   Útest_invalid_argument1  s   
þ€þz%RaiseSignalTest.test_invalid_argumentc                    sJ   d‰ ‡ fdd„}t   t j|¡}|  t j t j|¡ t  t j¡ |  ˆ ¡ d S )NFc                    s   d‰ d S rÀ   r   )ÚaÚb©Zis_okr   r   rf   ?  s   z-RaiseSignalTest.test_handler.<locals>.handler)r   r6   rw   r,  re   )r   rf   Z
old_signalr   rB  r   Útest_handler=  s   zRaiseSignalTest.test_handlerc                 C   s$   d}t d|ƒ\}}}|  d|¡ d S )Nz•if 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        rL   s/   OSError: Signal 2 ignored due to race condition)r   r5   )r   r‹   ÚrcÚoutÚerrr   r   r   Útest__thread_interrupt_mainH  s   	z+RaiseSignalTest.test__thread_interrupt_mainN)r"   r   r#   r;  rW   r‚   r   r   r?  rC  rG  r   r   r   r   r9  +  s    
r9  c                   @   s&   e Zd Ze eedƒd¡dd„ ƒZdS )ÚPidfdSignalTestÚpidfd_send_signalzpidfd support not built inc                 C   s   |   t¡}t dtj¡ W d   ƒ n1 sw   Y  |jjtjkr)|  d¡ n|jjtj	kr5|  d¡ |  
|jjtj¡ t dt ¡ › tj¡}|  tj|¡ |  td¡ t |tjtƒ d¡ W d   ƒ n1 slw   Y  |   t¡ t |tj¡ W d   ƒ d S 1 s‰w   Y  d S )Nr   zkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)r*   rm   r   rI  r6   r   r<  ZENOSYSr   ÚEPERMr   ÚEBADFr9   Úopenr$  ÚO_DIRECTORYrw   rr   ZassertRaisesRegexr0   Úobjectr:  )r   r€   Zmy_pidfdr   r   r   Útest_pidfd_send_signalZ  s    ÿ
ÿ"ÿz&PidfdSignalTest.test_pidfd_send_signalN)r"   r   r#   rW   rX   rx   r   rO  r   r   r   r   rH  X  s    þrH  c                   C   s   t  ¡  d S r&   )r   Úreap_childrenr   r   r   r   ÚtearDownModulem  s   rQ  Ú__main__))r<  r   r9   r  r   rp   r  rP   r   r3  rà   rW   rò   r   Ztest.supportr   Ztest.support.script_helperr   r   rœ   ÚImportErrorZTestCaser   r‚   r   r$   rX   rY   ri   rƒ   rx   rž   r­   rÅ   rë   r  r9  rH  rQ  r"   Úmainr   r   r   r   Ú<module>   sf    ÿ=/M 6 DTÿe  Q <-
ÿ