Files
esp-protocols/components/libwebsockets/examples/client/pytest_websocket.py

238 lines
8.6 KiB
Python

# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import json
import random
import re
import socket
import ssl
import string
import sys
from threading import Event, Thread
from SimpleWebSocketServer import (SimpleSSLWebSocketServer,
SimpleWebSocketServer, WebSocket)
def get_my_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
s.connect(('8.8.8.8', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
class WebsocketTestEcho(WebSocket):
def handleMessage(self):
if isinstance(self.data, bytes):
print(f'\n Server received binary data: {self.data.hex()}\n')
self.sendMessage(self.data, binary=True)
else:
print(f'\n Server received: {self.data}\n')
self.sendMessage(self.data)
def handleConnected(self):
print('Connection from: {}'.format(self.address))
def handleClose(self):
print('{} closed the connection'.format(self.address))
# Simple Websocket server for testing purposes
class Websocket(object):
def send_data(self, data):
for nr, conn in self.server.connections.items():
conn.sendMessage(data)
def run(self):
if self.use_tls is True:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile='main/certs/server/server_cert.pem', keyfile='main/certs/server/server_key.pem')
if self.client_verify is True:
ssl_context.load_verify_locations(cafile='main/certs/ca_cert.pem')
ssl_context.verify = ssl.CERT_REQUIRED
ssl_context.check_hostname = False
self.server = SimpleSSLWebSocketServer('', self.port, WebsocketTestEcho, ssl_context=ssl_context)
else:
self.server = SimpleWebSocketServer('', self.port, WebsocketTestEcho)
while not self.exit_event.is_set():
self.server.serveonce()
def __init__(self, port, use_tls, verify):
self.port = port
self.use_tls = use_tls
self.client_verify = verify
self.exit_event = Event()
self.thread = Thread(target=self.run)
self.thread.start()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.exit_event.set()
self.thread.join(10)
if self.thread.is_alive():
print('Thread cannot be joined', 'orange')
def test_examples_protocol_websocket(dut):
"""
steps:
1. obtain IP address
2. connect to uri specified in the config
3. send and receive data
"""
# Test for echo functionality:
# Sends a series of simple "hello" messages to the WebSocket server and verifies that each one is echoed back correctly.
# This tests the basic responsiveness and correctness of the WebSocket connection.
def test_echo(dut):
dut.expect('WEBSOCKET_EVENT_CONNECTED')
for i in range(0, 5):
dut.expect(re.compile(b'Received=hello (\\d)'))
print('All echos received')
sys.stdout.flush()
# Test for clean closure of the WebSocket connection:
# Ensures that the WebSocket can correctly receive a close frame and terminate the connection without issues.
def test_close(dut):
dut.expect('__lws_lc_untag')
# Test for JSON message handling:
# Sends a JSON formatted string and verifies that the received message matches the expected JSON structure.
def test_json(dut, websocket):
json_string = """
[
{
"id":"1",
"name":"user1"
},
{
"id":"2",
"name":"user2"
}
]
"""
websocket.send_data(json_string)
data = json.loads(json_string)
match = dut.expect(
re.compile(b'Json=({[a-zA-Z0-9]*).*}')).group(0).decode()[5:]
if match == str(data[0]):
print('\n Sent message and received message are equal \n')
sys.stdout.flush()
else:
raise ValueError(
'DUT received string do not match sent string, \nexpected: {}\nwith length {}\
\nreceived: {}\nwith length {}'.format(
data[0], len(data[0]), match, len(match)))
# Test for receiving long messages:
# This sends a message with a specified length (2000 characters) to ensure the WebSocket can handle large data payloads. Repeated 3 times for reliability.
def test_recv_long_msg(dut, websocket, msg_len, repeats):
send_msg = ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase +
string.digits) for _ in range(msg_len))
for _ in range(repeats):
websocket.send_data(send_msg)
recv_msg = ''
while len(recv_msg) < msg_len:
match = dut.expect(re.compile(
b'Received=([a-zA-Z0-9]*).*\n')).group(1).decode()
recv_msg += match
if recv_msg == send_msg:
print('\n Sent message and received message are equal \n')
sys.stdout.flush()
else:
raise ValueError(
'DUT received string do not match sent string, \nexpected: {}\nwith length {}\
\nreceived: {}\nwith length {}'.format(
send_msg, len(send_msg), recv_msg, len(recv_msg)))
# Test for receiving the first fragment of a large message:
# Verifies the WebSocket's ability to correctly process the initial segment of a fragmented message.
def test_recv_fragmented_msg1(dut):
dut.expect('Total payload length=2000, data_len=1024')
# Test for receiving fragmented text messages:
# Checks if the WebSocket can accurately reconstruct a message sent in several smaller parts.
def test_fragmented_txt_msg(dut):
dut.expect('Received=' + 32 * 'a' + 32 * 'b')
print('\nFragmented data received\n')
# Extract the hexdump portion of the log line
def parse_hexdump(line):
match = re.search(r'\(.*\) Received binary data: ([0-9A-Fa-f ]+)', line)
if match:
hexdump = match.group(1).strip().replace(' ', '')
# Convert the hexdump string to a bytearray
return bytearray.fromhex(hexdump)
return bytearray()
# Capture the binary log output from the DUT
def test_fragmented_binary_msg(dut):
match = dut.expect(r'\(.*\) Received binary data: .*')
if match:
line = match.group(0).strip()
if isinstance(line, bytes):
line = line.decode('utf-8')
# Parse the hexdump from the log line
received_data = parse_hexdump(line)
# Create the expected bytearray with the specified pattern
expected_data = bytearray([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
# Validate the received data
assert received_data == expected_data, f'Received data does not match expected data. Received: {received_data}, Expected: {expected_data}'
print('\nFragmented data received\n')
else:
assert False, 'Log line with binary data not found'
# Starting of the test
try:
if dut.app.sdkconfig.get('WEBSOCKET_URI_FROM_STDIN') is True:
uri_from_stdin = True
else:
uri = dut.app.sdkconfig['WEBSOCKET_URI']
uri_from_stdin = False
if dut.app.sdkconfig.get('WS_OVER_TLS_MUTUAL_AUTH') is True:
use_tls = True
client_verify = True
else:
use_tls = False
client_verify = False
except Exception:
print('ENV_TEST_FAILURE: Cannot find uri settings in sdkconfig')
raise
if uri_from_stdin:
server_port = 8080
with Websocket(server_port, use_tls, client_verify) as ws:
uri = '{}'.format(get_my_ip())
print('DUT connecting to {}'.format(uri))
dut.expect('Please enter uri of websocket endpoint', timeout=30)
dut.write(uri)
test_echo(dut)
test_recv_long_msg(dut, ws, 2000, 3)
test_json(dut, ws)
test_fragmented_txt_msg(dut)
test_fragmented_binary_msg(dut)
test_recv_fragmented_msg1(dut)
test_close(dut)
else:
print('DUT connecting to {}'.format(uri))
test_echo(dut)