
    0hh#              	          S SK r S SKrS SKrS SKrS SKrS SKrS SKrS SKrS SKrS SK	r	S SK
r
S SKrS SKrS SKrS SKJr  S SKJrJrJrJr  S SKJrJr   S SKr " S S\R4                  5      r\R8                  " \
R:                  S:H  S5       " S	 S
\R4                  5      5       r\R>                  " \
R:                  S:H  S5       " S S\R4                  5      5       r  " S S\R4                  5      r!\R8                  " \
R:                  S:H  S5       " S S\R4                  5      5       r"\R>                  " \#" \S5      S5       " S S\R4                  5      5       r$\R8                  " \
R:                  S:H  S5      \R>                  " \#" \S5      S5      \RJ                  " 5       \R>                  " \#" \S5      S5       " S S\R4                  5      5       5       5       5       r&\R8                  " \
R:                  S:H  S5      \R>                  " \#" \S5      =(       a	    \#" \S5      S5       " S S \R4                  5      5       5       r' " S! S"\R4                  5      r( " S# S$\R4                  5      r) " S% S&\R4                  5      r* " S' S(\R4                  5      r+S) r,\-S*:X  a  \R\                  " 5         gg! \ a    Sr GNjf = f)+    N)support)is_appleis_apple_mobile	os_helperthreading_helper)assert_python_okspawn_pythonc                        \ rS rSrS rS rSrg)GenericTests   c                 ,   [        [        5       GH
  n[        [        U5      nUS;   a"  U R                  U[        R                  5        M<  US;   a"  U R                  U[        R
                  5        Md  UR                  S5      (       a8  UR                  S5      (       d"  U R                  U[        R                  5        M  UR                  S5      (       d  M  U R                  U[        R                  5        U R                  [        R                  S5        GM     [        R                  " [        R                  SSS	 [        S
9n[        R                  " U[        R                  5        [        R                  " [        R                  SSS [        S
9n[        R                  " U[        R                  5        [        [        SS 5      nUbB  [        R                  " [        R                  SSS [        S
9n[        R                  " Xe5        g g )N>   SIG_DFLSIG_IGN>   	SIG_BLOCKSIG_SETMASKSIG_UNBLOCKSIGSIG_CTRL_win32Signalssignalc                     U R                  5       =(       a.    U R                  S5      =(       a    U R                  S5      (       + =(       d    U R                  S5      $ )Nr   r   r   )isupper
startswithnames    9/opt/python-3.13.8/usr/lib/python3.13/test/test_signal.py<lambda>)GenericTests.test_enums.<locals>.<lambda>+   sD    LLN Q/O8O4O0w/0    )sourceHandlersc                     U S;   $ )N)r   r    r   s    r   r   r    5   s
    T%;;r!   Sigmasksc                     U S;   $ )N)r   r   r   r%   r   s    r   r   r    >   s
    )T!Tr!   )dirr   getattrassertIsInstancer#   r&   r   r   assertEqualsysplatformenum_old_convert_IntEnum_test_simple_enum)selfr   sigCheckedSignalsCheckedHandlersr&   CheckedSigmaskss          r   
test_enumsGenericTests.test_enums   s|   KD&$'C--%%c6??;DD%%c6??;''0G0G%%c6>>:))%%c6>>:  w7   ++i0  	~v~~>,,j(;
 	@6:t4"00LL*hT!O
 ""?=  r!   c                     [        [        5       Hi  n[        [        U5      n[        R                  " U5      (       d  M0  [        R
                  " U5      (       a  MM  U R                  UR                  S5        Mk     g )Nr   )r(   r   r)   inspect	isroutine	isbuiltinr+   
__module__)r2   r   values      r   test_functions_module_attr'GenericTests.test_functions_module_attrC   sU     KDFD)E  ''0A0A%0H0H  !1!18<  r!   r%   N)__name__r=   __qualname____firstlineno__r7   r?   __static_attributes__r%   r!   r   r   r      s    %>N=r!   r   r   zNot valid on Windowsc                   P   \ rS rSrS rS rS rS rS rS r	\
R                  " \R                  R                  S5      S	5      S
 5       rS r\
R"                  " \" \S5      S5      S 5       r\
R"                  " \R*                  S5      \R.                  " 5       S 5       5       rSrg)
PosixTestsL   c                     g Nr%   r2   argss     r   trivial_signal_handler!PosixTests.trivial_signal_handlerN   s    r!   c                 D    [         R                  " U R                  U5      $ rI   )	functoolspartialrL   )r2   arguments     r   create_handler_with_partial&PosixTests.create_handler_with_partialQ   s      !<!<hGGr!   c                     U R                  [        [        R                  S5        U R                  [        [        R                  SU R                  5        U R                  [        [        R
                  S5        g )Ni  )assertRaises
ValueErrorr   	getsignalrL   	strsignalr2   s    r   ,test_out_of_range_signal_number_raises_error7PosixTests.test_out_of_range_signal_number_raises_errorT   sU    *f&6&6=*fmmT55	7 	*f&6&6=r!   c                 l    U R                  [        [        R                  [        R                  S 5        g rI   )rU   	TypeErrorr   SIGUSR1rY   s    r   0test_setting_signal_handler_to_none_raises_error;PosixTests.test_setting_signal_handler_to_none_raises_error\   s!    )V]] ..$	0r!   c                    [         R                   " [         R                  U R                  5      nU R                  U[         R                  5        U R                  [         R                  " [         R                  5      U R                  5        [         R                   " [         R                  U5        U R                  [         R                  " [         R                  5      U5        g rI   )r   SIGHUPrL   r*   r#   r+   rW   )r2   hups     r   test_getsignalPosixTests.test_getsignal`   s    mmFMM4+F+FGc6??3))&--844	6fmmS)))&--8#>r!   c                 \    " S S5      nU" 5       nU R                  SUR                  5        U R                  U5      n[        R                  " [        R                  U5      nU R                  U[        R                  5        U R                  [        R                  " [        R                  5      U5        [        R                  " [        R                  U5        U R                  [        R                  " [        R                  5      U5        U R                  SUR                  5        g )Nc                   .   ^  \ rS rSrS rU 4S jrSrU =r$ )GPosixTests.test_no_repr_is_called_on_signal_handler.<locals>.MyArgumentk   c                     SU l         g Nr   
repr_countrY   s    r   __init__PPosixTests.test_no_repr_is_called_on_signal_handler.<locals>.MyArgument.__init__l   s	    "#r!   c                 J   > U =R                   S-  sl         [        TU ]	  5       $ N   )rm   super__repr__)r2   	__class__s    r   rt   PPosixTests.test_no_repr_is_called_on_signal_handler.<locals>.MyArgument.__repr__o   s    1$w'))r!   rl   )rA   r=   rB   rC   rn   rt   rD   __classcell__)ru   s   @r   
MyArgumentrh   k   s    $* *r!   rx   r   )r+   rm   rR   r   rb   r*   r#   rW   )r2   rx   rQ   handlerrc   s        r   (test_no_repr_is_called_on_signal_handler3PosixTests.test_no_repr_is_called_on_signal_handlerh   s    	* 	* <H//0228<mmFMM73c6??3))&--8'BfmmS)))&--8#>H//0r!   netbsdz/gh-124083: strsignal is not supported on NetBSDc                 <   U R                  S[        R                  " [        R                  5      5        U R                  S[        R                  " [        R                  5      5        U R                  S[        R                  " [        R
                  5      5        g )N	Interrupt
TerminatedHangup)assertInr   rX   SIGINTSIGTERMrb   rY   s    r   test_strsignalPosixTests.test_strsignal~   s[     	k6#3#3FMM#BClF$4$4V^^$DEh 0 0 ?@r!   c                     [         R                  R                  [        5      n[         R                  R	                  US5      n[        U5        g )Nzsignalinterproctester.py)ospathdirname__file__joinr   )r2   r   scripts      r   test_interprocess_signal#PosixTests.test_interprocess_signal   s1    ''//(+g'AB r!   valid_signalszrequires signal.valid_signalsc                    [         R                  " 5       nU R                  U[        5        U R	                  [         R
                  R                  U5        U R	                  [         R
                  R                  U5        U R                  SU5        U R                  [         R                  U5        U R                  [        U5      [         R                  5        [        [         5       H}  nUR                  S5      (       d  M  US;   a  M#  U R                  US9   [        [         U5      nU R!                  US5        U R                  U[         R                  5        S S S 5        M     g ! , (       d  f       M  = f)Nr   r   >   r   r   r   )r   r   r*   setr   r   r   SIGALRMassertNotInNSIG
assertLesslenr(   r   subTestr)   assertGreaterEqual)r2   sr   signums       r   test_valid_signalsPosixTests.test_valid_signals   s   
   "a%fnn++Q/fnn,,a0Aa(A, KD??5))--4( .''24 )(   )(s   AE,,
E;	sys.executable required.c                     [         R                  " [        R                  SS/[         R                  S9nU R                  SUR                  5        U R                  UR                  [        R                  * 5        g)z+KeyboardInterrupt triggers exit via SIGINT.-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)stderr   KeyboardInterruptN)
subprocessrunr,   
executablePIPEr   r   r+   
returncoder   r   )r2   processs     r   !test_keyboard_interrupt_exit_code,PosixTests.test_keyboard_interrupt_exit_code   s_     ..9: "( 	*GNN;++fmm^<r!   r%   N)rA   r=   rB   rC   rL   rR   rZ   r_   rd   rz   unittestskipIfr,   r-   r   r   r   
skipUnlesshasattrr   r   r   r   requires_subprocessr   rD   r%   r!   r   rF   rF   L   s    H>0?1, __S\\,,X6FHAHA!
 ('5	5, )CD  "	= # E	=r!   rF   zWindows specificc                       \ rS rSrS rS r\R                  " \R                  S5      \
R                  " 5       S 5       5       rSrg)WindowsSignalTests   c                    [         R                  " 5       nU R                  U[        5        U R	                  [        U5      S5        U R                  [         R                  R                  U5        U R                  SU5        U R                  [         R                  U5        U R                  [        U5      [         R                  5        g )N   r   )r   r   r*   r   r   r   r   r   r   r   r   r   r2   r   s     r   r   %WindowsSignalTests.test_valid_signals   s      "a%A*fnn++Q/Aa(A,r!   c                    S n[        5       n[        R                  [        R                  [        R                  [        R
                  [        R                  [        R                  [        R                  4 HX  n[        R                  " U5      c  M  [        R                  " U[        R                  " X15      5        UR                  U5        MZ     U R                  U5        U R                  [        5         [        R                  " SU5        S S S 5        U R                  [        5         [        R                  " SU5        S S S 5        g ! , (       d  f       ND= f! , (       d  f       g = f)Nc                     g rI   r%   )xys     r   r   3WindowsSignalTests.test_issue9324.<locals>.<lambda>   s    tr!      )r   r   SIGABRTSIGBREAKSIGFPESIGILLr   SIGSEGVr   rW   add
assertTruerU   rV   )r2   ry   checkedr3   s       r   test_issue9324!WindowsSignalTests.test_issue9324   s    #%NNFOOV]]MM6==&..NN$C
 $0c6==#>?C $ 	 z*MM"g& + z*MM!W% +* +* +*s   ;E0E"
E"
E0r   c                     [         R                  " [        R                  SS/[         R                  S9nU R                  SUR                  5        SnU R                  UR                  U5        g)z?KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.r   zraise KeyboardInterruptr   r   l   :   N)	r   r   r,   r   r   r   r   r+   r   )r2   r   STATUS_CONTROL_C_EXITs      r   r   4WindowsSignalTests.test_keyboard_interrupt_exit_code   sZ     ..'@A!( 	*GNN; *++-BCr!   r%   N)rA   r=   rB   rC   r   r   r   r   r,   r   r   r   r   rD   r%   r!   r   r   r      sF    -&* )CD  "D # EDr!   r   c                   2   \ rS rSrS rS r\R                  " \R                  S5      S 5       r
\R                  " \R                  S5      \R                  " \" \S5      S5      S	 5       5       r\R                  " \R                  S5      \R                  " \R                  S5      S
 5       5       r\R                  " \R$                  S:H  S5      \R                  " \R                  S5      \R                  " \" \S5      S5      S 5       5       5       rSrg)WakeupFDTests   c                 N   U R                  [        5         [        R                  " [        R                  S9  S S S 5        U R                  [        5         [        R                  " [        R                  S5        S S S 5        g ! , (       d  f       NR= f! , (       d  f       g = f)N)r   F)rU   r]   r   set_wakeup_fdr   rY   s    r   test_invalid_callWakeupFDTests.test_invalid_call   se    y)  6 * y)  6 *)	 *) *)s   #B&B
B
B$c                     [         R                  " 5       nU R                  [        [        4[
        R                  U5        g rI   )r   make_bad_fdrU   rV   OSErrorr   r   )r2   fds     r   test_invalid_fdWakeupFDTests.test_invalid_fd   s0    ""$:w/ ..	4r!   zneeds working sockets.c                     [         R                   " 5       nUR                  5       nUR                  5         U R                  [        [
        4[        R                  U5        g rI   )socketfilenocloserU   rV   r   r   r   )r2   sockr   s      r   test_invalid_socket!WakeupFDTests.test_invalid_socket   sA    }}[[]

:w/ ..	4r!   zEmscripten cannot fstat pipes.piperequires os.pipe()c                    [         R                  " 5       u  pU R                  [         R                  U5        U R                  [         R                  U5        [         R                  " 5       u  p4U R                  [         R                  U5        U R                  [         R                  U5        [	        [         S5      (       a.  [         R
                  " US5        [         R
                  " US5        [        R                  " U5        U R                  [        R                  " U5      U5        U R                  [        R                  " S5      U5        U R                  [        R                  " S5      S5        g )Nset_blockingFr   )	r   r   
addCleanupr   r   r   r   r   r+   )r2   r1w1r2w2s        r   test_set_wakeup_fd_result'WakeupFDTests.test_set_wakeup_fd_result  s     "%"%"%"%2~&&OOB&OOB&R --b126--b126--b126r!   c                 X   [         R                   " 5       nU R                  UR                  5        UR                  S5        UR	                  5       n[         R                   " 5       nU R                  UR                  5        UR                  S5        UR	                  5       n[
        R                  " U5        U R                  [
        R                  " U5      U5        U R                  [
        R                  " S5      U5        U R                  [
        R                  " S5      S5        g )NFr   )r   r   r   setblockingr   r   r   r+   )r2   sock1fd1sock2fd2s        r    test_set_wakeup_fd_socket_result.WakeupFDTests.test_set_wakeup_fd_socket_result  s     $% lln$% llnS!--c2C8--b137--b126r!   r   ztests specific to POSIXc                 @   [         R                  " 5       u  pU R                  [         R                  U5        U R                  [         R                  U5        [         R                  " US5        U R                  [        5       n[        R                  " U5        S S S 5        U R                  [        WR                  5      SU-  5        [         R                  " US5        [        R                  " U5        [        R                  " S5        g ! , (       d  f       Nz= f)NTz&the fd %s must be in non-blocking modeFr   )r   r   r   r   r   rU   rV   r   r   r+   str	exception)r2   rfdwfdcms       r   test_set_wakeup_fd_blocking)WakeupFDTests.test_set_wakeup_fd_blocking)  s     779#&#& 	T"z*b  % +R\\*ACG	I 	U#S!R  +*s   D
Dr%   N)rA   r=   rB   rC   r   r   r   r   r   has_socket_supportr   r   is_emscriptenr   r   r   r   r,   r-   r   rD   r%   r!   r   r   r      s!   74
 335MN4 O4 __W**,LMV,.BC7 D N7" __W**,LM335MN7 O N7$ __S\\W,.GH__W**,LMV,.BC! D N I!r!   r   c                      \ rS rSr\R
                  " \SL S5      SS.S j5       r\R
                  " \SL S5      \R                  " \	" \
S5      S5      S	 5       5       rS
 rS rS r\R                  " \	" \S5      S5      S 5       rSrg)WakeupSignalTestsi>  Nneed _testcapiTorderedc                n    SR                  [        [        [        U5      5      X!5      n[	        SU5        g )Na  if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        r   )formattuplemapintr   )r2   	test_bodyr   signalscodes        r   check_wakeupWakeupSignalTests.check_wakeup@  s5     @ F5S'*+W@A 	D 	t$r!   r   r   c                 z   Sn[         R                  " 5       u  p# [         R                  " US5        U R                  S5        [         R
                  " U5        [         R
                  " U5        [        SU5        g ! [         a     NEf = f! [         R
                  " U5        [         R
                  " U5        f = f)Na&  if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
           xz9OS doesn't report write() error on the read end of a piper   )r   r   writeskipTestr   r   r   )r2   r  rws       r   test_wakeup_write_error)WakeupSignalTests.test_wakeup_write_errorg  s    !D wwy	HHQ MMUVHHQKHHQKt$  		
 HHQKHHQKs(   A< B <
B	B B		B .B:c                 D    U R                  S[        R                  5        g )Na  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r	  r   r   rY   s    r   test_wakeup_fd_early&WakeupSignalTests.test_wakeup_fd_early  s     > ^^?	r!   c                 D    U R                  S[        R                  5        g )Na`  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r  rY   s    r   test_wakeup_fd_during'WakeupSignalTests.test_wakeup_fd_during  s     6 ^^7	r!   c                 b    U R                  S[        R                  [        R                  5        g )Nzdef test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )r	  r   r^   r   rY   s    r   test_signumWakeupSignalTests.test_signum  s$      ^^V^^		-r!   pthread_sigmaskneed signal.pthread_sigmask()c                 `    U R                  S[        R                  [        R                  SS9  g )Na  def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Fr   )r	  r   r^   SIGUSR2rY   s    r   test_pendingWakeupSignalTests.test_pending  s-     	  nnfnne 	 	=r!   r%   )rA   r=   rB   rC   r   r   	_testcapir	  r   r   r   r  r  r  r  r   r!  rD   r%   r!   r   r   r   >  s    __Y$&(898< $% :$%L __Y$&(89V,.BC1% D :1%f D<- ):;8:=:=r!   r   
socketpairzneed socket.socketpairc                       \ rS rSr\R
                  " \SL S5      S 5       r\R
                  " \SL S5      S 5       r\R
                  " \SL S5      S 5       r	Sr
g)WakeupSocketSignalTestsi  Nr   c                      Sn[        SU5        g )Na  if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        r   r   r2   r  s     r   test_socket#WakeupSocketSignalTests.test_socket  s    > 	t$r!   c                 l    [         R                  S:X  a  SnOSnSR                  US9n[        SU5        g )Nntsendr  a+  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        actionr   r   r   r  r   r2   r0  r  s      r   test_send_error'WakeupSocketSignalTests.test_send_error  s@     77d?FF!B F&F!C 	D 	t$r!   c                 l    [         R                  S:X  a  SnOSnSR                  US9n[        SU5        g )Nr-  r.  r  aJ  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        r/  r   r1  r2  s      r   test_warn_on_full_buffer0WakeupSocketSignalTests.test_warn_on_full_bufferG  sA     77d?FFgN F&F!O 	P 	t$r!   r%   )rA   r=   rB   rC   r   r   r#  r*  r3  r6  rD   r%   r!   r   r&  r&    su     __Y$&(89!% :!%F __Y$&(89(% :(%T __Y$&(89n% :n%r!   r&  siginterruptzneeds signal.siginterrupt()r   r   c                   p    \ rS rSr\R
                  4S jrS rS r\R                  " S5      S 5       r
Srg)	SiginterruptTesti  c                 ~   SU< S3n[        SU5       n UR                  R                  5       nUR                  US9u  pgXV-   nUR	                  5       nUS;  a  [        SU< SU< 35      eUS:H  sS	S	S	5        $ ! [        R                   a    UR                  5          S	S	S	5        g
f = f! , (       d  f       g	= f)zPerform a read during which a signal will arrive.  Return True if the
read is interrupted by the signal and raises an exception.  Return False
if it returns normally.
zif 1:
            import errno
            import os
            import signal
            import sys

            interrupt = aq  
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        r   timeout)      Child error (exit code ): r?  NF)	r	   stdoutreadlinecommunicatewait	Exceptionr   TimeoutExpiredkill)	r2   	interruptr=  r  r   
first_linerB  r   exitcodes	            r   readpipe_interrupted%SiginterruptTest.readpipe_interrupted  s    T G#H $%'$^^446
!(!4!4W!4!E
 $,"<<>6)#'/%9 : : A &% ,,  &% &%s.   B.+A<1B.<$B+ B.*B++B..
B<c                 H    U R                  S 5      nU R                  U5        g rI   rL  r   r2   interrupteds     r   test_without_siginterrupt*SiginterruptTest.test_without_siginterrupt        //5$r!   c                 H    U R                  S5      nU R                  U5        g NTrO  rP  s     r   test_siginterrupt_on%SiginterruptTest.test_siginterrupt_on  rT  r!   walltimec                 F    U R                  SSS9nU R                  U5        g )NFr>  r<  )rL  assertFalserP  s     r   test_siginterrupt_off&SiginterruptTest.test_siginterrupt_off	  s'    
 //q/A%r!   r%   N)rA   r=   rB   rC   r   SHORT_TIMEOUTrL  rR  rW  requires_resourcer\  rD   r%   r!   r   r:  r:    s<     7>6K6K :'x%% z*& +&r!   r:  	getitimer	setitimerz/needs signal.getitimer() and signal.setitimer()c                       \ rS rSrS rS rS rS rS rS r	S r
\R                  " \R                  S	;   =(       d    \S
5      S 5       rS rS rSrg)
ItimerTesti  c                     SU l         SU l        S U l        [        R                  " [        R                  U R
                  5      U l        g )NFr   )hndl_called
hndl_countitimerr   r   sig_alrm	old_alarmrY   s    r   setUpItimerTest.setUp  s2     v~~t}}Er!   c                     [         R                   " [         R                  U R                  5        U R                  b"  [         R                  " U R                  S5        g g rk   )r   r   ri  rg  ra  rY   s    r   tearDownItimerTest.tearDown  s;    fnndnn5;;"T[[!, #r!   c                     SU l         g rV  )re  rJ   s     r   rh  ItimerTest.sig_alrm"  s
    r!   c                     SU l         U R                  S:  a  [        R                  " S5      eU R                  S:X  a%  [        R                  " [        R
                  S5        U =R                  S-  sl        g )NTr?  z.setitimer didn't disable ITIMER_VIRTUAL timer.r   rr   )re  rf  r   ItimerErrorra  ITIMER_VIRTUALrJ   s     r   
sig_vtalrmItimerTest.sig_vtalrm%  s^    ??Q$$ &  __!V22A61r!   c                 \    SU l         [        R                  " [        R                  S5        g )NTr   )re  r   ra  ITIMER_PROFrJ   s     r   sig_profItimerTest.sig_prof2  s     ++Q/r!   c                 d    U R                  [        R                  [        R                  SS5        g )Nr   r   )rU   r   rr  ra  ITIMER_REALrY   s    r   test_itimer_excItimerTest.test_itimer_exc6  s'     	&,,f.>.>AFr!   c                     [         R                  U l        [         R                  " U R                  S5        [         R                  " 5         U R                  U R                  S5        g )Ng      ?T)r   r{  rg  ra  pauser+   re  rY   s    r   test_itimer_realItimerTest.test_itimer_real?  sB    ((c*))40r!   )netbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.c                 @   [         R                  U l        [         R                   " [         R                  U R                  5        [         R
                  " U R                  SS5        [        R                  " [        R                  5       HD  n[        S [        S5       5       5      n[         R                  " U R                  5      S:X  d  MD    O   U R                  [         R                  " U R                  5      S5        U R                  U R                  S5        g )NMbP?c              3   *   #    U  H	  oU-  v   M     g 7frI   r%   .0is     r   	<genexpr>1ItimerTest.test_itimer_virtual.<locals>.<genexpr>O       0<aE<   順         r  T)r   rs  rg  	SIGVTALRMrt  ra  r   
busy_retryLONG_TIMEOUTsumranger`  r+   re  r2   _s     r   test_itimer_virtualItimerTest.test_itimer_virtualF  s     ++f&&8eU3##G$8$89A05<00A,
: : 	))$++6
C))40r!   c                 @   [         R                  U l        [         R                   " [         R                  U R                  5        [         R
                  " U R                  SS5        [        R                  " [        R                  5       HD  n[        S [        S5       5       5      n[         R                  " U R                  5      S:X  d  MD    O   U R                  [         R                  " U R                  5      S5        U R                  U R                  S5        g )Ng?c              3   *   #    U  H	  oU-  v   M     g 7frI   r%   r  s     r   r  .ItimerTest.test_itimer_prof.<locals>.<genexpr>`  r  r  r  r  T)r   rw  rg  SIGPROFrx  ra  r   r  r  r  r  r`  r+   re  r  s     r   test_itimer_profItimerTest.test_itimer_profY  s    ((fnndmm4c3/##G$8$89A05<00A,
: : 	))$++6
C))40r!   c                     [         R                  U l        [         R                  " U R                  S5        [        R
                  " S5        U R                  U R                  S5        g )Nư>rr   T)r   r{  rg  ra  timesleepr+   re  rY   s    r   test_setitimer_tinyItimerTest.test_setitimer_tinyj  sF     ((d+

1))40r!   )re  rf  rg  ri  N)rA   r=   rB   rC   rj  rm  rh  rt  rx  r|  r  r   r   r,   r-   r   r  r  r  rD   r%   r!   r   rc  rc    sc    F- 0H1 __S\\\1D_NP1P1"1"1r!   rc  c                      \ rS rSrSr\R                  " \" \S5      S5      S 5       r	\R                  " \" \S5      S5      \R                  " \" \S5      S5      S 5       5       r
\R                  " \" \S	5      S
5      \R                  " 5       S 5       5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      \R                  " \" \S5      S5      \R                  " 5       S 5       5       5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      S 5       r\R                  " \" \S5      S5      \R                  " 5       S 5       5       r\R                  " \" \S	5      S
5      \R                  " 5       S 5       5       rSrg) PendingSignalsTestsit  zO
Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
functions.

sigpendingzneed signal.sigpending()c                 ^    U R                  [        R                  " 5       [        5       5        g rI   )r+   r   r  r   rY   s    r   test_sigpending_empty)PendingSignalsTests.test_sigpending_emptyy  s     	**,ce4r!   r  r  c                      Sn[        SU5        g )Na  if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        r   r(  r)  s     r   test_sigpending#PendingSignalsTests.test_sigpending~  s    
0 	t$r!   pthread_killzneed signal.pthread_kill()c                      Sn[        SU5        g )Na  if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        r   r(  r)  s     r   test_pthread_kill%PendingSignalsTests.test_pthread_kill  s    ( 	t$r!   c                 N    SUR                  5       < SU< S3n[        SU5        g)zW
test: body of the "def test(signum):" function.
blocked: number of the blocked signal
zif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        z

        blocked = a  
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        r   N)stripr   )r2   blockedtestr  s       r   wait_helperPendingSignalsTests.wait_helper  s%    N zz|WA &J 	t$r!   sigwaitzneed signal.sigwait()c                 D    U R                  [        R                  S5        g )Na   
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        r  r   r   rY   s    r   test_sigwait PendingSignalsTests.test_sigwait  s     	 * 	r!   sigwaitinfozneed signal.sigwaitinfo()c                 D    U R                  [        R                  S5        g )Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        r  rY   s    r   test_sigwaitinfo$PendingSignalsTests.test_sigwaitinfo       	 * 	r!   sigtimedwaitzneed signal.sigtimedwait()c                 D    U R                  [        R                  S5        g )Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r  rY   s    r   test_sigtimedwait%PendingSignalsTests.test_sigtimedwait  r  r!   c                 D    U R                  [        R                  S5        g )Nz
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r  rY   s    r   test_sigtimedwait_poll*PendingSignalsTests.test_sigtimedwait_poll  s     	 * 	r!   c                 D    U R                  [        R                  S5        g )Nz
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        r  rY   s    r   test_sigtimedwait_timeout-PendingSignalsTests.test_sigtimedwait_timeout  s     	 * 	r!   c                 r    [         R                  nU R                  [        [         R                  U/S5        g )Ng      )r   r   rU   rV   r  )r2   r   s     r   "test_sigtimedwait_negative_timeout6PendingSignalsTests.test_sigtimedwait_negative_timeout  s)     *f&9&9F8TJr!   c                     [        SS5        g )Nr   a  if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        r(  rY   s    r   test_sigwait_thread'PendingSignalsTests.test_sigwait_thread$  s     	   	r!   c                 P   U R                  [        [        R                  5        U R                  [        [        R                  S5        U R                  [        [        R                  SSS5        U R                  [        [        R                  S/ 5        U R                  [
        5         [        R                  " [        R                  [        R                  /5        S S S 5        U R                  [
        5         [        R                  " [        R                  S/5        S S S 5        U R                  [
        5         [        R                  " [        R                  SS-  /5        S S S 5        g ! , (       d  f       N= f! , (       d  f       Ng= f! , (       d  f       g = f)Nrr   r>  r?  i  r   i  )rU   r]   r   r  r   rV   r   r   rY   s    r   test_pthread_sigmask_arguments2PendingSignalsTests.test_pthread_sigmask_argumentsG  s    	)V%;%;<)V%;%;Q?)V%;%;Q1E'6#9#94Dz*""6#3#3fkk]C +z*""6#3#3aS9 +z*""6#3#3agY? +*	 +*****s$   ,5E5>'F*F5
F
F
F%c                    [         R                  " [         R                  [         R                  " 5       5      nU R	                  [         R                  [         R
                  U5        [         R                  " [         R                  [         R                  " 5       5      nU R                  U[         R                  " 5       5        g rI   )r   r  r   r   r   r   r   assertLessEqualr   s     r   "test_pthread_sigmask_valid_signals6PendingSignalsTests.test_pthread_sigmask_valid_signalsU  s{     ""6#3#3V5I5I5KL..0B0BAF""6#5#5v7K7K7MNQ 4 4 67r!   c                      Sn[        SU5        g )Na-	  if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        r   r(  r)  s     r   test_pthread_sigmask(PendingSignalsTests.test_pthread_sigmask^  s    GP 	t$r!   c                     Sn[        SU5       nUR                  5       u  p4UR                  5       nUS:w  a  [        SU< SU< 35      e S S S 5        g ! , (       d  f       g = f)Na7  if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        r   r?  r@  rA  )r	   rD  rE  rF  )r2   r  r   rB  r   rK  s         r   test_pthread_kill_main_thread1PendingSignalsTests.test_pthread_kill_main_thread  sa     $%$002NF||~H1}!)6!3 4 4  &%%s   <A
A#r%   N)rA   r=   rB   rC   __doc__r   r   r   r   r  r  r   requires_working_threadingr  r  r  r  r  r  r  r  r  r  r  r  r  rD   r%   r!   r   r  r  t  s    635555 ):;8:635%5:%6 857002% 37%. ):;8:*%:*%X 3022 7466 8577 857	7	 8577 857K7K 302):;8:002 3:2
< ):;8:
@:
@ ):;8:8:8 ):;8:002I% 3:I%V 8570024 374r!   r  c                   >   \ rS rSrSrS rS rS r\R                  " \
" \S5      S5      S 5       r\R                  " \
" \S5      S5      S	 5       r\R                  " \S
5      \R                  " \
" \S5      S5      \R"                  " 5       S 5       5       5       rSrg)
StressTesti  z
Stress signal delivery, especially when a signal arrives in
the middle of recomputing the signal state or executing
previously tripped signal handlers.
c                 p    [         R                   " X5      nU R                  [         R                   X5        g rI   )r   r   )r2   r   ry   old_handlers       r   setsigStressTest.setsig  s"    mmF4v;r!   c                    ^^ Sm/ mSUU4S jjnU R                  [        R                  [        R                  S5        U R	                  [        R
                  U5        U" 5         [        T5      T:  a'  [        R                  " S5        [        T5      T:  a  M'  [        [        T5      S-
  5       Vs/ s H  nTUS-      TU   -
  PM     nn[        R                  " U5      n[        R                  (       a  [        SU4-  5        U$ s  snf )N   c                    > [        T5      T:  aJ  TR                  [        R                  " 5       5        [        R
                  " [        R                  S5        g g )Nr  )r   appendr  perf_counterr   ra  r{  )r   frameNtimess     r   ry   5StressTest.measure_itimer_resolution.<locals>.handler  s@    5zA~T..01   !3!3T: r!   r   r  rr   z,detected median itimer() resolution: %.6f s.NN)r   r   ra  r{  r  r   r   r  r  r  
statisticsmedianr   verboseprint)r2   ry   r  	durationsmedr  r  s        @@r   measure_itimer_resolution$StressTest.measure_itimer_resolution  s    	; 	; 	((&*<*<a@FNNG,	%j1nJJt %j1n 5:#e*q.4IJ4IqU1Q3Z%(*4I	J	*??@C6IJ
	 Ks   5Dc                 j    U R                  5       nUS::  a  gUS::  a  gU R                  SU4-  5        g )Ng-C6?i'  g{Gz?d   z^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r  r  )r2   resos     r   decide_itimer_countStressTest.decide_itimer_count  sB     --/4<T\MM M!G$ %r!   ra  ztest needs setitimer()c                 8  ^ U R                  5       n/ mS nSU4S jjnU R                  [        R                  U5        U R                  [        R                  U5        U R                  [        R
                  U5        Sn[        R                  " 5       [        R                  -   nXA:  GaR  [        R                  " [        R                  " 5       [        R                  5        US-  n[        T5      U:  aY  [        R                  " 5       U:  a@  [        R                  " S5        [        T5      U:  a  [        R                  " 5       U:  a  M@  [        R                  " [        R                  " 5       [        R                  5        US-  n[        T5      U:  aY  [        R                  " 5       U:  a@  [        R                  " S5        [        T5      U:  a  [        R                  " 5       U:  a  M@  XA:  a  GMR  U R                  [        T5      US5        g)	z+
This test uses dependent signal handlers.
c                     [         R                  " [         R                  S[        R                  " 5       S-  -   5        g )Nr  h㈵>)r   ra  r{  random)r   r  s     r   first_handler@StressTest.test_stress_delivery_dependent.<locals>.first_handler  s*     V//$8N1NOr!   Nc                 (   > TR                  U 5        g rI   r  r   r  sigss     r   second_handlerAStressTest.test_stress_delivery_dependent.<locals>.second_handler	      KKr!   r   rr   r   Some signals were lostr  )r  r  r   r  r^   r   r  	monotonicr   r^  r   rH  getpidr   r  r+   )r2   r  r  r  expected_sigsdeadliner  s         @r   test_stress_delivery_dependent)StressTest.test_stress_delivery_dependent  sh    $$&	P	  	FNNM2FNNM2FNNN3>>#g&;&;;GGBIIK0QMd)m+0@80K

4  d)m+0@80K GGBIIK0QMd)m+0@80K

4  d)m+0@80K  	TA'?@r!   c                 x  ^ U R                  5       n/ mU4S jnU R                  [        R                  U5        U R                  [        R                  U5        SnX1:  a  [        R
                  " [        R                  S[        R                  " 5       S-  -   5        [        R                  " [        R                  " 5       [        R                  5        US-  n[        R                  " [        R                  5       H  n[        T5      U:  d  M    O   X1:  a  M  U R                  [        T5      US5        g)z.
This test uses simultaneous signal handlers.
c                 (   > TR                  U 5        g rI   r  r  s     r   ry   =StressTest.test_stress_delivery_simultaneous.<locals>.handler/  r
  r!   r   r  r   r>  r  N)r  r  r   r   r   ra  r{  r  r   rH  r  r   sleeping_retryr^  r   r+   )r2   r  ry   r  r  r  s        @r   !test_stress_delivery_simultaneous,StressTest.test_stress_delivery_simultaneous&  s     $$&	  	FNNG,FNNG, V//$8N1NOGGBIIK0QM++G,A,ABt9- C  	TA'?@r!   z&crashes due to system bug (FB13453490)r^   ztest needs SIGUSR1c                 8  ^^^	^
^ [         R                  mSm
Sm	SmU	4S jmUU
U4S jnUU	U
U4S jn[         R                   " TT5      nU R                  [         R                   TU5        [        R                  " US9n Sn[
        R                  " 5        nUR                  5         U" 5         SmUR                  5         UR                  b`  U R                  UR                  R                  [        5        U R                  STS	 S
3[        UR                  R                  5      5        SnS S S 5        U(       d  U R                  T	S5        U R!                  T	T
5        SmUR                  5         g ! , (       d  f       NL= f! SmUR                  5         f = f)Nr   Fc                    > TS-  mg rq   r%   )r   r  num_received_signalss     r   custom_handlerAStressTest.test_stress_modifying_handlers.<locals>.custom_handlerT  s     A% r!   c                  ^   > T (       d%  [         R                  " T5        TS-  mT (       d  M$  g g rq   )r   raise_signal)do_stopnum_sent_signalsr   s   r   set_interruptsAStressTest.test_stress_modifying_handlers.<locals>.set_interruptsX  s&    ##F+ A%  gr!   c                     > TS:  d  TS:  aS  [        S5       H3  n T[        R                  4 H  n[        R                  " TU5        M     M5     TS:  a  MJ  TS:  a  MR  g g )Nr  rr   i N  )r  r   r   )r  ry   r  r  r   r   s     r   cycle_handlersAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlers^  sU    "S(,@1,DuA$2FNN#Cfg6 $D & #S(,@1,Dr!   )targetTzSignal dz ignored due to race condition)r   r^   r   	threadingThreadr   catch_unraisable_exceptionstartr   
unraisabler*   	exc_valuer   r   r   assertGreaterr  )r2   r!  r$  r  tignoredr   r  r  r  r   r   s          @@@@@r   test_stress_modifying_handlers)StressTest.test_stress_modifying_handlersI  sM     	&	&	7 	7 mmFN;v{;N3	G335	 ==, ))"--*A*A7KMM!&+IJBMM3346 #G 6$ ""#7;  !57GHGFFH1 65. GFFHs%    F BE4.3F 4
F>F Fr%   N)rA   r=   rB   rC   r  r  r  r  r   r   r   r   r  r  r   r   r   r  r1  rD   r%   r!   r   r  r    s    <0% 513*A3*AX 513A3AB __XGH3-/0026 3/ I6r!   r  c                   r    \ rS rSrS r\R                  " \R                  S:g  S5      S 5       r	S r
S rSrg	)
RaiseSignalTesti  c                     U R                  [        5         [        R                  " [        R                  5        S S S 5        g ! , (       d  f       g = frI   )rU   KeyboardInterruptr   r  r   rY   s    r   test_sigintRaiseSignalTest.test_sigint  s/    01. 211s   %A
Ar   zWindows specific testc                      Sn[         R                  " U5        U R                  S5        g ! [         a)  nUR                  [        R
                  :X  a   S nAg e S nAff = f)Nrr   z#OSError (Invalid argument) expected)r   r  failr   errnoEINVAL)r2   rb   es      r   test_invalid_argument%RaiseSignalTest.test_invalid_argument  sM    	F'II;< 	ww%,,&		s   ), 
AAAAc                 (  ^ SmU4S jn[         R                   " [         R                  U5      nU R                  [         R                   [         R                  U5        [         R                  " [         R                  5        U R	                  T5        g )NFc                 
   > Smg rV  r%   )abis_oks     r   ry   -RaiseSignalTest.test_handler.<locals>.handler  s    Er!   )r   r   r   r  r   )r2   ry   
old_signalrD  s      @r   test_handlerRaiseSignalTest.test_handler  sY    	 ]]6==':
v}}jAFMM*r!   c                 J    Sn[        SU5      u  p#nU R                  SU5        g )Nzif 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        r   s/   OSError: Signal 2 ignored due to race condition)r   r   )r2   r  rcouterrs        r   test__thread_interrupt_main+RaiseSignalTest.test__thread_interrupt_main  s*     (d3H#Nr!   r%   N)rA   r=   rB   rC   r7  r   r   r,   r-   r>  rG  rM  rD   r%   r!   r   r4  r4    s<    / __S\\W,.EF	 G		Or!   r4  c                   T    \ rS rSr\R
                  " \" \S5      S5      S 5       rSr	g)PidfdSignalTesti  pidfd_send_signalzpidfd support not built inc                    U R                  [        5       n[        R                  " S[        R                  5        S S S 5        WR
                  R                  [        R                  :X  a  U R                  S5        O9UR
                  R                  [        R                  :X  a  U R                  S5        U R                  UR
                  R                  [        R                  5        [        R                  " S[        R                  " 5        3[        R                  5      nU R!                  [        R"                  U5        U R%                  [&        S5         [        R                  " U[        R                  [)        5       S5        S S S 5        U R                  [*        5         [        R                  " U[        R                  5        S S S 5        g ! , (       d  f       GN= f! , (       d  f       Nd= f! , (       d  f       g = f)Nr   zkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)rU   r   r   rQ  r   r   r;  ENOSYSr  EPERMr+   EBADFr   openr  O_DIRECTORYr   r   assertRaisesRegexr]   objectr6  )r2   r   my_pidfds      r   test_pidfd_send_signal&PidfdSignalTest.test_pidfd_send_signal  s:   
 w'2$$Q6 (<<-MM:;\\5;;.MM>?++U[[977VBIIK=12>>B(+##I/GH$$Xv}}fhJ I01$$Xv}}= 21 (' IH11s#   &G0G*)&G;
G'*
G8;
H	r%   N)
rA   r=   rB   rC   r   r   r   r   r[  rD   r%   r!   r   rP  rP    s-    +,$>	>r!   rP  c                  .    [         R                  " 5         g rI   )r   reap_childrenr%   r!   r   tearDownModuler_    s    r!   __main__)/r.   r;  rO   r:   r   r  r   r   r  r   r,   r(  r  r   r  r   test.supportr   r   r   r   test.support.script_helperr   r	   r#  ImportErrorTestCaser   r   r-   rF   r   r   r   r   r   r&  r   r:  rc  r  r  r4  rP  r_  rA   mainr%   r!   r   <module>rf     s       	      
      F
/=8$$ /=d 
(*@Ac="" c= Bc=T 
S\\W,.@A-D** -D B-D`S!H%% S!l 
(*@As=)) s= Bs=l 
WV\24LM@%h// @% N@%F 
(*@A	WV^46ST	WR(*>?R&x(( R& @  U BR&j 
(*@A	WV[1Rgfk6RJL\1"" \1L B\1~P4(++ P4f
|"" |~)Oh'' )OZ>h'' >* zMMO k-  Is   J7 7KK