diff --git a/.gitlab/ci/target-test.yml b/.gitlab/ci/target-test.yml index a1921bd906..83c924955c 100644 --- a/.gitlab/ci/target-test.yml +++ b/.gitlab/ci/target-test.yml @@ -869,14 +869,6 @@ example_test_001C: - ESP32 - Example_GENERIC -example_test_protocols: - extends: - - .example_test_esp32_template - - .rules:test:example_test-esp32-wifi - tags: - - ESP32 - - wifi_router - example_test_002: extends: - .example_test_esp32_template diff --git a/examples/protocols/sockets/README.md b/examples/protocols/sockets/README.md index 3dc8c7802a..771d642a7c 100644 --- a/examples/protocols/sockets/README.md +++ b/examples/protocols/sockets/README.md @@ -30,7 +30,38 @@ There are many host-side tools which can be used to interact with the UDP/TCP se One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets. Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following commands. -In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples. +In addition to those tools, There are some python scripts under `examples/protocols/sockets/scripts`. +And scripts for automated tests named `pytest_xxx.py` can be found under each example directory. + + +### Python Scripts Socket Tools + +Python scripts under `examples/protocols/sockets/scripts` could be used to exercise the socket communication. +Command line arguments such as IP version and IP address shall be supplied. Use `python xxxx.py --help` to see how to use these scripts. + +Examples: +```bash +# python run_tcp_client.py --help +python run_tcp_client.py 192.168.1.2 [--port=3333] [--message="Data to ESP"] +python run_tcp_client.py fe80::2%eth0 [--port=3333] [--message="Data to ESP"] +# python run_tcp_server.py --help +python run_tcp_server.py [--port=3333] [--ipv6] +``` + +### Python Scripts For Automated Tests + +Script named `pytest_xxxx` in the application directory can be used for automated tests. +They can also be run locally. Ref: [ESP-IDF Tests with Pytest Guide](https://docs.espressif.com/projects/esp-idf/en/latest/esp32/contribute/esp-idf-tests-with-pytest.html). + +Example: +```bash +$ cd $IDF_PATH +$ bash install.sh --enable-pytest +$ . ./export.sh +$ cd examples/protocols/sockets/tcp_client +$ python $IDF_PATH/tools/ci/ci_build_apps.py . --target esp32 -vv --pytest-apps +$ pytest --target esp32 +``` ### Send UDP packet via netcat ``` @@ -62,16 +93,6 @@ nc 192.168.0.167 3333 nc -l 192.168.0.167 -p 3333 ``` -### Python scripts -Each script in the application directory could be used to exercise the socket communication. -Command line arguments such as IP version (IPv4 or IPv6) and IP address and payload data (only clients) shall be supplied. -In addition to that, port number and interface id are hardcoded in the scripts and might need to be altered to match the values used by the application. Example: - -``` -PORT = 3333 -INTERFACE = 'en0' -``` - ### Note about IPv6 addresses Examples are configured to obtain multiple IPv6 addresses. The actual behavior may differ depending on the local network, typically the ESP gets assigned these two addresses diff --git a/examples/protocols/sockets/non_blocking/example_test.py b/examples/protocols/sockets/non_blocking/example_test.py deleted file mode 100644 index ecd9522a85..0000000000 --- a/examples/protocols/sockets/non_blocking/example_test.py +++ /dev/null @@ -1,26 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- - -from __future__ import print_function, unicode_literals - -import re - -import ttfw_idf - - -@ttfw_idf.idf_example_test(env_tag='Example_GENERIC') -def test_examples_protocol_socket_non_block(env, _): - dut = env.get_dut('non_blocking_socket', 'examples/protocols/sockets/non_blocking', dut_class=ttfw_idf.ESP32DUT) - - # start the test and expect the client to receive back it's original data - dut.start_app() - dut.expect(re.compile(r'nonblocking-socket-client: Received: GET / HTTP/1.1'), timeout=30) - - -if __name__ == '__main__': - test_examples_protocol_socket_non_block() diff --git a/examples/protocols/sockets/non_blocking/pytet_socket_non_blocking.py b/examples/protocols/sockets/non_blocking/pytet_socket_non_blocking.py new file mode 100644 index 0000000000..03d9c502c6 --- /dev/null +++ b/examples/protocols/sockets/non_blocking/pytet_socket_non_blocking.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from pytest_embedded import Dut + + +@pytest.mark.supported_targets +@pytest.mark.generic +def test_examples_non_block_socket_localhost(dut: Dut) -> None: + dut.expect(r'nonblocking-socket-client: Received: GET / HTTP/1.1', timeout=30) diff --git a/examples/protocols/sockets/scripts/run_tcp_client.py b/examples/protocols/sockets/scripts/run_tcp_client.py new file mode 100644 index 0000000000..0839721c42 --- /dev/null +++ b/examples/protocols/sockets/scripts/run_tcp_client.py @@ -0,0 +1,50 @@ +# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +import argparse +import os +import socket + +DEF_PORT = 3333 +DEF_MESSAGE = 'Data to ESP' + + +def tcp_client(address: str, port: int, payload: str) -> str: + for res in socket.getaddrinfo(address, port, socket.AF_UNSPEC, + socket.SOCK_STREAM, 0, socket.AI_PASSIVE): + family_addr, _, _, _, addr = res + try: + sock = socket.socket(family_addr, socket.SOCK_STREAM) + sock.settimeout(60.0) + except socket.error as msg: + print('Could not create socket') + print(os.strerror(msg.errno)) + raise + try: + sock.connect(addr) + except socket.error as e: + print('Could not open socket: ' + str(e)) + sock.close() + raise + sock.sendall(payload.encode()) + data = sock.recv(1024) + if not data: + return '' + print('Reply : ' + data.decode()) + sock.close() + return data.decode() + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('remote_ip', help='TCP server ip address, eg: 192.168.1.1, fe80::2%%eth0') + parser.add_argument('--port', default=DEF_PORT, type=int, help='TCP server port') + parser.add_argument('--message', default=DEF_MESSAGE, help='Message to send to the server.') + args = parser.parse_args() + + print(f'Send message to the server: {args.remote_ip}') + data = tcp_client(args.remote_ip, args.port, args.message) + print(f'Received From server: {data}') + + +if __name__ == '__main__': + main() diff --git a/examples/protocols/sockets/scripts/run_tcp_server.py b/examples/protocols/sockets/scripts/run_tcp_server.py new file mode 100644 index 0000000000..c2cb9a8c7a --- /dev/null +++ b/examples/protocols/sockets/scripts/run_tcp_server.py @@ -0,0 +1,85 @@ +# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +import argparse +import socket +from threading import Event, Thread + +DEF_PORT = 3333 + + +class TcpServer(object): + def __init__(self, port, family_addr, persist=False, timeout=60): # type: ignore + self.port = port + self.socket = socket.socket(family_addr, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.settimeout(timeout) + self.shutdown = Event() + self.persist = persist + self.family_addr = family_addr + self.server_thread = None + + def __enter__(self): # type: ignore + try: + self.socket.bind(('', self.port)) + except socket.error as e: + print('Bind failed:{}'.format(e)) + raise + self.socket.listen(1) + + print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr)) + self.server_thread = Thread(target=self.run_server) + self.server_thread.start() + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore + if self.persist: + sock = socket.socket(self.family_addr, socket.SOCK_STREAM) + sock.connect(('localhost', self.port)) + sock.sendall(b'Stop', ) + sock.close() + self.shutdown.set() + self.shutdown.set() + self.server_thread.join() + self.socket.close() + + def run_server(self) -> None: + while not self.shutdown.is_set(): + try: + conn, address = self.socket.accept() # accept new connection + print('Connection from: {}'.format(address)) + conn.setblocking(1) + data = conn.recv(1024) + if not data: + return + data = data.decode() + print('Received data: ' + data) + reply = 'OK: ' + data + conn.send(reply.encode()) + conn.close() + except socket.timeout: + print(f'socket accept timeout ({self.socket.timeout}s)') + except socket.error as e: + print('Running server failed:{}'.format(e)) + raise + if not self.persist: + break + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--port', default=DEF_PORT, type=int, help='TCP server port') + parser.add_argument('--ipv6', action='store_true', help='Create IPv6 server.') + parser.add_argument('--timeout', default=10, type=int, help='socket accept/recv timeout.') + args = parser.parse_args() + + if args.ipv6: + family = socket.AF_INET6 + else: + family = socket.AF_INET + + with TcpServer(args.port, family, persist=True, timeout=args.timeout): + input('Server Running. Press Enter or CTRL-C to exit...\n') + + +if __name__ == '__main__': + main() diff --git a/examples/protocols/sockets/scripts/run_udp_client.py b/examples/protocols/sockets/scripts/run_udp_client.py new file mode 100644 index 0000000000..abb6c30f38 --- /dev/null +++ b/examples/protocols/sockets/scripts/run_udp_client.py @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +import argparse +import os +import socket + +DEF_PORT = 3333 +DEF_MESSAGE = 'Data to ESP' + + +def udp_client(address: str, port: int, payload: str) -> str: + for res in socket.getaddrinfo(address, port, socket.AF_UNSPEC, + socket.SOCK_DGRAM, 0, socket.AI_PASSIVE): + family_addr, _, _, _, addr = res + try: + sock = socket.socket(family_addr, socket.SOCK_DGRAM) + sock.settimeout(20.0) + except socket.error as msg: + print('Could not create socket') + print(os.strerror(msg.errno)) + raise + try: + sock.sendto(payload.encode(), addr) + reply, addr = sock.recvfrom(128) + if not reply: + return '' + print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply)) + except socket.timeout: + print('Socket operation timeout') + return '' + except socket.error as msg: + print('Error while sending or receiving data from the socket') + print(os.strerror(msg.errno)) + sock.close() + raise + return reply.decode() + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('remote_ip', help='UDP server ip address, eg: 192.168.1.1, fe80::2%%eth0') + parser.add_argument('--port', default=DEF_PORT, type=int, help='UDP server port') + parser.add_argument('--message', default=DEF_MESSAGE, help='Message to send to the server.') + args = parser.parse_args() + + print(f'Send message to the server: {args.remote_ip}') + data = udp_client(args.remote_ip, args.port, args.message) + print(f'Received From server: {data}') + + +if __name__ == '__main__': + main() diff --git a/examples/protocols/sockets/scripts/run_udp_server.py b/examples/protocols/sockets/scripts/run_udp_server.py new file mode 100644 index 0000000000..e01b19d136 --- /dev/null +++ b/examples/protocols/sockets/scripts/run_udp_server.py @@ -0,0 +1,80 @@ +# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +import argparse +import socket +from threading import Event, Thread + +DEF_PORT = 3333 + + +class UdpServer: + + def __init__(self, port, family_addr, persist=False, timeout=60): # type: ignore + self.port = port + self.family_addr = family_addr + self.socket = socket.socket(family_addr, socket.SOCK_DGRAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.settimeout(timeout) + self.shutdown = Event() + self.persist = persist + self.server_thread = None + + def __enter__(self): # type: ignore + try: + self.socket.bind(('', self.port)) + except socket.error as e: + print('Bind failed:{}'.format(e)) + raise + + print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr)) + self.server_thread = Thread(target=self.run_server) + self.server_thread.start() + return self + + def __exit__(self, exc_type, exc_value, traceback): # type: ignore + if self.persist: + sock = socket.socket(self.family_addr, socket.SOCK_DGRAM) + sock.sendto(b'Stop', ('localhost', self.port)) + sock.close() + self.shutdown.set() + self.server_thread.join() + self.socket.close() + + def run_server(self) -> None: + while not self.shutdown.is_set(): + try: + data, addr = self.socket.recvfrom(1024) + print(addr) + if not data: + return + data = data.decode() + print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data) + reply = 'OK: ' + data + self.socket.sendto(reply.encode(), addr) + except socket.timeout: + print(f'socket recvfrom timeout ({self.socket.timeout}s)') + except socket.error as e: + print('Running server failed:{}'.format(e)) + raise + if not self.persist: + break + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--port', default=DEF_PORT, type=int, help='UDP server port') + parser.add_argument('--ipv6', action='store_true', help='Create IPv6 server.') + parser.add_argument('--timeout', default=10, type=int, help='socket recvfrom timeout.') + args = parser.parse_args() + + if args.ipv6: + family = socket.AF_INET6 + else: + family = socket.AF_INET + + with UdpServer(args.port, family, persist=True, timeout=args.timeout): + input('Server Running. Press Enter or CTRL-C to exit...\n') + + +if __name__ == '__main__': + main() diff --git a/examples/protocols/sockets/tcp_client/README.md b/examples/protocols/sockets/tcp_client/README.md index 0d90afe16c..3e4b571f40 100644 --- a/examples/protocols/sockets/tcp_client/README.md +++ b/examples/protocols/sockets/tcp_client/README.md @@ -41,27 +41,13 @@ In order to create TCP server that communicates with TCP Client example, choose There are many host-side tools which can be used to interact with the UDP/TCP server/client. One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets. -In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples. +Ref to the [upper level README](../README.md#host-tools) for more information. ### TCP server using netcat ``` nc -l 192.168.0.167 3333 ``` -### Python scripts -Script example_test.py could be used as a counter part to the tcp-client project, ip protocol name (IPv4 or IPv6) shall be stated as argument. - -Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported. -Please run the following commands to configure the terminal to execute the script. -``` -export PYTHONPATH="$IDF_PATH/tools:$IDF_PATH/tools/ci/python_packages" -python -m pip install -r $IDF_PATH/tools/ci/python_packages/ttfw_idf/requirements.txt -``` - -Example: -``` -python example_test.py IPv4 -``` ## Hardware Required This example can be run on any commonly available ESP32 development board. diff --git a/examples/protocols/sockets/tcp_client/example_test.py b/examples/protocols/sockets/tcp_client/example_test.py deleted file mode 100644 index 6ab409fda5..0000000000 --- a/examples/protocols/sockets/tcp_client/example_test.py +++ /dev/null @@ -1,129 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- -import os -import re -import socket -import sys -from threading import Event, Thread - -import netifaces -import ttfw_idf -from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip - -# ----------- Config ---------- -PORT = 3333 -# ------------------------------- - - -class TcpServer: - - def __init__(self, port, family_addr, persist=False): - self.port = port - self.socket = socket.socket(family_addr, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.settimeout(60.0) - self.shutdown = Event() - self.persist = persist - self.family_addr = family_addr - - def __enter__(self): - try: - self.socket.bind(('', self.port)) - except socket.error as e: - print('Bind failed:{}'.format(e)) - raise - self.socket.listen(1) - - print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr)) - self.server_thread = Thread(target=self.run_server) - self.server_thread.start() - return self - - def __exit__(self, exc_type, exc_value, traceback): - if self.persist: - sock = socket.socket(self.family_addr, socket.SOCK_STREAM) - sock.connect(('localhost', self.port)) - sock.sendall(b'Stop', ) - sock.close() - self.shutdown.set() - self.shutdown.set() - self.server_thread.join() - self.socket.close() - - def run_server(self): - while not self.shutdown.is_set(): - try: - conn, address = self.socket.accept() # accept new connection - print('Connection from: {}'.format(address)) - conn.setblocking(1) - data = conn.recv(1024) - if not data: - return - data = data.decode() - print('Received data: ' + data) - reply = 'OK: ' + data - conn.send(reply.encode()) - conn.close() - except socket.error as e: - print('Running server failed:{}'.format(e)) - raise - if not self.persist: - break - - -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_tcpclient(env, extra_data): - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_client', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'tcp_client.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('tcp_client_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') - env_name = 'wifi_router' - ap_ssid = get_env_config_variable(env_name, 'ap_ssid') - ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') - - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - - my_interface = get_my_interface_by_dest_ip(ipv4) - # test IPv4 - with TcpServer(PORT, socket.AF_INET): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET) - print('Connect tcp client to server IP={}'.format(server_ip)) - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) - # test IPv6 - with TcpServer(PORT, socket.AF_INET6): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6) - print('Connect tcp client to server IP={}'.format(server_ip)) - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) - - -if __name__ == '__main__': - if sys.argv[1:] and sys.argv[1].startswith('IPv'): # if additional arguments provided: - # Usage: example_test.py - family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET - with TcpServer(PORT, family_addr, persist=True) as s: - print(input('Press Enter stop the server...')) - else: - test_examples_protocol_socket_tcpclient() diff --git a/examples/protocols/sockets/tcp_client/pytest_tcp_client.py b/examples/protocols/sockets/tcp_client/pytest_tcp_client.py new file mode 100644 index 0000000000..8408f5f162 --- /dev/null +++ b/examples/protocols/sockets/tcp_client/pytest_tcp_client.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import socket + +import pytest +from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, + get_my_interface_by_dest_ip) +from pytest_embedded import Dut + +try: + from run_tcp_server import TcpServer +except ImportError: + import os + import sys + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts'))) + from run_tcp_server import TcpServer + + +PORT = 3333 + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_client_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') + + # test IPv4 + with TcpServer(PORT, socket.AF_INET): + server_ip = get_host_ip4_by_dest_ip(ipv4) + print('Connect tcp client to server IP={}'.format(server_ip)) + dut.write(server_ip) + dut.expect('OK: Message from ESP32') + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_client_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) + + # test IPv6 + my_interface = get_my_interface_by_dest_ip(ipv4) + with TcpServer(PORT, socket.AF_INET6): + server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface) + print('Connect tcp client to server IP={}'.format(server_ip)) + dut.write(server_ip) + dut.expect('OK: Message from ESP32') diff --git a/examples/protocols/sockets/tcp_server/README.md b/examples/protocols/sockets/tcp_server/README.md index afeefaf140..8b0312320b 100644 --- a/examples/protocols/sockets/tcp_server/README.md +++ b/examples/protocols/sockets/tcp_server/README.md @@ -16,23 +16,13 @@ There are many host-side tools which can be used to interact with the UDP/TCP se One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets. Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following command. -In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples. +Ref to the [upper level README](../README.md#host-tools) for more information. ### TCP client using netcat ``` nc 192.168.0.167 3333 ``` -### Python scripts -Script example_test.py could be used as a counter part to the tcp-server application, -IP address and the message to be send to the server shall be stated as arguments. Example: - -``` -python example_test.py 192.168.0.167 Message -``` -Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported; -please add `$IDF_PATH/tools/ci/python_packages` to `PYTHONPATH`. - ## Hardware Required This example can be run on any commonly available ESP32 development board. diff --git a/examples/protocols/sockets/tcp_server/example_test.py b/examples/protocols/sockets/tcp_server/example_test.py deleted file mode 100644 index fe83f705df..0000000000 --- a/examples/protocols/sockets/tcp_server/example_test.py +++ /dev/null @@ -1,96 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- - -from __future__ import print_function, unicode_literals - -import os -import re -import socket -import sys - -import ttfw_idf -from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip - -# ----------- Config ---------- -PORT = 3333 -# ------------------------------- - - -def tcp_client(address, payload): - for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC, - socket.SOCK_STREAM, 0, socket.AI_PASSIVE): - family_addr, socktype, proto, canonname, addr = res - try: - sock = socket.socket(family_addr, socket.SOCK_STREAM) - sock.settimeout(60.0) - except socket.error as msg: - print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1]) - raise - try: - sock.connect(addr) - except socket.error as msg: - print('Could not open socket: ', msg) - sock.close() - raise - sock.sendall(payload.encode()) - data = sock.recv(1024) - if not data: - return - print('Reply : ' + data.decode()) - sock.close() - return data.decode() - - -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_tcpserver(env, extra_data): - MESSAGE = 'Data to ESP' - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_server', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'tcp_server.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('tcp_server_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') - env_name = 'wifi_router' - ap_ssid = get_env_config_variable(env_name, 'ap_ssid') - ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') - - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - - interface = get_my_interface_by_dest_ip(ipv4) - # test IPv4 - received = tcp_client(ipv4, MESSAGE) - if not received == MESSAGE: - raise - dut1.expect(MESSAGE) - # test IPv6 - received = tcp_client('{}%{}'.format(ipv6, interface), MESSAGE) - if not received == MESSAGE: - raise - dut1.expect(MESSAGE) - - -if __name__ == '__main__': - if sys.argv[2:]: # if two arguments provided: - # Usage: example_test.py - tcp_client(sys.argv[1], sys.argv[2]) - else: # otherwise run standard example test as in the CI - test_examples_protocol_socket_tcpserver() diff --git a/examples/protocols/sockets/tcp_server/pytest_tcp_server.py b/examples/protocols/sockets/tcp_server/pytest_tcp_server.py new file mode 100644 index 0000000000..35af88a523 --- /dev/null +++ b/examples/protocols/sockets/tcp_server/pytest_tcp_server.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import time + +import pytest +from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip +from pytest_embedded import Dut + +try: + from run_tcp_client import tcp_client +except ImportError: + import os + import sys + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts'))) + from run_tcp_client import tcp_client + + +PORT = 3333 +MESSAGE = 'Data to ESP' + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_server_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') + time.sleep(1) + + # test IPv4 + received = tcp_client(ipv4, PORT, MESSAGE) + if not received == MESSAGE: + raise + dut.expect(MESSAGE) + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_server_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') + time.sleep(1) + + interface = get_my_interface_by_dest_ip(ipv4) + # test IPv6 + received = tcp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE) + if not received == MESSAGE: + raise + dut.expect(MESSAGE) diff --git a/examples/protocols/sockets/udp_client/README.md b/examples/protocols/sockets/udp_client/README.md index 90f0a110d6..87a6eb1a7e 100644 --- a/examples/protocols/sockets/udp_client/README.md +++ b/examples/protocols/sockets/udp_client/README.md @@ -16,7 +16,7 @@ There are many host-side tools which can be used to interact with the UDP/TCP se One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets. Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following commands. -In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples. +Ref to the [upper level README](../README.md#host-tools) for more information. ### Send UDP packet via netcat ``` @@ -33,16 +33,6 @@ echo "Hello from PC" | nc -w1 -u 192.168.0.167 3333 nc -u -l 192.168.0.167 3333 ``` -### Python scripts -Script example_test.py could be used as a counter part to the udp-client application, ip protocol name (IPv4 or IPv6) shall be stated as argument. Example: - -``` -python example_test.py IPv4 -``` -Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported; -please add `$IDF_PATH/tools/ci/python_packages` to `PYTHONPATH`. - - ## Hardware Required This example can be run on any commonly available ESP32 development board. diff --git a/examples/protocols/sockets/udp_client/example_test.py b/examples/protocols/sockets/udp_client/example_test.py deleted file mode 100644 index 5b4593fc15..0000000000 --- a/examples/protocols/sockets/udp_client/example_test.py +++ /dev/null @@ -1,138 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- -import os -import re -import socket -import sys -from threading import Event, Thread - -import netifaces -import ttfw_idf -from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip -from tiny_test_fw.DUT import ExpectTimeout - -# ----------- Config ---------- -PORT = 3333 -# ------------------------------- - - -class UdpServer: - - def __init__(self, port, family_addr, persist=False): - self.port = port - self.family_addr = family_addr - self.socket = socket.socket(family_addr, socket.SOCK_DGRAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.settimeout(60.0) - self.shutdown = Event() - self.persist = persist - - def __enter__(self): - try: - self.socket.bind(('', self.port)) - except socket.error as e: - print('Bind failed:{}'.format(e)) - raise - - print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr)) - self.server_thread = Thread(target=self.run_server) - self.server_thread.start() - return self - - def __exit__(self, exc_type, exc_value, traceback): - if self.persist: - sock = socket.socket(self.family_addr, socket.SOCK_DGRAM) - sock.sendto(b'Stop', ('localhost', self.port)) - sock.close() - self.shutdown.set() - self.server_thread.join() - self.socket.close() - - def run_server(self): - while not self.shutdown.is_set(): - try: - data, addr = self.socket.recvfrom(1024) - print(addr) - if not data: - return - data = data.decode() - print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data) - reply = 'OK: ' + data - self.socket.sendto(reply.encode(), addr) - except socket.error as e: - print('Running server failed:{}'.format(e)) - raise - if not self.persist: - break - - -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_udpclient(env, extra_data): - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('udp_client', 'examples/protocols/sockets/udp_client', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'udp_client.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('udp_client_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') - env_name = 'wifi_router' - ap_ssid = get_env_config_variable(env_name, 'ap_ssid') - ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') - - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - - my_interface = get_my_interface_by_dest_ip(ipv4) - # test IPv4 - with UdpServer(PORT, socket.AF_INET): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET) - print('Connect udp client to server IP={}'.format(server_ip)) - for _ in range(3): - try: - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) - break - except ExpectTimeout: - pass - else: - raise ValueError('Failed to send/recv udp packets.') - # test IPv6 - with UdpServer(PORT, socket.AF_INET6): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6) - print('Connect udp client to server IP={}'.format(server_ip)) - for _ in range(3): - try: - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) - break - except ExpectTimeout: - pass - else: - raise ValueError('Failed to send/recv udp packets.') - - -if __name__ == '__main__': - if sys.argv[1:] and sys.argv[1].startswith('IPv'): # if additional arguments provided: - # Usage: example_test.py - family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET - with UdpServer(PORT, family_addr, persist=True) as s: - print(input('Press Enter stop the server...')) - else: - test_examples_protocol_socket_udpclient() diff --git a/examples/protocols/sockets/udp_client/pytest_udp_client.py b/examples/protocols/sockets/udp_client/pytest_udp_client.py new file mode 100644 index 0000000000..fa6d7e4f59 --- /dev/null +++ b/examples/protocols/sockets/udp_client/pytest_udp_client.py @@ -0,0 +1,85 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import socket + +import pytest +from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, + get_my_interface_by_dest_ip) +from pexpect.exceptions import TIMEOUT +from pytest_embedded import Dut + +try: + from run_udp_server import UdpServer +except ImportError: + import os + import sys + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts'))) + from run_udp_server import UdpServer + + +PORT = 3333 +MAX_RETRIES = 3 + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_client_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') + + # test IPv4 + with UdpServer(PORT, socket.AF_INET): + server_ip = get_host_ip4_by_dest_ip(ipv4) + print('Connect udp client to server IP={}'.format(server_ip)) + for _ in range(MAX_RETRIES): + try: + dut.write(server_ip) + dut.expect('OK: Message from ESP32') + break + except TIMEOUT: + pass + else: + raise ValueError('Failed to send/recv udp packets.') + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_client_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') + + interface = get_my_interface_by_dest_ip(ipv4) + # test IPv6 + with UdpServer(PORT, socket.AF_INET6): + server_ip = get_host_ip6_by_dest_ip(ipv6, interface) + print('Connect udp client to server IP={}'.format(server_ip)) + for _ in range(MAX_RETRIES): + try: + dut.write(server_ip) + dut.expect('OK: Message from ESP32') + break + except TIMEOUT: + pass + else: + raise ValueError('Failed to send/recv udpv6 packets.') diff --git a/examples/protocols/sockets/udp_server/README.md b/examples/protocols/sockets/udp_server/README.md index 03c96b884b..5a868c6f32 100644 --- a/examples/protocols/sockets/udp_server/README.md +++ b/examples/protocols/sockets/udp_server/README.md @@ -17,7 +17,7 @@ One command line tool is [netcat](http://netcat.sourceforge.net) which can send Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following commands. If want to use this RECVINFO function, please enable LWIP_NETBUF_RECVINFO in menuconfig,this function can only resolve the destination address of IPV4. -In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples. +Ref to the [upper level README](../README.md#host-tools) for more information. ### Send UDP packet via netcat ``` @@ -34,16 +34,6 @@ echo "Hello from PC" | nc -w1 -u 192.168.0.167 3333 nc -u 192.168.0.167 3333 ``` -### Python scripts -Script example_test.py could be used as a counter part to the udp-server application, -IP address and the message to be send to the server shall be stated as arguments. Example: - -``` -python example_test.py 192.168.0.167 Message -``` -Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported; -please add `$IDF_PATH/tools/ci/python_packages` to `PYTHONPATH`. - ## Hardware Required This example can be run on any commonly available ESP32 development board. diff --git a/examples/protocols/sockets/udp_server/example_test.py b/examples/protocols/sockets/udp_server/example_test.py deleted file mode 100644 index 9207138a89..0000000000 --- a/examples/protocols/sockets/udp_server/example_test.py +++ /dev/null @@ -1,112 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- - -from __future__ import print_function, unicode_literals - -import os -import re -import socket -import sys - -import ttfw_idf -from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip - -# ----------- Config ---------- -PORT = 3333 -# ------------------------------- - - -def udp_client(address, payload): - for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC, - socket.SOCK_DGRAM, 0, socket.AI_PASSIVE): - family_addr, socktype, proto, canonname, addr = res - try: - sock = socket.socket(family_addr, socket.SOCK_DGRAM) - sock.settimeout(20.0) - except socket.error as msg: - print('Could not create socket') - print(os.strerror(msg.errno)) - raise - try: - sock.sendto(payload.encode(), addr) - reply, addr = sock.recvfrom(128) - if not reply: - return - print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply)) - except socket.timeout: - print('Socket operation timeout') - return str(None) - except socket.error as msg: - print('Error while sending or receiving data from the socket') - print(os.strerror(msg.errno)) - sock.close() - raise - return reply.decode() - - -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_udpserver(env, extra_data): - MESSAGE = 'Data to ESP' - MAX_RETRIES = 3 - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('udp_server', 'examples/protocols/sockets/udp_server', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'udp_server.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('udp_server_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') - env_name = 'wifi_router' - ap_ssid = get_env_config_variable(env_name, 'ap_ssid') - ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') - - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - dut1.expect(re.compile(r'Waiting for data'), timeout=10) - - interface = get_my_interface_by_dest_ip(ipv4) - # test IPv4 - for _ in range(MAX_RETRIES): - print('Testing UDP on IPv4...') - received = udp_client(ipv4, MESSAGE) - if received == MESSAGE: - print('OK') - break - else: - raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) - dut1.expect(MESSAGE) - - # test IPv6 - for _ in range(MAX_RETRIES): - print('Testing UDP on IPv6...') - received = udp_client('{}%{}'.format(ipv6, interface), MESSAGE) - if received == MESSAGE: - print('OK') - break - else: - raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) - dut1.expect(MESSAGE) - - -if __name__ == '__main__': - if sys.argv[2:]: # if two arguments provided: - # Usage: example_test.py - udp_client(sys.argv[1], sys.argv[2]) - else: # otherwise run standard example test as in the CI - test_examples_protocol_socket_udpserver() diff --git a/examples/protocols/sockets/udp_server/pytest_udp_server.py b/examples/protocols/sockets/udp_server/pytest_udp_server.py new file mode 100644 index 0000000000..addea4b883 --- /dev/null +++ b/examples/protocols/sockets/udp_server/pytest_udp_server.py @@ -0,0 +1,77 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging + +import pytest +from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip +from pytest_embedded import Dut + +try: + from run_udp_client import udp_client +except ImportError: + import os + import sys + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts'))) + from run_udp_client import udp_client + + +PORT = 3333 +MESSAGE = 'Data to ESP' +MAX_RETRIES = 3 + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_server_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') + + # test IPv4 + for _ in range(MAX_RETRIES): + print('Testing UDP on IPv4...') + received = udp_client(ipv4, PORT, MESSAGE) + if received == MESSAGE: + print('OK') + break + else: + raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) + dut.expect(MESSAGE) + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_server_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') + + interface = get_my_interface_by_dest_ip(ipv4) + # test IPv6 + for _ in range(MAX_RETRIES): + print('Testing UDP on IPv6...') + received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE) + if received == MESSAGE: + print('OK') + break + else: + raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) + dut.expect(MESSAGE) diff --git a/tools/ci/python_packages/common_test_methods.py b/tools/ci/python_packages/common_test_methods.py index 85ace23373..80392fbc27 100644 --- a/tools/ci/python_packages/common_test_methods.py +++ b/tools/ci/python_packages/common_test_methods.py @@ -4,7 +4,7 @@ import logging import os import socket -from typing import Any +from typing import Any, List import netifaces import yaml @@ -26,10 +26,23 @@ $IDF_PATH/EnvConfig.yml: def get_host_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str: - for _addr in netifaces.ifaddresses(interface_name)[ip_type]: - host_ip = _addr['addr'].replace('%{}'.format(interface_name), '') - assert isinstance(host_ip, str) - return host_ip + if ip_type == netifaces.AF_INET: + for _addr in netifaces.ifaddresses(interface_name)[ip_type]: + host_ip = _addr['addr'].replace('%{}'.format(interface_name), '') + assert isinstance(host_ip, str) + return host_ip + elif ip_type == netifaces.AF_INET6: + ip6_addrs: List[str] = [] + for _addr in netifaces.ifaddresses(interface_name)[ip_type]: + host_ip = _addr['addr'].replace('%{}'.format(interface_name), '') + assert isinstance(host_ip, str) + # prefer to use link local address due to example settings + if host_ip.startswith('FE80::'): + ip6_addrs.insert(0, host_ip) + else: + ip6_addrs.append(host_ip) + if ip6_addrs: + return ip6_addrs[0] return '' @@ -45,6 +58,17 @@ def get_host_ip4_by_dest_ip(dest_ip: str = '') -> str: return host_ip +def get_host_ip6_by_dest_ip(dest_ip: str, interface: str) -> str: + addr_info = socket.getaddrinfo(f'{dest_ip}%{interface}', 80, socket.AF_INET6, socket.SOCK_DGRAM) + s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s1.connect(addr_info[0][-1]) + host_ip = s1.getsockname()[0] + s1.close() + assert isinstance(host_ip, str) + print(f'Using host ip: {host_ip}') + return host_ip + + def get_my_interface_by_dest_ip(dest_ip: str = '') -> str: my_ip = get_host_ip4_by_dest_ip(dest_ip) interfaces = netifaces.interfaces()