o
    M,hP'                     @   s   d dl Z d dlZd dlmZmZmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlmZ 	 G dd de jZed	kr@e   dS dS )
    N)Mockpatch	MagicMock)ABNF)	WebSocket)WebSocketProtocolExceptionWebSocketPayloadException)SSLErrorc                   @   sL   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dS )LargePayloadTestc              	   C   s   g d}|D ]B\}}| j ||d/ d| }t|tj}| }| |t | t||k | 	t|j
| W d   n1 sCw   Y  qdS )z:Test WebSocket frame length encoding at various boundaries)	)}   zSingle byte length)~   zTwo byte length start)   zTwo byte length)  zTwo byte length max)   zEight byte length start) @  z16KB boundary)@  zJust over 16KB)   32KB)i   128KB)lengthdescription   AN)subTestr   create_frameOPCODE_BINARYformatassertIsInstancebytes
assertTruelenassertEqualdata)self
test_casesr   r   payloadframe	formatted r'   o/var/www/www-root/data/www/bot.pdev.uz/venv/lib/python3.10/site-packages/websocket/tests/test_large_payloads.py%test_frame_length_encoding_boundaries   s   z6LargePayloadTest.test_frame_length_encoding_boundariesc                    s   dd }g d}t dt||D ]}||||   qd  fdd}ddlm} ||dd	}|t|}| || |  d
 dS )zHTest receiving large payloads in chunks (simulating the 16KB recv issue)   Br   r   r   c                    $    t krdS   } d7  |S N       r   bufsizeresult
call_countchunksr'   r(   	mock_recvO   
   zCLargePayloadTest.test_recv_large_payload_chunked.<locals>.mock_recvframe_bufferTskip_utf8_validationr.   N)ranger   appendwebsocket._abnfr9   recv_strictr    assertGreater)r"   large_payload
chunk_sizeir6   r9   fbr2   r'   r3   r(   test_recv_large_payload_chunkedA   s   	z0LargePayloadTest.test_recv_large_payload_chunkedc                    sr   d}g   fdd}ddl m} ||dd}||}| t|| | tdd	  D  | t d
 dS )z&Simulate SSL BAD_LENGTH error scenarior   c                    s(     |  | dkrtddt| d S )Nr   z[SSL: BAD_LENGTH] unknown error   C)r=   r	   min)r1   
recv_callsr'   r(   mock_recv_with_ssl_limitk   s   
zTLargePayloadTest.test_ssl_large_payload_simulation.<locals>.mock_recv_with_ssl_limitr   r8   Tr:   c                 s   s    | ]}|d kV  qdS )r   Nr'   ).0callr'   r'   r(   	<genexpr>|   s    zELargePayloadTest.test_ssl_large_payload_simulation.<locals>.<genexpr>r.   N)r>   r9   r?   r    r   r   allr@   )r"   payload_sizerJ   r9   rD   r2   r'   rH   r(   !test_ssl_large_payload_simulationc   s   
z2LargePayloadTest.test_ssl_large_payload_simulationc              	   C   s   g d}|D ]Z}| j |dJ d| }t|tj}| }| |t | t|j	| d}|tj
k r9d| }n|tjk rCd| }nd| }| t|||  W d   n1 s[w   Y  qdS )z6Test frame formatting with various large payload sizes)i?  r   r   r   r   r   )size   D      
   N)r   r   r   r   r   r   r   r    r   r!   LENGTH_7	LENGTH_16)r"   
test_sizesrQ   r$   r%   r&   	mask_sizeexpected_header_sizer'   r'   r(    test_frame_format_large_payloads   s*   

z1LargePayloadTest.test_frame_format_large_payloadsc                    s   t  }g   fdd}||_d|j_t }||_d|_dd }td}dd	 |_|	|}| 
|d
 W d   dS 1 s?w   Y  dS )z?Test that large payloads are sent in chunks to avoid SSL issuesc                    s     t|  t| S N)r=   r   )r!   sent_chunksr'   r(   	mock_send   s   zDLargePayloadTest.test_send_large_payload_chunking.<locals>.mock_sendg      >@T   Er   zwebsocket._core.sendc                 S   s   t |S r\   r/   )sockr!   r'   r'   r(   <lambda>   s    zCLargePayloadTest.test_send_large_payload_chunking.<locals>.<lambda>r   N)r   send
gettimeoutreturn_valuer   ra   	connectedr   side_effectsend_binaryr@   )r"   	mock_sockr_   wsrA   mock_send_funcr2   r'   r]   r(    test_send_large_payload_chunking   s   


"z1LargePayloadTest.test_send_large_payload_chunkingc                 C   s   dd }t |t j}| }| |t tddd }t ddddt jd|}| 	t
 |jdd	 W d
   d
S 1 s?w   Y  d
S )z.Test UTF-8 validation with large text payloadsu   Hello 世界! i  z!Hi  s    invalid utf8r.   r   Fr:   N)r   r   OPCODE_TEXTr   r   r   structpackOPCODE_CLOSEassertRaisesr   validate)r"   
large_textr%   r&   invalid_utf8_close_datar'   r'   r(   test_utf8_validation_large_text   s   "z0LargePayloadTest.test_utf8_validation_large_textc                    sv   dd fddt dtdD d  fdd}dd	lm} ||d
d}|d}| | |  d dS )z9Test frame buffer with edge cases that could trigger bugs   Fr   c                    s   g | ]
} ||d   qS )   r'   )rK   rC   )payload_16kr'   r(   
<listcomp>   s    zALargePayloadTest.test_frame_buffer_edge_cases.<locals>.<listcomp>r   rw   c                    r+   r,   r/   r0   r3   r'   r(   r6      r7   z@LargePayloadTest.test_frame_buffer_edge_cases.<locals>.mock_recvr8   Tr:   rS   N)r<   r   r>   r9   r?   r    )r"   r6   r9   rD   r2   r'   )r4   r5   rx   r(   test_frame_buffer_edge_cases   s   
z-LargePayloadTest.test_frame_buffer_edge_casesc                 C   sD   d}d| }t |t j}| }| |t | t|j| dS )z4Test behavior at WebSocket maximum frame size limitsi      GN)	r   r   r   r   r   r   r    r   r!   )r"   
large_sizer$   r%   r&   r'   r'   r(   test_max_frame_size_limits   s   z+LargePayloadTest.test_max_frame_size_limitsN)__name__
__module____qualname__r)   rE   rP   r[   rl   ru   rz   r}   r'   r'   r'   r(   r
      s    ""'!r
   __main__)unittestrn   unittest.mockr   r   r   r>   r   websocket._corer   websocket._exceptionsr   r   websocket._ssl_compatr	   TestCaser
   r~   mainr'   r'   r'   r(   <module>   s    s