o
    HXhr*                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZe	e j
dkdG dd dZG dd deejZG dd	 d	eejZG d
d deZG dd deejZG dd deejZG dd deZG dd deejZG dd deejZedkre  dS dS )    Nposixztests requires a posix system.c                   @   sV   e Zd Zdd Zdd Zdd Z		dd	d
Zdd ZdZdd Z	dd Z
dd ZdS )TestFileIOSignalInterruptc                 C   s
   d | _ d S N)_processself r   >/opt/python-3.10.19/usr/lib/python3.10/test/test_file_eintr.pysetUp   s   
zTestFileIOSignalInterrupt.setUpc                 C   sB   | j r| j  d u rz| j   W d S  ty   Y d S w d S d S r   )r   pollkillOSErrorr   r   r   r	   tearDown   s   z"TestFileIOSignalInterrupt.tearDownc                 C      	 d| j  S )Nz=import %s as io ;infile = io.FileIO(sys.stdin.fileno(), "rb")modnamer   r   r   r	   _generate_infile_setup_code$   s   z5TestFileIOSignalInterrupt._generate_infile_setup_code    Tc                 C   s~   	 | j  d u rtd z| j   W n	 ty   Y nw |r/| j  \}}||7 }||7 }| d|| | f  d S )Ng?z/Error from IO process %s:
STDOUT:
%sSTDERR:
%s
)	r   r   timesleep	terminater   communicatefaildecode)r   whystdoutstderrr   Z
stdout_endZ
stderr_endr   r   r	   fail_with_process_info-   s   

z0TestFileIOSignalInterrupt.fail_with_process_infoc                 C   s8  	 |   }tjtjddd| d d | d d gtjtjtjd| _| jjt	d}|dkr7| j
d	|d
 | jj| d}g }|smt| jjgddd\}}}| jtj |d7 }|dkrk| j  | d |rD| jj }|dkr~| j
d|d
 | jjdd\}	}
| jjr| j
d| jj |	|
dd d S d S )Nz-uz-czXimport signal, sys ;signal.signal(signal.SIGINT, lambda s, f: sys.stderr.write("$\n")) ;z ;z"sys.stderr.write("Worm Sign!\n") ;zinfile.close())stdinr   r   s   Worm Sign!
zwhile awaiting a sign)r   r   r   g?      z,reader process failed to handle our signals.s   $
zwhile awaiting signal   
)inputzexited rc=%dF)r   )r   
subprocessPopensys
executablePIPEr   r   readlenr   r   writeselectsend_signalsignalSIGINTr   r   readliner   
returncode)r   data_to_writeread_and_verify_codeZinfile_setup_codeZ	worm_signZsignals_sentZrlist_Zsignal_liner   r   r   r   r	   _test_readingH   sb   	

	

z'TestFileIOSignalInterrupt._test_readingzgot = infile.{read_method_name}() ;expected = {expected!r} ;assert got == expected, ("{read_method_name} returned wrong data.\n""got data %r\nexpected %r" % (got, expected))c                 C       	 | j d| jjdddd d S )N   hello, world!r/   s   hello, world!
Zread_method_nameZexpectedr1   r2   r4   _READING_CODE_TEMPLATEformatr   r   r   r	   test_readline      
z'TestFileIOSignalInterrupt.test_readlinec                 C   $   	 | j d| jjdddgdd d S )N   hello
world!	readliness   hello
s   world!
r7   r8   r9   r   r   r   r	   test_readlines      
z(TestFileIOSignalInterrupt.test_readlinesc                 C   s:   	 | j d| jjdddd | j d| jjdddd d S )Nr?   readall   hello
world!
r7   r8   r(   r9   r   r   r   r	   test_readall   s   
z&TestFileIOSignalInterrupt.test_readallN)r   r   T)__name__
__module____qualname__r
   r   r   r   r4   r:   r<   rA   rE   r   r   r   r	   r      s    	
Ir   c                   @      e Zd ZdZdS )CTestFileIOSignalInterrupt_ioNrF   rG   rH   r   r   r   r   r	   rJ          rJ   c                   @   rI   )PyTestFileIOSignalInterrupt_pyioNrL   r   r   r   r	   rN      rM   rN   c                   @   s   e Zd Zdd Zdd ZdS )TestBufferedIOSignalInterruptc                 C   r   )Nziimport %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;assert isinstance(infile, io.BufferedReader)r   r   r   r   r	   r      s   z9TestBufferedIOSignalInterrupt._generate_infile_setup_codec                 C   r5   )Nr?   r(   rD   r7   r8   r9   r   r   r   r	   rE      r=   z*TestBufferedIOSignalInterrupt.test_readallN)rF   rG   rH   r   rE   r   r   r   r	   rP      s    rP   c                   @   rI   )CTestBufferedIOSignalInterruptrK   NrL   r   r   r   r	   rQ      rM   rQ   c                   @   rI   )PyTestBufferedIOSignalInterruptrO   NrL   r   r   r   r	   rR      rM   rR   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
TestTextIOSignalInterruptc                 C   r   )Nzimport %s as io ;infile = io.open(sys.stdin.fileno(), encoding="utf-8", newline=None) ;assert isinstance(infile, io.TextIOWrapper)r   r   r   r   r	   r      s   z5TestTextIOSignalInterrupt._generate_infile_setup_codec                 C   r5   )Nr6   r/   zhello, world!
r7   r8   r9   r   r   r   r	   r<      r=   z'TestTextIOSignalInterrupt.test_readlinec                 C   r>   )Ns   hello
world!r@   zhello
zworld!
r7   r8   r9   r   r   r   r	   rA      rB   z(TestTextIOSignalInterrupt.test_readlinesc                 C   r5   )Nr?   r(   zhello
world!
r7   r8   r9   r   r   r   r	   rE      r=   z&TestTextIOSignalInterrupt.test_readallN)rF   rG   rH   r   r<   rA   rE   r   r   r   r	   rS      s
    rS   c                   @   rI   )CTestTextIOSignalInterruptrK   NrL   r   r   r   r	   rT      rM   rT   c                   @   rI   )PyTestTextIOSignalInterruptrO   NrL   r   r   r   r	   rU      rM   rU   __main__)osr+   r-   r#   r%   r   ZunittestrK   rO   Z
skipUnlessnamer   ZTestCaserJ   rN   rP   rQ   rR   rS   rT   rU   rF   mainr   r   r   r	   <module>   s.   
  