2024-02-06 22:55:06 +04:00
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
2022-03-17 16:24:26 +04:00
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import json
import random
import re
import socket
2024-02-06 22:55:06 +04:00
import ssl
2022-03-17 16:24:26 +04:00
import string
2024-02-06 22:55:06 +04:00
import sys
2022-03-17 16:24:26 +04:00
from threading import Event , Thread
2024-02-06 22:55:06 +04:00
from SimpleWebSocketServer import ( SimpleSSLWebSocketServer ,
SimpleWebSocketServer , WebSocket )
2022-03-17 16:24:26 +04:00
def get_my_ip():
    """Return the local IPv4 address used for outbound traffic.

    A datagram socket is connected to a public address purely so that the
    local end of the socket can be read back with ``getsockname()``.

    Returns:
        str: the local address in dotted-quad form, or ``'127.0.0.1'`` when
        the lookup fails (e.g. no usable network).
    """
    # The context manager closes the socket on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            # doesn't even have to be reachable
            s.connect(('8.8.8.8', 1))
            return s.getsockname()[0]
        except OSError:
            # Best effort: fall back to loopback instead of failing the test setup.
            return '127.0.0.1'
class WebsocketTestEcho(WebSocket):
    """Connection handler that sends every received message back to its sender."""

    def handleConnected(self):
        """Log the address of a client that has just connected."""
        print(' Connection from: {} '.format(self.address))

    def handleClose(self):
        """Log the address of a client whose connection was closed."""
        print(' {} closed the connection '.format(self.address))

    def handleMessage(self):
        """Log the received payload and echo it back.

        ``bytes`` payloads are logged as hex and echoed with ``binary=True``;
        anything else is logged as-is and echoed with the default arguments.
        """
        payload = self.data
        if not isinstance(payload, bytes):
            print(f' \n Server received: {payload} \n ')
            self.sendMessage(payload)
            return
        print(f' \n Server received binary data: {payload.hex()} \n ')
        self.sendMessage(payload, binary=True)
