o
    HXhnx                     @   s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZmZ d dlmZ d dlZd dlZd dlmZ zd dlZW n ey[   dZY nw dZejd	kZdddZdd Zedd ZG dd dejZedkre  dS dS )    )contextmanagerN)support)	os_helper)script_helper
is_android)skip_if_sanitizer)dedentg      ?nt   c                 C   sH   |}|d|  7 }|d| 7 }d|k rd|d |d   | S d| d S )Nz#  File "<string>", line %s in func
z&  File "<string>", line %s in <module>r
   ^
$ )Zlineno1Zlineno2header	min_countregexr   r   @/opt/python-3.10.19/usr/lib/python3.10/test/test_faulthandler.pyexpected_traceback   s   r   c                 C   s   t td| S )Nz(raising SIGSEGV on Android is unreliable)unittestskipIfr   )testr   r   r   skip_segfault_on_android#   s
   r   c               	   c   s.    t  } z| V  W t|  d S t|  w N)tempfilemktempr   unlinkfilenamer   r   r   temporary_filename(   s
   r   c                	   @   sZ  e Zd ZdddZddddddddddd	Zdd
dZdd Zee	j
dddd Zedd Zedd Zdd Zdd Zee	j
dkddd Zeedu deeedd ed!d" Zeedu deeed#d$ed%d& Zd'd( Zd)d* Zd+d, Zee	j
d-d.eeed/ d0d1d2 Zed3d4 Zeddd5d6ed7d8 Z ee	j
dkd9eddd5d6ed:d; Z!ed<d= Z"ed>d? Z#ed@dA Z$dBdC Z%dDdE Z&dFdG Z'dHdI Z(dddJdKdLZ)dMdN Z*dOdP Z+ee	j
dkd9dQdR Z,dSdT Z-dUdV Z.dWdX Z/dYdZ Z0ddddJd\d]Z1d^d_ Z2d`da Z3dbdc Z4ddde Z5ee	j
dkd9dfdg Z6dhdi Z7eeedj dk		ddldmZ8dndo Z9dpdq Z:drds Z;ee	j
dkd9dtdu Z<dvdw Z=dxdy Z>e?dzd{ Z@d|d} ZAeeBd~dd ZCeeBd~dd ZDeeBd~dd ZEeeBd~dd ZFdd ZGdS )FaultHandlerTestsNc           
   	   C   sb  	 t | }g }|d ur|| t , tjd||d}| | \}}| }W d    n1 s6w   Y  W d    n1 sEw   Y  |	dd}|rx| 
|d t|d}	|	 }W d    n1 slw   Y  |	dd}n3|d ur| 
|d t|tjd t|ddd	}	|	 }W d    n1 sw   Y  |	dd}| |fS )
N-c)pass_fdsasciibackslashreplace rbr   F)closefd)r   stripappendr   SuppressCrashReportr   Zspawn_pythoncommunicatewaitdecodeassertEqualopenreadoslseekSEEK_SET
splitlines)
selfcoder   fdr!   processoutputstderrexitcodefpr   r   r   
get_output1   s8   	




zFaultHandlerTests.get_outputTFz<module>)r   all_threadsother_regexr6   know_current_threadpy_fatal_errorgarbage_collectingfunctionc                C   s   	 |r|rd}nd}nd}d| g}|	r| d | d | | d |
r.| d | d	| d
|  d|}|rId| d| d}d| }| j|||d\}}d|}| || | |d d S )NzCurrent thread 0x[0-9a-f]+zThread 0x[0-9a-f]+ZStackr   z!Python runtime state: initializedr$   z \(most recent call first\):z  Garbage-collectingz  File "<string>", line z in r   z(?:|)z(?m)r   r6   r   )r(   joinr<   assertRegexassertNotEqual)r4   r5   linenofatal_errorr   r=   r>   r6   r?   r@   rA   rB   r   r   r8   r:   r   r   r   check_errorR   s,   





zFaultHandlerTests.check_errorc                 K   s2   |rd||f }d| }| j |||fi | d S )Nz%s: %szFatal Python error: %srK   )r4   r5   line_number
name_regexfunckwrJ   r   r   r   check_fatal_errory   s   z#FaultHandlerTests.check_fatal_errorc                 K   s"   d| }| j |||fi | d S )NzWindows fatal exception: %srL   )r4   r5   rM   rN   rP   rJ   r   r   r   check_windows_exception   s   z)FaultHandlerTests.check_windows_exceptionZaixz5the first page of memory is a mapped read-only on AIXc                 C   s(   t s| ddd d S | ddd d S )Nz
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                   z4(?:Segmentation fault|Bus error|Illegal instruction)access violation)
MS_WINDOWSrQ   rR   r4   r   r   r   test_read_null   s   z FaultHandlerTests.test_read_nullc                 C      |  ddd d S )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            rS   Segmentation faultrQ   rV   r   r   r   test_sigsegv      zFaultHandlerTests.test_sigsegvc                 C   s   | j dddddd d S )Na  
            import faulthandler
            import gc
            import sys

            faulthandler.enable()

            class RefCycle:
                def __del__(self):
                    faulthandler._sigsegv()

            # create a reference cycle which triggers a fatal
            # error in a destructor
            a = RefCycle()
            b = RefCycle()
            a.b = b
            b.a = a

            # Delete the objects, not the cycle
            a = None
            b = None

            # Break the reference cycle: call __del__()
            gc.collect()

            # Should not reach this line
            print("exit", file=sys.stderr)
            	   rY   __del__T)rB   rA   rZ   rV   r   r   r   test_gc   s   
zFaultHandlerTests.test_gcc                 C   s   | j ddddddd d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            rS   zin new threadFZfaulthandler_fatal_error_threadT)r?   rO   r@   rZ   rV   r   r   r   test_fatal_error_c_thread   s   
z+FaultHandlerTests.test_fatal_error_c_threadc                 C   rX   )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            rS   ZAbortedrZ   rV   r   r   r   test_sigabrt   s   zFaultHandlerTests.test_sigabrtwin32z"SIGFPE cannot be caught on Windowsc                 C   rX   )Nzr
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            rS   zFloating point exceptionrZ   rV   r   r   r   test_sigfpe   s   zFaultHandlerTests.test_sigfpezneed _testcapiSIGBUSzneed signal.SIGBUSc                 C   rX   )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGBUS)
               z	Bus errorrZ   rV   r   r   r   test_sigbus      zFaultHandlerTests.test_sigbusSIGILLzneed signal.SIGILLc                 C   rX   )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGILL)
            re   zIllegal instructionrZ   rV   r   r   r   test_sigill   rg   zFaultHandlerTests.test_sigillc                 C   sJ   t   | jd| dddddd W d    d S 1 sw   Y  d S )NzP
                import _testcapi
                _testcapi.fatal_error(b'xyz', )
                   Zxyztest_fatal_errorT)rO   r@   )r   r)   rQ   )r4   Zrelease_gilr   r   r   check_fatal_error_func  s   
"z(FaultHandlerTests.check_fatal_error_funcc                 C      |  d d S )NFrm   rV   r   r   r   rl        z"FaultHandlerTests.test_fatal_errorc                 C   rn   )NTro   rV   r   r   r   test_fatal_error_without_gil  rp   z.FaultHandlerTests.test_fatal_error_without_gilZopenbsdzVIssue #12868: sigaltstack() doesn't work on OpenBSD if Python is compiled with pthreadZ_stack_overflowz#need faulthandler._stack_overflow()c                 C      | j ddddd d S )Nzz
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            rS   z (?:Segmentation fault|Bus error)z unable to raise a stack overflow)r>   rZ   rV   r   r   r   test_stack_overflow  s
   
z%FaultHandlerTests.test_stack_overflowc                 C   rX   )Nzw
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            rS   rY   rZ   rV   r   r   r   test_gil_released%  r\   z#FaultHandlerTests.test_gil_releasedz0sanitizer builds change crashing process output.)ZmemoryZubreasonc                 C   sJ   t  }| jdjt|ddd|d W d    d S 1 sw   Y  d S )Nz
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                r      rY   )r   rQ   formatreprr4   r   r   r   r   test_enable_file/  s   "z"FaultHandlerTests.test_enable_filez.subprocess doesn't support pass_fds on Windowsc                 C   sN   t d}| }| jd| dd|d W d    d S 1 s w   Y  d S )Nwb+z
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                rv   rY   r6   )r   TemporaryFilefilenorQ   )r4   r;   r6   r   r   r   test_enable_fd>  s   "z FaultHandlerTests.test_enable_fdc                 C   rr   )Nz
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            rS   rY   Fr=   rZ   rV   r   r   r   test_enable_single_threadP  s
   
z+FaultHandlerTests.test_enable_single_threadc                 C   sH   d}d}|  |\}}d|}| ||vd||f  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            zFatal Python errorr   z%r is present in %rr   )r<   rF   
assertTruerH   )r4   r5   Znot_expectedr9   r:   r   r   r   test_disable[  s   


zFaultHandlerTests.test_disablec                 C   sr   d}|  |\}}d|}td|tj}|s | d| t|d 	d}dD ]}| 
|| q.d S )Nz
            import faulthandler
            import sys
            # Don't filter stdlib module names
            sys.stdlib_module_names = frozenset()
            faulthandler.enable()
            faulthandler._sigsegv()
            r   z*^Extension modules:(.*) \(total: [0-9]+\)$z$Cannot find 'Extension modules:' in r
   z, )sysfaulthandler)r<   rF   research	MULTILINEZfailsetgroupr'   splitassertIn)r4   r5   r9   r:   matchmodulesnamer   r   r   test_dump_ext_modulesj  s   
z'FaultHandlerTests.test_dump_ext_modulesc                 C   s   t j}zDt jt _t }z#t  | t  t  | t  W |r+t  nt  n|r7t  w t  w W |t _d S W |t _d S |t _w r   )	r   r9   
__stderr__r   
is_enabledenabler   disableZassertFalse)r4   Zorig_stderrZwas_enabledr   r   r   test_is_enabled~  s$   



z!FaultHandlerTests.test_is_enabledc                 C   s0   d}t jdd|f}t|}| | d d S )N5import faulthandler; print(faulthandler.is_enabled())-Er       False)r   
executable
subprocesscheck_outputr-   rstrip)r4   r5   argsr8   r   r   r   test_disabled_by_default  s   
z*FaultHandlerTests.test_disabled_by_defaultc                 C   s`   d}t d tjtjjrdndddd|f}tj }|dd  t	j
||d}| | d	 d S )
Nr   r   r$   z-Xr   r    PYTHONFAULTHANDLERenv   True)filterr   r   flagsignore_environmentr0   environcopypopr   r   r-   r   r4   r5   r   r   r8   r   r   r   test_sys_xoptions  s   
z#FaultHandlerTests.test_sys_xoptionsc                 C   s   d}t jd|f}ttj}d|d< d|d< tj||d}| | d ttj}d|d< d|d< tj||d}| | d	 d S )
Nr   r    r$   r   ZPYTHONDEVMODEr   r   1r   )	r   r   dictr0   r   r   r   r-   r   r   r   r   r   test_env_var  s   

