forked from espressif/esp-protocols
		
	
		
			
	
	
		
			129 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			129 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import os | ||
|  | import random | ||
|  | import re | ||
|  | import socket | ||
|  | import string | ||
|  | from threading import Event, Thread | ||
|  | import pytest | ||
|  | import sys | ||
|  | 
 | ||
|  | from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket | ||
|  | from pytest_embedded import Dut | ||
|  | 
 | ||
|  | 
 | ||
|  | def get_my_ip(): | ||
|  |     s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
|  |     try: | ||
|  |         # doesn't even have to be reachable | ||
|  |         s.connect(('8.8.8.8', 1)) | ||
|  |         IP = s.getsockname()[0] | ||
|  |     except Exception: | ||
|  |         IP = '127.0.0.1' | ||
|  |     finally: | ||
|  |         s.close() | ||
|  |     return IP | ||
|  | 
 | ||
|  | 
 | ||
|  | class WebsocketTestEcho(WebSocket): | ||
|  | 
 | ||
|  |     def handleMessage(self): | ||
|  |         self.sendMessage(self.data) | ||
|  |         print('Server sent: {}'.format(self.data)) | ||
|  | 
 | ||
|  |     def handleConnected(self): | ||
|  |         print('Connection from: {}'.format(self.address)) | ||
|  | 
 | ||
|  |     def handleClose(self): | ||
|  |         print('{} closed the connection'.format(self.address)) | ||
|  | 
 | ||
|  | 
 | ||
|  | # Simple Websocket server for testing purposes | ||
|  | class Websocket(object): | ||
|  | 
 | ||
|  |     def send_data(self, data): | ||
|  |         for nr, conn in self.server.connections.items(): | ||
|  |             conn.sendMessage(data) | ||
|  | 
 | ||
|  |     def run(self): | ||
|  |         self.server = SimpleWebSocketServer('', self.port, WebsocketTestEcho) | ||
|  |         while not self.exit_event.is_set(): | ||
|  |             self.server.serveonce() | ||
|  | 
 | ||
|  |     def __init__(self, port): | ||
|  |         self.port = port | ||
|  |         self.exit_event = Event() | ||
|  |         self.thread = Thread(target=self.run) | ||
|  |         self.thread.start() | ||
|  | 
 | ||
|  |     def __enter__(self): | ||
|  |         return self | ||
|  | 
 | ||
|  |     def __exit__(self, exc_type, exc_value, traceback): | ||
|  |         self.exit_event.set() | ||
|  |         self.thread.join(10) | ||
|  |         if self.thread.is_alive(): | ||
|  |             Utility.console_log('Thread cannot be joined', 'orange') | ||
|  | 
 | ||
|  | 
 | ||
|  | def test_examples_protocol_websocket(dut): | ||
|  |     """
 | ||
|  |     steps: | ||
|  |       1. obtain IP address | ||
|  |       2. connect to uri specified in the config | ||
|  |       3. send and receive data | ||
|  |     """
 | ||
|  |     def test_echo(dut): | ||
|  |         dut.expect('WEBSOCKET_EVENT_CONNECTED') | ||
|  |         for i in range(0, 5): | ||
|  |             dut.expect(re.compile(b'Received=hello (\\d)')) | ||
|  |         print('All echos received') | ||
|  | 
 | ||
|  |     def test_close(dut): | ||
|  |         code = dut.expect(re.compile(b'WEBSOCKET: Received closed message with code=(\\d*)'))[0] | ||
|  |         print('Received close frame with code {}'.format(code)) | ||
|  |   | ||
|  |     def test_recv_long_msg(dut, websocket, msg_len, repeats): | ||
|  |         send_msg = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(msg_len)) | ||
|  | 
 | ||
|  |         for _ in range(repeats): | ||
|  |             websocket.send_data(send_msg) | ||
|  |      | ||
|  |             recv_msg = '' | ||
|  |             while len(recv_msg) < msg_len: | ||
|  |                 match = dut.expect(re.compile(b'Received=([a-zA-Z0-9]*).*')).group(1).decode() | ||
|  |                 recv_msg += match | ||
|  |      | ||
|  |             if recv_msg == send_msg: | ||
|  |                 print('Sent message and received message are equal') | ||
|  |             else: | ||
|  |                 raise ValueError('DUT received string do not match sent string, \nexpected: {}\nwith length {}\
 | ||
|  |                                 \nreceived: {}\nwith length {}'.format(send_msg, len(send_msg), recv_msg, len(recv_msg))) | ||
|  |      | ||
|  |     # Starting of the test | ||
|  |     try: | ||
|  |         if dut.app.sdkconfig.get('WEBSOCKET_URI_FROM_STDIN') is True: | ||
|  |             uri_from_stdin = True | ||
|  |         else: | ||
|  |             uri = dut.app.sdkconfig['WEBSOCKET_URI'] | ||
|  |             uri_from_stdin = False | ||
|  | 
 | ||
|  |     except Exception: | ||
|  |         print('ENV_TEST_FAILURE: Cannot find uri settings in sdkconfig') | ||
|  |         raise | ||
|  | 
 | ||
|  |     if uri_from_stdin: | ||
|  |         server_port = 8080 | ||
|  |         with Websocket(server_port) as ws: | ||
|  |             uri = 'ws://{}:{}'.format(get_my_ip(), server_port) | ||
|  |             print('DUT connecting to {}'.format(uri)) | ||
|  |             dut.expect('Please enter uri of websocket endpoint', timeout=30) | ||
|  |             dut.write(uri) | ||
|  |             test_echo(dut) | ||
|  |             # Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte | ||
|  |             test_recv_long_msg(dut, ws, 2000, 3) | ||
|  |             test_close(dut) | ||
|  |     else: | ||
|  |         print('DUT connecting to {}'.format(uri)) | ||
|  |         test_echo(dut) | ||
|  | 
 |