# Simple Websocket server for testing purposes
class Websocket:
    """Echo server for the test, served from a background thread.

    The constructor starts the server thread immediately. Use the object as a
    context manager: leaving the ``with`` block signals the thread to stop and
    waits for it.
    """

    def __init__(self, port, use_tls, verify):
        """Store the configuration and start the server thread.

        Args:
            port: TCP port the server binds to (with an empty host string).
            use_tls: when ``True`` the server is created with a TLS context.
            verify: when ``True`` (and TLS is used) clients must present a
                certificate that validates against the test CA.
        """
        self.port = port
        self.use_tls = use_tls
        self.client_verify = verify
        self.exit_event = Event()
        self.thread = Thread(target=self.run)
        self.thread.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Ask the server thread to stop and wait up to 10 seconds for it."""
        self.exit_event.set()
        self.thread.join(10)
        if self.thread.is_alive():
            print(' Thread cannot be joined ')

    def send_data(self, data):
        """Send ``data`` to every currently connected client."""
        # Iterate over a snapshot: the server thread may add or remove
        # connections while we are sending.
        for conn in list(self.server.connections.values()):
            conn.sendMessage(data)

    def run(self):
        """Thread body: create the server and serve until ``exit_event`` is set.

        Certificate paths are relative to the current working directory.
        """
        if self.use_tls is True:
            ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            ssl_context.load_cert_chain(certfile='main/certs/server_cert.pem',
                                        keyfile='main/certs/server_key.pem')
            if self.client_verify is True:
                ssl_context.load_verify_locations(cafile='main/certs/ca_cert.pem')
                # verify_mode is the attribute the TLS stack actually reads;
                # without it the client certificate is never requested.
                ssl_context.verify_mode = ssl.CERT_REQUIRED
            ssl_context.check_hostname = False
            self.server = SimpleSSLWebSocketServer('', self.port, WebsocketTestEcho, ssl_context=ssl_context)
        else:
            self.server = SimpleWebSocketServer('', self.port, WebsocketTestEcho)

        try:
            while not self.exit_event.is_set():
                self.server.serveonce()
        finally:
            # Release the listening socket so the port can be reused.
            self.server.close()
def test_examples_protocol_websocket(dut):
    """Run the websocket example checks against the device under test.

    steps:
      1. obtain IP address
      2. connect to uri specified in the config
      3. send and receive data

    When the sdkconfig option ``WEBSOCKET_URI_FROM_STDIN`` is set, a local
    echo server is started, its URI is written to the DUT and the full set of
    checks runs. Otherwise the URI from ``WEBSOCKET_URI`` is used and only the
    echo check runs.

    Args:
        dut: device-under-test object; this function uses its ``expect``,
            ``write`` and ``app.sdkconfig`` members.
    """

    # Test for echo functionality:
    # Waits for the connection event and then for five "hello <digit>" messages to be echoed back.
    # This tests the basic responsiveness and correctness of the WebSocket connection.
    def test_echo(dut):
        dut.expect('WEBSOCKET_EVENT_CONNECTED')
        echo_re = re.compile(rb'Received=hello (\d)')
        for _ in range(5):
            dut.expect(echo_re)
        print(' All echos received ')
        sys.stdout.flush()

    # Test for clean closure of the WebSocket connection:
    # Ensures that the WebSocket can correctly receive a close frame and terminate the connection without issues.
    def test_close(dut):
        # group(1) is the captured status code; index 0 would be the whole matched log line.
        code = dut.expect(
            re.compile(rb'websocket: Received closed message with code=(\d*)')).group(1).decode()
        print(' Received close frame with code {} '.format(code))

    # Test for JSON message handling:
    # Sends a JSON formatted string and verifies that the received message matches the expected JSON structure.
    def test_json(dut, websocket):
        json_string = """
            [
               {
                  "id":"1",
                  "name":"user1"
               },
               {
                  "id":"2",
                  "name":"user2"
               }
            ]
        """
        websocket.send_data(json_string)
        # The DUT log is compared against Python's str() of the first array element.
        expected = str(json.loads(json_string)[0])
        # [5:] drops the leading 'Json=' from the matched text.
        received = dut.expect(
            re.compile(rb'Json=({[a-zA-Z0-9]*).*}')).group(0).decode()[5:]
        if received != expected:
            raise ValueError(
                'DUT received string do not match sent string, \nexpected: {}\nwith length {}'
                '\nreceived: {}\nwith length {}'.format(
                    expected, len(expected), received, len(received)))
        print(' \n Sent message and received message are equal \n ')
        sys.stdout.flush()

    # Test for receiving long messages:
    # Sends a random alphanumeric message of msg_len characters, `repeats` times, and
    # reassembles the first msg_len characters reported by the DUT.
    def test_recv_long_msg(dut, websocket, msg_len, repeats):
        alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
        send_msg = ''.join(random.choice(alphabet) for _ in range(msg_len))

        for _ in range(repeats):
            websocket.send_data(send_msg)

        # The DUT may report the payload in several 'Received=' chunks.
        chunk_re = re.compile(b'Received=([a-zA-Z0-9]*).*\n')
        recv_msg = ''
        while len(recv_msg) < msg_len:
            recv_msg += dut.expect(chunk_re).group(1).decode()

        if recv_msg != send_msg:
            raise ValueError(
                'DUT received string do not match sent string, \nexpected: {}\nwith length {}'
                '\nreceived: {}\nwith length {}'.format(
                    send_msg, len(send_msg), recv_msg, len(recv_msg)))
        print(' \n Sent message and received message are equal \n ')
        sys.stdout.flush()

    # Test for receiving the first fragment of a large message:
    # Verifies the WebSocket's ability to correctly process the initial segment of a fragmented message.
    def test_recv_fragmented_msg1(dut):
        dut.expect('websocket: Total payload length=2000, data_len=1024, current payload offset=0')

    # Test for receiving the second fragment of a large message:
    # Confirms that the WebSocket can correctly handle and process the subsequent segment of a fragmented message.
    def test_recv_fragmented_msg2(dut):
        dut.expect('websocket: Total payload length=2000, data_len=976, current payload offset=1024')

    # Test for receiving fragmented text messages:
    # Checks if the WebSocket can accurately reconstruct a message sent in several smaller parts.
    def test_fragmented_txt_msg(dut):
        dut.expect('Received=' + 32 * 'a' + 32 * 'b')
        print(' \n Fragmented data received \n ')

    # Extract the hexdump portion of the log line
    def parse_hexdump(line):
        match = re.search(r'\(.*\) Received binary data: ([0-9A-Fa-f ]+)', line)
        if match:
            hexdump = match.group(1).strip().replace(' ', '')
            # Convert the hexdump string to a bytearray
            return bytearray.fromhex(hexdump)
        return bytearray()

    # Capture the binary log output from the DUT
    def test_fragmented_binary_msg(dut):
        match = dut.expect(r'\(.*\) Received binary data: .*')
        if not match:
            # Explicit raise so the check is not stripped when asserts are disabled.
            raise AssertionError(' Log line with binary data not found ')
        line = match.group(0).strip()
        if isinstance(line, bytes):
            line = line.decode('utf-8')

        # Parse the hexdump from the log line
        received_data = parse_hexdump(line)

        # Create the expected bytearray with the specified pattern
        expected_data = bytearray([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])

        # Validate the received data
        assert received_data == expected_data, f' Received data does not match expected data. Received: {received_data} , Expected: {expected_data} '
        print(' \n Fragmented data received \n ')

    # Starting of the test
    try:
        if dut.app.sdkconfig.get('WEBSOCKET_URI_FROM_STDIN') is True:
            uri_from_stdin = True
        else:
            uri = dut.app.sdkconfig['WEBSOCKET_URI']
            uri_from_stdin = False

        # Mutual authentication implies both TLS and client verification.
        use_tls = client_verify = dut.app.sdkconfig.get('WS_OVER_TLS_MUTUAL_AUTH') is True
    except Exception:
        print(' ENV_TEST_FAILURE: Cannot find uri settings in sdkconfig ')
        raise

    if not uri_from_stdin:
        print(' DUT connecting to {} '.format(uri))
        test_echo(dut)
        return

    server_port = 8080
    with Websocket(server_port, use_tls, client_verify) as ws:
        scheme = 'wss' if use_tls is True else 'ws'
        uri = '{}://{}:{}'.format(scheme, get_my_ip(), server_port)
        print(' DUT connecting to {} '.format(uri))
        dut.expect('Please enter uri of websocket endpoint', timeout=30)
        dut.write(uri)
        test_echo(dut)
        test_recv_long_msg(dut, ws, 2000, 3)
        test_json(dut, ws)
        test_fragmented_txt_msg(dut)
        test_fragmented_binary_msg(dut)
        test_recv_fragmented_msg1(dut)
        test_recv_fragmented_msg2(dut)
        test_close(dut)