zFaultHandlerTests.test_env_varrE   c                C   sn   	 d}|j ||d}|rd}n	|d urd}nd}dd| dd	g}| |||\}}| || | |d
 d S )Na[  
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            rE   r]         Stack (most recent call first):z#  File "<string>", line %s in funcBz#  File "<string>", line 17 in funcAz&  File "<string>", line 19 in <module>r   rw   r<   r-   )r4   r   r6   r5   rI   expectedtracer:   r   r   r   check_dump_traceback  s&   z&FaultHandlerTests.check_dump_tracebackc                 C      |    d S r   )r   rV   r   r   r   test_dump_traceback     z%FaultHandlerTests.test_dump_tracebackc                 C   8   t  }| j|d W d    d S 1 sw   Y  d S Nr   )r   r   ry   r   r   r   test_dump_traceback_file     "z*FaultHandlerTests.test_dump_traceback_filec                 C   @   t d}| j| d W d    d S 1 sw   Y  d S Nr{   r|   )r   r}   r   r~   r4   r;   r   r   r   test_dump_traceback_fd     "z(FaultHandlerTests.test_dump_traceback_fdc                 C   sd   d}d|d  }d| d }d}|j |d}dd| d	g}| |\}}| || | |d
 d S )Ni  x2   z...z
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            )	func_namer   z  File "<string>", line 4 in %sz%  File "<string>", line 6 in <module>r   r   )r4   maxlenr   Z	truncatedr5   r   r   r:   r   r   r   test_truncate  s   zFaultHandlerTests.test_truncatec                 C   sr   	 d}|j t|d}| ||\}}d|}|rd}nd}d}t|j |d }| || | |d d S )	Na  
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            r   r      
   a  
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            )rI   r   )rw   rx   r<   rF   r   r'   rG   r-   )r4   r   r5   r8   r:   rI   r   r   r   r   check_dump_traceback_threads  s    
z.FaultHandlerTests.check_dump_traceback_threadsc                 C   s   |  d  d S r   )r   rV   r   r   r   test_dump_traceback_threadsH  rp   z-FaultHandlerTests.test_dump_traceback_threadsc                 C   s6   t  }| | W d    d S 1 sw   Y  d S r   )r   r   ry   r   r   r    test_dump_traceback_threads_fileK  s   "z2FaultHandlerTests.test_dump_traceback_threads_filer
   c                C   s   	 t tjtd}d}|jt|||||d}| ||\}}	d|}|s@|}
