diff --git a/examples/openthread/ot_ci_function.py b/examples/openthread/ot_ci_function.py index c84b96593e..0585eb636c 100644 --- a/examples/openthread/ot_ci_function.py +++ b/examples/openthread/ot_ci_function.py @@ -8,6 +8,8 @@ import socket import struct import subprocess import time +from functools import wraps +from typing import Callable from typing import Tuple import netifaces @@ -16,9 +18,28 @@ import yaml from pytest_embedded_idf.dut import IdfDut -class thread_parameter: +def extract_address( + command: str, pattern: str, default_return: str = '' +) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]: + def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]: + @wraps(func) + def wrapper(dut: IdfDut) -> str: + clean_buffer(dut) + execute_command(dut, command) + try: + result = dut.expect(pattern, timeout=5)[1].decode() + except Exception as e: + print(f'Error: {e}') + return default_return + return func(result) - def __init__(self, deviceRole:str='', dataset:str='', channel:str='', exaddr:str='', bbr:bool=False): + return wrapper + + return decorator + + +class thread_parameter: + def __init__(self, deviceRole: str = '', dataset: str = '', channel: str = '', exaddr: str = '', bbr: bool = False): self.deviceRole = deviceRole self.dataset = dataset self.channel = channel @@ -30,31 +51,30 @@ class thread_parameter: self.networkkey = '' self.pskc = '' - def setnetworkname(self, networkname:str) -> None: + def setnetworkname(self, networkname: str) -> None: self.networkname = networkname - def setpanid(self, panid:str) -> None: + def setpanid(self, panid: str) -> None: self.panid = panid - def setextpanid(self, extpanid:str) -> None: + def setextpanid(self, extpanid: str) -> None: self.extpanid = extpanid - def setnetworkkey(self, networkkey:str) -> None: + def setnetworkkey(self, networkkey: str) -> None: self.networkkey = networkkey - def setpskc(self, pskc:str) -> None: + def setpskc(self, pskc: str) -> None: self.pskc = pskc class wifi_parameter: - - def __init__(self, ssid:str='', psk:str='', retry_times:int=10): + def __init__(self, ssid: str = '', psk: str = '', retry_times: int = 10): self.ssid = ssid self.psk = psk self.retry_times = retry_times -def joinThreadNetwork(dut:IdfDut, thread:thread_parameter) -> None: +def joinThreadNetwork(dut: IdfDut, thread: thread_parameter) -> None: if thread.dataset: command = 'dataset set active ' + thread.dataset execute_command(dut, command) @@ -104,7 +124,7 @@ def joinThreadNetwork(dut:IdfDut, thread:thread_parameter) -> None: assert wait_for_join(dut, thread.deviceRole) -def wait_for_join(dut:IdfDut, role:str) -> bool: +def wait_for_join(dut: IdfDut, role: str) -> bool: for _ in range(1, 30): if getDeviceRole(dut) == role: wait(dut, 5) @@ -113,7 +133,7 @@ def wait_for_join(dut:IdfDut, role:str) -> bool: return False -def joinWiFiNetwork(dut:IdfDut, wifi:wifi_parameter) -> Tuple[str, int]: +def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> Tuple[str, int]: clean_buffer(dut) ip_address = '' for order in range(1, wifi.retry_times): @@ -128,7 +148,7 @@ def joinWiFiNetwork(dut:IdfDut, wifi:wifi_parameter) -> Tuple[str, int]: raise Exception(f'{dut} connect wifi {str(wifi.ssid)} with password {str(wifi.psk)} fail') -def getDeviceRole(dut:IdfDut) -> str: +def getDeviceRole(dut: IdfDut) -> str: wait(dut, 1) execute_command(dut, 'state') role = dut.expect(r'\W+(\w+)\W+Done', timeout=5)[1].decode() @@ -136,24 +156,24 @@ def getDeviceRole(dut:IdfDut) -> str: return str(role) -def changeDeviceRole(dut:IdfDut, role:str) -> None: +def changeDeviceRole(dut: IdfDut, role: str) -> None: command = 'state ' + role execute_command(dut, command) -def getDataset(dut:IdfDut) -> str: +def getDataset(dut: IdfDut) -> str: execute_command(dut, 'dataset active -x') dut_data = dut.expect(r'\n(\w+)\r', timeout=5)[1].decode() return str(dut_data) -def init_thread(dut:IdfDut) -> None: +def init_thread(dut: IdfDut) -> None: dut.expect('>', timeout=10) wait(dut, 3) reset_thread(dut) -def reset_thread(dut:IdfDut) -> None: +def reset_thread(dut: IdfDut) -> None: execute_command(dut, 'factoryreset') dut.expect('OpenThread attached to netif', timeout=20) dut.expect('>', timeout=10) @@ -162,7 +182,7 @@ def reset_thread(dut:IdfDut) -> None: # get the mleid address of the thread -def get_mleid_addr(dut:IdfDut) -> str: +def get_mleid_addr(dut: IdfDut) -> str: dut_adress = '' execute_command(dut, 'ipaddr mleid') dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() @@ -170,7 +190,7 @@ def get_mleid_addr(dut:IdfDut) -> str: # get the rloc address of the thread -def get_rloc_addr(dut:IdfDut) -> str: +def get_rloc_addr(dut: IdfDut) -> str: dut_adress = '' execute_command(dut, 'ipaddr rloc') dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() @@ -178,7 +198,7 @@ def get_rloc_addr(dut:IdfDut) -> str: # get the linklocal address of the thread -def get_linklocal_addr(dut:IdfDut) -> str: +def get_linklocal_addr(dut: IdfDut) -> str: dut_adress = '' execute_command(dut, 'ipaddr linklocal') dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() @@ -186,7 +206,7 @@ def get_linklocal_addr(dut:IdfDut) -> str: # get the global unicast address of the thread: -def get_global_unicast_addr(dut:IdfDut, br:IdfDut) -> str: +def get_global_unicast_addr(dut: IdfDut, br: IdfDut) -> str: dut_adress = '' clean_buffer(br) omrprefix = get_omrprefix(br) @@ -195,17 +215,36 @@ def get_global_unicast_addr(dut:IdfDut, br:IdfDut) -> str: return dut_adress +@extract_address('rloc16', r'(\w{4})') +def get_rloc16_addr(rloc16: str) -> str: + return rloc16 + + # ping of thread -def ot_ping(dut:IdfDut, target:str, times:int) -> Tuple[int, int]: - command = 'ping ' + str(target) + ' 0 ' + str(times) +def ot_ping( + dut: IdfDut, target: str, timeout: int = 5, count: int = 1, size: int = 56, interval: int = 1, hoplimit: int = 64 +) -> Tuple[int, int]: + command = f'ping {str(target)} {size} {count} {interval} {hoplimit} {str(timeout)}' execute_command(dut, command) - transmitted = dut.expect(r'(\d+) packets transmitted', timeout=30)[1].decode() + transmitted = dut.expect(r'(\d+) packets transmitted', timeout=60)[1].decode() tx_count = int(transmitted) - received = dut.expect(r'(\d+) packets received', timeout=30)[1].decode() + received = dut.expect(r'(\d+) packets received', timeout=60)[1].decode() rx_count = int(received) return tx_count, rx_count +def ping_and_check(dut: IdfDut, target: str, tx_total: int = 10, timeout: int = 6, pass_rate: float = 0.8) -> None: + tx_count = 0 + rx_count = 0 + for _ in range(tx_total): + tx, rx = ot_ping(dut, target, timeout=timeout, count=1, size=10, interval=6) + tx_count += tx + rx_count += rx + + assert tx_count == tx_total + assert rx_count > tx_total * pass_rate + + def reset_host_interface() -> None: interface_name = get_host_interface_name() flag = False @@ -242,13 +281,19 @@ def init_interface_ipv6_address() -> None: interface_name = get_host_interface_name() flag = False try: - command = 'ip -6 route | grep ' + interface_name + " | grep ra | awk {'print $1'} | xargs -I {} ip -6 route del {}" + command = ( + 'ip -6 route | grep ' + interface_name + " | grep ra | awk {'print $1'} | xargs -I {} ip -6 route del {}" + ) subprocess.call(command, shell=True, timeout=5) time.sleep(0.5) subprocess.call(command, shell=True, timeout=5) time.sleep(1) - command = 'ip -6 address show dev ' + interface_name + \ - " scope global | grep 'inet6' | awk {'print $2'} | xargs -I {} ip -6 addr del {} dev " + interface_name + command = ( + 'ip -6 address show dev ' + + interface_name + + " scope global | grep 'inet6' | awk {'print $2'} | xargs -I {} ip -6 addr del {} dev " + + interface_name + ) subprocess.call(command, shell=True, timeout=5) time.sleep(1) flag = True @@ -284,12 +329,12 @@ def get_host_interface_name() -> str: raise Exception('Warning: No valid network interface detected. Please check your configuration.') -def clean_buffer(dut:IdfDut) -> None: +def clean_buffer(dut: IdfDut) -> None: str_length = str(len(dut.expect(pexpect.TIMEOUT, timeout=0.1))) dut.expect(r'[\s\S]{%s}' % str(str_length), timeout=10) -def check_if_host_receive_ra(br:IdfDut) -> bool: +def check_if_host_receive_ra(br: IdfDut) -> bool: interface_name = get_host_interface_name() clean_buffer(br) omrprefix = get_omrprefix(br) @@ -306,21 +351,21 @@ def host_connect_wifi() -> None: time.sleep(5) -def is_joined_wifi_network(br:IdfDut) -> bool: +def is_joined_wifi_network(br: IdfDut) -> bool: return check_if_host_receive_ra(br) thread_ipv6_group = 'ff04:0:0:0:0:0:0:125' -def check_ipmaddr(dut:IdfDut) -> bool: +def check_ipmaddr(dut: IdfDut) -> bool: info = get_ouput_string(dut, 'ipmaddr', 2) if thread_ipv6_group in str(info): return True return False -def thread_is_joined_group(dut:IdfDut) -> bool: +def thread_is_joined_group(dut: IdfDut) -> bool: command = 'mcast join ' + thread_ipv6_group execute_command(dut, command) dut.expect('Done', timeout=5) @@ -335,8 +380,16 @@ def thread_is_joined_group(dut:IdfDut) -> bool: class udp_parameter: - - def __init__(self, udp_type:str='', addr:str='::', port:int=5090, group:str='', init_flag:bool=False, timeout:float=15.0, udp_bytes:bytes=b''): + def __init__( + self, + udp_type: str = '', + addr: str = '::', + port: int = 5090, + group: str = '', + init_flag: bool = False, + timeout: float = 15.0, + udp_bytes: bytes = b'', + ): self.udp_type = udp_type self.addr = addr self.port = port @@ -346,7 +399,7 @@ class udp_parameter: self.udp_bytes = udp_bytes -def create_host_udp_server(myudp:udp_parameter) -> None: +def create_host_udp_server(myudp: udp_parameter) -> None: interface_name = get_host_interface_name() try: if myudp.udp_type == 'INET6': @@ -360,9 +413,10 @@ def create_host_udp_server(myudp:udp_parameter) -> None: if myudp.udp_type == 'INET6' and myudp.group != '': sock.setsockopt( - socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, - struct.pack('16si', socket.inet_pton(socket.AF_INET6, myudp.group), - if_index)) + socket.IPPROTO_IPV6, + socket.IPV6_JOIN_GROUP, + struct.pack('16si', socket.inet_pton(socket.AF_INET6, myudp.group), if_index), + ) sock.settimeout(myudp.timeout) myudp.init_flag = True print('The host start to receive message!') @@ -375,7 +429,7 @@ def create_host_udp_server(myudp:udp_parameter) -> None: sock.close() -def host_udp_send_message(udp_target:udp_parameter) -> None: +def host_udp_send_message(udp_target: udp_parameter) -> None: interface_name = get_host_interface_name() try: if udp_target.udp_type == 'INET6': @@ -394,7 +448,7 @@ def host_udp_send_message(udp_target:udp_parameter) -> None: sock.close() -def wait(dut:IdfDut, wait_time:float) -> None: +def wait(dut: IdfDut, wait_time: float) -> None: dut.expect(pexpect.TIMEOUT, timeout=wait_time) @@ -497,8 +551,16 @@ def flush_ipv6_addr_by_interface() -> None: class tcp_parameter: - - def __init__(self, tcp_type:str='', addr:str='::', port:int=12345, listen_flag:bool=False, recv_flag:bool=False, timeout:float=15.0, tcp_bytes:bytes=b''): + def __init__( + self, + tcp_type: str = '', + addr: str = '::', + port: int = 12345, + listen_flag: bool = False, + recv_flag: bool = False, + timeout: float = 15.0, + tcp_bytes: bytes = b'', + ): self.tcp_type = tcp_type self.addr = addr self.port = port @@ -508,7 +570,7 @@ class tcp_parameter: self.tcp_bytes = tcp_bytes -def create_host_tcp_server(mytcp:tcp_parameter) -> None: +def create_host_tcp_server(mytcp: tcp_parameter) -> None: try: if mytcp.tcp_type == 'INET6': AF_INET = socket.AF_INET6 @@ -522,8 +584,8 @@ def create_host_tcp_server(mytcp:tcp_parameter) -> None: print('The tcp server is waiting for connection!') sock.settimeout(mytcp.timeout) - connfd,addr = sock.accept() - print('The tcp server connected with ',addr) + connfd, addr = sock.accept() + print('The tcp server connected with ', addr) mytcp.recv_flag = True mytcp.tcp_bytes = connfd.recv(1024) @@ -539,7 +601,7 @@ def create_host_tcp_server(mytcp:tcp_parameter) -> None: sock.close() -def get_ipv6_from_ipv4(ipv4_address:str, br:IdfDut) -> str: +def get_ipv6_from_ipv4(ipv4_address: str, br: IdfDut) -> str: clean_buffer(br) nat64prefix = get_nat64prefix(br) ipv4_find = re.findall(r'\d+', ipv4_address) @@ -549,36 +611,36 @@ def get_ipv6_from_ipv4(ipv4_address:str, br:IdfDut) -> str: return str(ipv6_get_from_ipv4) -def decimal_to_hex(decimal_str:str) -> str: +def decimal_to_hex(decimal_str: str) -> str: decimal_int = int(decimal_str) hex_str = hex(decimal_int)[2:] return hex_str -def get_omrprefix(br:IdfDut) -> str: +def get_omrprefix(br: IdfDut) -> str: execute_command(br, 'br omrprefix') omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() return str(omrprefix) -def get_onlinkprefix(br:IdfDut) -> str: +def get_onlinkprefix(br: IdfDut) -> str: execute_command(br, 'br onlinkprefix') onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() return str(onlinkprefix) -def get_nat64prefix(br:IdfDut) -> str: +def get_nat64prefix(br: IdfDut) -> str: execute_command(br, 'br nat64prefix') nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode() return str(nat64prefix) -def execute_command(dut:IdfDut, command:str) -> None: +def execute_command(dut: IdfDut, command: str) -> None: clean_buffer(dut) dut.write(command) -def get_ouput_string(dut:IdfDut, command:str, wait_time:int) -> str: +def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str: execute_command(dut, command) tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time) clean_buffer(dut) diff --git a/examples/openthread/ot_sleepy_device/light_sleep/sdkconfig.ci.ssed b/examples/openthread/ot_sleepy_device/light_sleep/sdkconfig.ci.ssed new file mode 100644 index 0000000000..3b8fa8eee5 --- /dev/null +++ b/examples/openthread/ot_sleepy_device/light_sleep/sdkconfig.ci.ssed @@ -0,0 +1,9 @@ +CONFIG_OPENTHREAD_NETWORK_CHANNEL=12 +CONFIG_OPENTHREAD_NETWORK_MASTERKEY="aabbccddeeff00112233445566778899" +CONFIG_ESP_SLEEP_DEBUG=y +CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y +CONFIG_ESP_SLEEP_CACHE_SAFE_ASSERTION=y +CONFIG_OPENTHREAD_CSL_ENABLE=y +CONFIG_OPENTHREAD_CSL_ACCURACY=50 +CONFIG_OPENTHREAD_CSL_UNCERTAIN=0 +CONFIG_RTC_CLK_SRC_EXT_CRYS=y diff --git a/examples/openthread/pytest_otbr.py b/examples/openthread/pytest_otbr.py index 9c0e8cf017..b528577281 100644 --- a/examples/openthread/pytest_otbr.py +++ b/examples/openthread/pytest_otbr.py @@ -3,6 +3,7 @@ # !/usr/bin/env python3 import copy import os.path +import random import re import secrets import subprocess @@ -18,7 +19,8 @@ from pytest_embedded_idf.dut import IdfDut # This file contains the test scripts for Thread: # Case 1: Thread network formation and attaching -# A Thread Border Router forms a Thread network, Thread devices attach to it, then test ping connection between them. +# A Thread Border Router forms a Thread network, Thread devices attach to it, then test ping +# connection between them. # Case 2: Bidirectional IPv6 connectivity # Test IPv6 ping connection between Thread device and Linux Host (via Thread Border Router). @@ -51,13 +53,16 @@ from pytest_embedded_idf.dut import IdfDut # Test the basic startup and network formation of a Thread device. # Case 12: Curl a website via DNS and NAT64 -# A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl a website. +# A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl +# a website. # Case 13: Meshcop discovery of Border Router -# A border router joins a Wi-Fi network, forms a Thread network and publish a meshcop service. Linux Host device discover the mescop service. +# A border router joins a Wi-Fi network, forms a Thread network and publish a meshcop service. Linux Host device +# discover the mescop service. # Case 14: Curl a website over HTTPS via DNS and NAT64 -# A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl a https website. +# A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl +# a https website. # Case 15: Thread network formation and attaching with TREL # A TREL device forms a Thread network, other TREL devices attach to it, then test ping connection between them. @@ -65,6 +70,9 @@ from pytest_embedded_idf.dut import IdfDut # Case 16: Thread network BR lib check # Check BR library compatibility +# Case 17: Synchronized sleepy end device (SSED) test +# Start a Thread ssed device, wait it join the Thread network and check related flags. + @pytest.fixture(scope='module', name='Init_avahi') def fixture_Init_avahi() -> bool: @@ -151,9 +159,9 @@ def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut]) -> None: for cli in cli_list: cli_mleid_addr = ocf.get_mleid_addr(cli) br_mleid_addr = ocf.get_mleid_addr(br) - rx_nums = ocf.ot_ping(cli, br_mleid_addr, 5)[1] + rx_nums = ocf.ot_ping(cli, br_mleid_addr, count=5)[1] assert rx_nums == 5 - rx_nums = ocf.ot_ping(br, cli_mleid_addr, 5)[1] + rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1] assert rx_nums == 5 finally: ocf.execute_command(br, 'factoryreset') @@ -226,7 +234,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, host_global_unicast_addr = re.findall(r'\W+(%s(?:\w+:){3}\w+)\W+' % onlinkprefix, str(out_str)) rx_nums = 0 for ip_addr in host_global_unicast_addr: - txrx_nums = ocf.ot_ping(cli, str(ip_addr), 5) + txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5) rx_nums = rx_nums + int(txrx_nums[1]) assert rx_nums != 0 finally: @@ -504,7 +512,7 @@ def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> assert ocf.is_joined_wifi_network(br) host_ipv4_address = ocf.get_host_ipv4_address() print('host_ipv4_address: ', host_ipv4_address) - rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), 5)[1] + rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1] assert rx_nums != 0 finally: ocf.execute_command(br, 'factoryreset') @@ -962,3 +970,75 @@ def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None: finally: ocf.execute_command(br, 'factoryreset') time.sleep(3) + + +# Case 17: SSED test +@pytest.mark.openthread_sleep +@pytest.mark.parametrize( + 'config, count, app_path, target, port', + [ + pytest.param( + 'cli|ssed', + 2, + f'{os.path.join(os.path.dirname(__file__), "ot_cli")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_sleepy_device/light_sleep")}', + 'esp32h2|esp32c6', + f'{ESPPORT1}|{ESPPORT3}', + id='h2-c6', + ), + pytest.param( + 'cli|ssed', + 2, + f'{os.path.join(os.path.dirname(__file__), "ot_cli")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_sleepy_device/light_sleep")}', + 'esp32c6|esp32h2', + f'{ESPPORT3}|{ESPPORT1}', + id='c6-h2', + ), + ], + indirect=True, +) +def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None: + leader = dut[0] + ssed_device = dut[1] + try: + # CI device must have external XTAL to run SSED case, we will check this here first + ssed_device.expect('32k XTAL in use', timeout=10) + ocf.init_thread(leader) + time.sleep(3) + leader_para = ocf.thread_parameter('leader', '', '12', '7766554433221100', False) + ocf.joinThreadNetwork(leader, leader_para) + ocf.wait(leader, 5) + ocf.execute_command(leader, 'networkkey') + dataset = ocf.getDataset(leader) + ocf.execute_command(ssed_device, 'dataset set active ' + dataset) + ssed_device.expect('Done', timeout=5) + ocf.execute_command(ssed_device, 'mode -') + ssed_device.expect('Done', timeout=5) + ocf.execute_command(ssed_device, 'csl period 3000000') + ssed_device.expect('Done', timeout=5) + ocf.execute_command(ssed_device, 'csl channel 12') + ssed_device.expect('Done', timeout=5) + ocf.execute_command(ssed_device, 'ifconfig up') + ssed_device.expect('Done', timeout=5) + ocf.execute_command(ssed_device, 'thread start') + ssed_device.expect(r'(.+)detached -> child', timeout=20) + # add a sleep to wait ssed ready + time.sleep(3) + ssed_device.expect('PMU_SLEEP_PD_TOP: True', timeout=5) + ssed_device.expect('PMU_SLEEP_PD_MODEM: True', timeout=5) + + ocf.execute_command(leader, 'child table') + pattern = r'\|\s+\d+\s+\|\s+(0x\w{4})\s+\|.*\|\s+(\w{16})\s+\|' + result = leader.expect(pattern) + + rloc16_decode_from_leader = result[1].decode()[2:] + cli_rloc_addr = ':'.join(ocf.get_rloc_addr(leader).split(':')[:-1]) + ssed_address = f'{cli_rloc_addr}:{rloc16_decode_from_leader}' + + ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6) + time.sleep(random.randint(5, 20)) + ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6) + finally: + ocf.execute_command(leader, 'factoryreset') + time.sleep(3)