o
    HXîhþ+  ã                   @   sÄ   d Z ddlZddlZddlmZ ddlmZ e d¡ Zddl	Z	ddl
Z
ddlZe ¡ dd„ ƒZG dd	„ d	ejƒZG d
d„ dejƒZG dd„ deƒZG dd„ deƒZdd„ Zedkr`e ¡  dS dS )z&Unit tests for socket timeout feature.é    N)Úsupport)Úsocket_helperÚnetworkc                 C   sJ   t  | ¡ t | |tjtj¡d d W  d  ƒ S 1 sw   Y  dS )z—Resolve an (host, port) to an address.

    We must perform name resolution before timeout tests, otherwise it will be
    performed by connect().
    r   é   N)r   Útransient_internetÚsocketÚgetaddrinfoÚAF_INETÚSOCK_STREAM)ÚhostÚport© r   ú;/opt/python-3.10.19/usr/lib/python3.10/test/test_timeout.pyÚresolve_address   s   ÿÿÿ$ÿr   c                   @   sX   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ Zdd„ Z	dd„ Z
dd„ Zdd„ ZdS )ÚCreationTestCasez9Test case for socket.gettimeout() and socket.settimeout()c                 C   ó   t   t jt j¡| _d S ©N)r   r	   r
   Úsock©Úselfr   r   r   ÚsetUp   ó   zCreationTestCase.setUpc                 C   ó   | j  ¡  d S r   ©r   Úcloser   r   r   r   ÚtearDown"   ó   zCreationTestCase.tearDownc                 C   s   |   | j ¡ d d¡ d S )Nztimeout not disabled by default)ÚassertEqualr   Ú
gettimeoutr   r   r   r   ÚtestObjectCreation%   s   ÿz#CreationTestCase.testObjectCreationc                 C   s^   | j  d¡ |  | j  ¡ d¡ | j  d¡ |  | j  ¡ d¡ | j  d ¡ |  | j  ¡ d ¡ d S )Ngáz®Ga@é   )r   Ú
settimeoutr   r   r   r   r   r   ÚtestFloatReturnValue*   s   z%CreationTestCase.testFloatReturnValuec                 C   sP   | j  d¡ |  t| j  ¡ ƒtdƒ¡ | j  d¡ |  t| j  ¡ ƒtdƒ¡ d S )Né   ç      ð?g333333@)r   r!   r   Útyper   r   r   r   r   ÚtestReturnType5   s   zCreationTestCase.testReturnTypec                 C   s    | j  d¡ | j  d¡ | j  d¡ | j  d ¡ |  t| j jd¡ |  t| j jd¡ |  t| j jd¡ |  t| j jg ¡ |  t| j ji ¡ |  t| j jd¡ d S )Nr   ç        Ú r   y                )r   r!   ÚassertRaisesÚ	TypeErrorr   r   r   r   ÚtestTypeCheck=   s   zCreationTestCase.testTypeCheckc                 C   s:   |   t| jjd¡ |   t| jjd¡ |   t| jjd¡ d S )Néÿÿÿÿg      ð¿)r)   Ú
ValueErrorr   r!   r   r   r   r   ÚtestRangeCheckJ   s   zCreationTestCase.testRangeCheckc                 C   s”   | j  d¡ | j  d¡ |  | j  ¡ d ¡ | j  d¡ |  | j  ¡ d¡ | j  d¡ | j  d¡ |  | j  ¡ d¡ | j  d¡ |  | j  ¡ d ¡ d S )Né
   TFr'   )r   r!   Úsetblockingr   r   r   r   r   r   ÚtestTimeoutThenBlockingP   s   z(CreationTestCase.testTimeoutThenBlockingc                 C   sX   | j  d¡ | j  d¡ |  | j  ¡ d¡ | j  d¡ | j  d¡ |  | j  ¡ d¡ d S )NFr#   T)r   r0   r!   r   r   r   r   r   r   ÚtestBlockingThenTimeout^   s   z(CreationTestCase.testBlockingThenTimeoutN)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r"   r&   r+   r.   r1   r2   r   r   r   r   r      s    r   c                   @   s*   e Zd ZdZejZdd„ ZeZdd„ Z	dS )ÚTimeoutTestCaseg       @c                 C   s   t ƒ ‚r   )ÚNotImplementedErrorr   r   r   r   r   t   s   zTimeoutTestCase.setUpc           	      G   sœ   | j  |¡ t| j |ƒ}t|ƒD ]%}t ¡ }z||Ž  W q ty5 } zt ¡ | }W Y d}~ n
d}~ww |  d¡ |  ||| j	 ¡ |  
||d ¡ dS )z°
        Test the specified socket method.

        The method is run at most `count` times and must raise a TimeoutError
        within `timeout` + self.fuzz seconds.
        NzTimeoutError was not raisedr$   )r   r!   ÚgetattrÚrangeÚtimeÚ	monotonicÚTimeoutErrorZfailZ
assertLessÚfuzzZassertGreater)	r   ÚcountÚtimeoutÚmethodÚargsÚiÚt1ÚeZdeltar   r   r   Ú_sock_operationy   s   €þ
zTimeoutTestCase._sock_operationN)
r3   r4   r5   r>   r   ZHOSTÚ	localhostr   r   rF   r   r   r   r   r7   i   s    r7   c                   @   s\   e Zd ZdZdd„ Zdd„ Ze dd¡dd	„ ƒZd
d„ Z	dd„ Z
dd„ Zdd„ Zdd„ ZdS )ÚTCPTimeoutTestCasez3TCP test case for socket.socket() timeout functionsc                 C   s"   t   t jt j¡| _tddƒ| _d S )Nzwww.python.org.éP   )r   r	   r
   r   r   Úaddr_remoter   r   r   r   r   “   s   zTCPTimeoutTestCase.setUpc                 C   r   r   r   r   r   r   r   r   —   r   zTCPTimeoutTestCase.tearDownTz*need to replace these hosts; see bpo-35518c              
   C   s"  t ddƒ}t ddƒ}d}t tjtj¡}tj}| |¡ z1z| |¡ W n" ty-   Y n t	yF } z|j
t
jkr<d}W Y d }~nd }~ww W | ¡  ~n| ¡  ~w |rj|  d |d |d	 ||d |d	 ¡¡ || _t | jd ¡ |  d	d
d| j¡ W d   ƒ d S 1 sŠw   Y  d S )Nzblackhole.snakebite.netiZÝ  zwhitehole.snakebite.neti[Ý  TFzÇWe didn't receive a connection reset (RST) packet from {}:{} within {} seconds, so we're unable to test connect timeout against the corresponding {}:{} (which is configured to silently drop packets).r   r#   gü©ñÒMbP?Úconnect)r   r   r	   r
   r   ZLOOPBACK_TIMEOUTr!   rK   r=   ÚOSErrorÚerrnoZECONNREFUSEDr   ZskipTestÚformatrJ   r   r   rF   )r   Z	blackholeZ	whiteholeÚskipr   r@   Úerrr   r   r   ÚtestConnectTimeoutš   sF   


€þ€ÿûû"ÿz%TCPTimeoutTestCase.testConnectTimeoutc                 C   sT   t  | jd ¡ | j | j¡ |  dddd¡ W d   ƒ d S 1 s#w   Y  d S )Nr   r#   ç      ø?Úrecvé   )r   r   rJ   r   rK   rF   r   r   r   r   ÚtestRecvTimeoutì   s   "þz"TCPTimeoutTestCase.testRecvTimeoutc                 C   s,   t  | j| j¡ | j ¡  |  ddd¡ d S )Nr#   rR   Úaccept)r   Ú	bind_portr   rG   ÚlistenrF   r   r   r   r   ÚtestAcceptTimeoutò   s   
z$TCPTimeoutTestCase.testAcceptTimeoutc                 C   óp   t   t jt j¡&}t || j¡ | ¡  | j | 	¡ ¡ |  
ddddd ¡ W d   ƒ d S 1 s1w   Y  d S )Néd   rR   Úsendó   Xé@ ©r   r	   r
   r   rW   rG   rX   r   rK   ÚgetsocknamerF   ©r   Zservr   r   r   ÚtestSendø   ó   "ûzTCPTimeoutTestCase.testSendc              	   C   sv   t   t jt j¡)}t || j¡ | ¡  | j | 	¡ ¡ |  
ddddd | 	¡ ¡ W d   ƒ d S 1 s4w   Y  d S )Nr[   rR   Úsendtor]   r^   r_   ra   r   r   r   Ú
testSendto  s   ÿ"ûzTCPTimeoutTestCase.testSendtoc                 C   rZ   )Nr[   rR   Úsendallr]   r^   r_   ra   r   r   r   ÚtestSendall  rc   zTCPTimeoutTestCase.testSendallN)r3   r4   r5   r6   r   r   ÚunittestZskipIfrQ   rU   rY   rb   re   rg   r   r   r   r   rH      s    

Q	
rH   c                   @   s(   e Zd ZdZdd„ Zdd„ Zdd„ ZdS )	ÚUDPTimeoutTestCasez3UDP test case for socket.socket() timeout functionsc                 C   r   r   )r   r	   Ú
SOCK_DGRAMr   r   r   r   r   r     r   zUDPTimeoutTestCase.setUpc                 C   r   r   r   r   r   r   r   r     r   zUDPTimeoutTestCase.tearDownc                 C   s$   t  | j| j¡ |  dddd¡ d S )Nr#   rR   ÚrecvfromrT   )r   rW   r   rG   rF   r   r   r   r   ÚtestRecvfromTimeout  s   z&UDPTimeoutTestCase.testRecvfromTimeoutN)r3   r4   r5   r6   r   r   rl   r   r   r   r   ri     s
    ri   c                   C   s   t  d¡ d S )Nr   )r   Zrequiresr   r   r   r   ÚsetUpModule%  r   rm   Ú__main__)r6   Ú	functoolsrh   Útestr   Ztest.supportr   Zis_resource_enabledZskip_expectedr;   rM   r   Ú	lru_cacher   ZTestCaser   r7   rH   ri   rm   r3   Úmainr   r   r   r   Ú<module>   s(    
M' ÿ