|r-|
d9 }
d| }tdd||
d	}| || n| 	|d
 | 	|	d d S )N)Zsecondsa  
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            )timeoutrepeatcancelloopsr   r6   r   rk   zATimeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n      )r   r$   r   )
strdatetimeZ	timedeltaTIMEOUTrw   r<   rF   r   rG   r-   )r4   r   r   r   r   r6   Ztimeout_strr5   r   r:   countr   r   r   r   r   check_dump_traceback_laterO  s,   
z,FaultHandlerTests.check_dump_traceback_laterc                 C   r   r   r   rV   r   r   r   test_dump_traceback_later  r   z+FaultHandlerTests.test_dump_traceback_laterc                 C      | j dd d S )NT)r   r   rV   r   r   r    test_dump_traceback_later_repeat     z2FaultHandlerTests.test_dump_traceback_later_repeatc                 C   r   )NT)r   r   rV   r   r   r    test_dump_traceback_later_cancel  r   z2FaultHandlerTests.test_dump_traceback_later_cancelc                 C   r   r   )r   r   ry   r   r   r   test_dump_traceback_later_file  r   z0FaultHandlerTests.test_dump_traceback_later_filec                 C   r   r   )r   r}   r   r~   r   r   r   r   test_dump_traceback_later_fd  r   z.FaultHandlerTests.test_dump_traceback_later_fdc                 C   r   )Nrk   )r   r   rV   r   r   r   test_dump_traceback_later_twice  r   z1FaultHandlerTests.test_dump_traceback_later_twiceregisterzneed faulthandler.registerc                 C   s   	 t j}d}|j||||||d}| ||\}}	d|}|s4|r%d}
nd}
tdd|
}
| ||
 n| |d |rD| |	d	 d S | |	d	 d S )
Naz  
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum:d}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            )r=   signum
unregisterchainr   r6   r   z8Current thread 0x[0-9a-f]+ \(most recent call first\):\nz#Stack \(most recent call first\):\nr       r$   r   )	signalSIGUSR1rw   r<   rF   r   rG   r-   rH   )r4   r   r=   r   r   r6   r   r5   r   r:   r   r   r   r   check_register  s.   .
z FaultHandlerTests.check_registerc                 C   r   r   r   rV   r   r   r   test_register  r   zFaultHandlerTests.test_registerc                 C   r   )NT)r   r   rV   r   r   r   test_unregister  r   z!FaultHandlerTests.test_unregisterc                 C   r   r   )r   r   ry   r   r   r   test_register_file  r   z$FaultHandlerTests.test_register_filec                 C   r   r   )r   r}   r   r~   r   r   r   r   test_register_fd   r   z"FaultHandlerTests.test_register_fdc                 C   r   )NTr   r   rV   r   r   r   test_register_threads  r   z'FaultHandlerTests.test_register_threadsc                 C   r   )NT)r   r   rV   r   r   r   test_register_chain	  r   z%FaultHandlerTests.test_register_chainc                 c   sf    t j}z*d t _| t}d V  W d    n1 sw   Y  | t|jd W |t _d S |t _w )Nzsys.stderr is None)r   r9   ZassertRaisesRuntimeErrorr-   r   	exception)r4   r9   cmr   r   r   check_stderr_none  s   z#FaultHandlerTests.check_stderr_nonec                 C   s   |    t  W d    n1 sw   Y  |    t  W d    n1 s+w   Y  |    td W d    n1 sDw   Y  ttdrk|    ttj W d    d S 1 sdw   Y  d S d S )NgMbP?r   )	r   r   r   Zdump_tracebackZdump_traceback_laterhasattrr   r   r   rV   r   r   r   test_stderr_None  s   






"z"FaultHandlerTests.test_stderr_Nonezspecific to Windowsc                 C   s(   dD ]\}}|  d| dd| qd S )N))ZEXCEPTION_ACCESS_VIOLATIONrT   )ZEXCEPTION_INT_DIVIDE_BY_ZEROzint divide by zero)ZEXCEPTION_STACK_OVERFLOWzstack overflowz
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._rj   rS   )rR   )r4   excr   r   r   r   test_raise_exception$  s   z&FaultHandlerTests.test_raise_exceptionc                 C   sH   dD ]}d| d}t |}| |\}}| |g  | || qd S )N)l   cs@ l   RC@ z
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception(z)
                    r   r<   r-   )r4   Zexc_coder5   r8   r:   r   r   r   test_ignore_exception3  s   z'FaultHandlerTests.test_ignore_exceptionc                 C   sF   dD ]}|  d|dd\}}| |g  | |||d@ f qd S )N)r   ixV4i   @i  @i   piz{
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0xr   rj   i)r<   r-   r   )r4   r   r8   r:   r   r   r   test_raise_nonfatal_exceptionC  s   
z/FaultHandlerTests.test_raise_nonfatal_exceptionc                 C   2   t d}| |\}}| |g  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        l       r   r4   r5   r8   r:   r   r   r    test_disable_windows_exc_handler_  s   z2FaultHandlerTests.test_disable_windows_exc_handlerc                 C   r   )Nz`
            import faulthandler
            faulthandler.cancel_dump_traceback_later()
        r   r   r   r   r   r   .test_cancel_later_without_dump_traceback_laterl  s   z@FaultHandlerTests.test_cancel_later_without_dump_traceback_later)NNr   )FFr
   )FFFFN)H__name__
__module____qualname__r<   rK   rQ   rR   r   r   r   platform
startswithrW   r   r[   r_   r`   ra   rc   	_testcapiZ
skipUnlessr   r   rf   ri   rm   rl   rq   r   rs   rt   r   rz   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rU   r   r   r   r   r   r   r   r   r   r   0   s    
"
'

	
#	
	

	



.
;>
Q










r   __main__)r
   ) 
contextlibr   r   r   r0   r   r   r   r   r   r   Ztest.supportr   r   r   r   r   r   textwrapr   r  ImportErrorr   r   rU   r   r   r   ZTestCaser   r   mainr   r   r   r   <module>   sH    

	
      N