feat(openthread): add openthread ci ssed case

This commit is contained in:
Tan Yan Quan
2025-04-09 19:45:04 +08:00
parent 2685cc10e0
commit fd0df9d04a
3 changed files with 212 additions and 61 deletions

View File

@ -8,6 +8,8 @@ import socket
import struct import struct
import subprocess import subprocess
import time import time
from functools import wraps
from typing import Callable
from typing import Tuple from typing import Tuple
import netifaces import netifaces
@ -16,9 +18,28 @@ import yaml
from pytest_embedded_idf.dut import IdfDut from pytest_embedded_idf.dut import IdfDut
class thread_parameter: def extract_address(
command: str, pattern: str, default_return: str = ''
) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]:
def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]:
@wraps(func)
def wrapper(dut: IdfDut) -> str:
clean_buffer(dut)
execute_command(dut, command)
try:
result = dut.expect(pattern, timeout=5)[1].decode()
except Exception as e:
print(f'Error: {e}')
return default_return
return func(result)
def __init__(self, deviceRole:str='', dataset:str='', channel:str='', exaddr:str='', bbr:bool=False): return wrapper
return decorator
class thread_parameter:
def __init__(self, deviceRole: str = '', dataset: str = '', channel: str = '', exaddr: str = '', bbr: bool = False):
self.deviceRole = deviceRole self.deviceRole = deviceRole
self.dataset = dataset self.dataset = dataset
self.channel = channel self.channel = channel
@ -30,31 +51,30 @@ class thread_parameter:
self.networkkey = '' self.networkkey = ''
self.pskc = '' self.pskc = ''
def setnetworkname(self, networkname:str) -> None: def setnetworkname(self, networkname: str) -> None:
self.networkname = networkname self.networkname = networkname
def setpanid(self, panid:str) -> None: def setpanid(self, panid: str) -> None:
self.panid = panid self.panid = panid
def setextpanid(self, extpanid:str) -> None: def setextpanid(self, extpanid: str) -> None:
self.extpanid = extpanid self.extpanid = extpanid
def setnetworkkey(self, networkkey:str) -> None: def setnetworkkey(self, networkkey: str) -> None:
self.networkkey = networkkey self.networkkey = networkkey
def setpskc(self, pskc:str) -> None: def setpskc(self, pskc: str) -> None:
self.pskc = pskc self.pskc = pskc
class wifi_parameter: class wifi_parameter:
def __init__(self, ssid: str = '', psk: str = '', retry_times: int = 10):
def __init__(self, ssid:str='', psk:str='', retry_times:int=10):
self.ssid = ssid self.ssid = ssid
self.psk = psk self.psk = psk
self.retry_times = retry_times self.retry_times = retry_times
def joinThreadNetwork(dut:IdfDut, thread:thread_parameter) -> None: def joinThreadNetwork(dut: IdfDut, thread: thread_parameter) -> None:
if thread.dataset: if thread.dataset:
command = 'dataset set active ' + thread.dataset command = 'dataset set active ' + thread.dataset
execute_command(dut, command) execute_command(dut, command)
@ -104,7 +124,7 @@ def joinThreadNetwork(dut:IdfDut, thread:thread_parameter) -> None:
assert wait_for_join(dut, thread.deviceRole) assert wait_for_join(dut, thread.deviceRole)
def wait_for_join(dut:IdfDut, role:str) -> bool: def wait_for_join(dut: IdfDut, role: str) -> bool:
for _ in range(1, 30): for _ in range(1, 30):
if getDeviceRole(dut) == role: if getDeviceRole(dut) == role:
wait(dut, 5) wait(dut, 5)
@ -113,7 +133,7 @@ def wait_for_join(dut:IdfDut, role:str) -> bool:
return False return False
def joinWiFiNetwork(dut:IdfDut, wifi:wifi_parameter) -> Tuple[str, int]: def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> Tuple[str, int]:
clean_buffer(dut) clean_buffer(dut)
ip_address = '' ip_address = ''
for order in range(1, wifi.retry_times): for order in range(1, wifi.retry_times):
@ -128,7 +148,7 @@ def joinWiFiNetwork(dut:IdfDut, wifi:wifi_parameter) -> Tuple[str, int]:
raise Exception(f'{dut} connect wifi {str(wifi.ssid)} with password {str(wifi.psk)} fail') raise Exception(f'{dut} connect wifi {str(wifi.ssid)} with password {str(wifi.psk)} fail')
def getDeviceRole(dut:IdfDut) -> str: def getDeviceRole(dut: IdfDut) -> str:
wait(dut, 1) wait(dut, 1)
execute_command(dut, 'state') execute_command(dut, 'state')
role = dut.expect(r'\W+(\w+)\W+Done', timeout=5)[1].decode() role = dut.expect(r'\W+(\w+)\W+Done', timeout=5)[1].decode()
@ -136,24 +156,24 @@ def getDeviceRole(dut:IdfDut) -> str:
return str(role) return str(role)
def changeDeviceRole(dut:IdfDut, role:str) -> None: def changeDeviceRole(dut: IdfDut, role: str) -> None:
command = 'state ' + role command = 'state ' + role
execute_command(dut, command) execute_command(dut, command)
def getDataset(dut:IdfDut) -> str: def getDataset(dut: IdfDut) -> str:
execute_command(dut, 'dataset active -x') execute_command(dut, 'dataset active -x')
dut_data = dut.expect(r'\n(\w+)\r', timeout=5)[1].decode() dut_data = dut.expect(r'\n(\w+)\r', timeout=5)[1].decode()
return str(dut_data) return str(dut_data)
def init_thread(dut:IdfDut) -> None: def init_thread(dut: IdfDut) -> None:
dut.expect('>', timeout=10) dut.expect('>', timeout=10)
wait(dut, 3) wait(dut, 3)
reset_thread(dut) reset_thread(dut)
def reset_thread(dut:IdfDut) -> None: def reset_thread(dut: IdfDut) -> None:
execute_command(dut, 'factoryreset') execute_command(dut, 'factoryreset')
dut.expect('OpenThread attached to netif', timeout=20) dut.expect('OpenThread attached to netif', timeout=20)
dut.expect('>', timeout=10) dut.expect('>', timeout=10)
@ -162,7 +182,7 @@ def reset_thread(dut:IdfDut) -> None:
# get the mleid address of the thread # get the mleid address of the thread
def get_mleid_addr(dut:IdfDut) -> str: def get_mleid_addr(dut: IdfDut) -> str:
dut_adress = '' dut_adress = ''
execute_command(dut, 'ipaddr mleid') execute_command(dut, 'ipaddr mleid')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
@ -170,7 +190,7 @@ def get_mleid_addr(dut:IdfDut) -> str:
# get the rloc address of the thread # get the rloc address of the thread
def get_rloc_addr(dut:IdfDut) -> str: def get_rloc_addr(dut: IdfDut) -> str:
dut_adress = '' dut_adress = ''
execute_command(dut, 'ipaddr rloc') execute_command(dut, 'ipaddr rloc')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
@ -178,7 +198,7 @@ def get_rloc_addr(dut:IdfDut) -> str:
# get the linklocal address of the thread # get the linklocal address of the thread
def get_linklocal_addr(dut:IdfDut) -> str: def get_linklocal_addr(dut: IdfDut) -> str:
dut_adress = '' dut_adress = ''
execute_command(dut, 'ipaddr linklocal') execute_command(dut, 'ipaddr linklocal')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
@ -186,7 +206,7 @@ def get_linklocal_addr(dut:IdfDut) -> str:
# get the global unicast address of the thread: # get the global unicast address of the thread:
def get_global_unicast_addr(dut:IdfDut, br:IdfDut) -> str: def get_global_unicast_addr(dut: IdfDut, br: IdfDut) -> str:
dut_adress = '' dut_adress = ''
clean_buffer(br) clean_buffer(br)
omrprefix = get_omrprefix(br) omrprefix = get_omrprefix(br)
@ -195,17 +215,36 @@ def get_global_unicast_addr(dut:IdfDut, br:IdfDut) -> str:
return dut_adress return dut_adress
@extract_address('rloc16', r'(\w{4})')
def get_rloc16_addr(rloc16: str) -> str:
return rloc16
# ping of thread # ping of thread
def ot_ping(dut:IdfDut, target:str, times:int) -> Tuple[int, int]: def ot_ping(
command = 'ping ' + str(target) + ' 0 ' + str(times) dut: IdfDut, target: str, timeout: int = 5, count: int = 1, size: int = 56, interval: int = 1, hoplimit: int = 64
) -> Tuple[int, int]:
command = f'ping {str(target)} {size} {count} {interval} {hoplimit} {str(timeout)}'
execute_command(dut, command) execute_command(dut, command)
transmitted = dut.expect(r'(\d+) packets transmitted', timeout=30)[1].decode() transmitted = dut.expect(r'(\d+) packets transmitted', timeout=60)[1].decode()
tx_count = int(transmitted) tx_count = int(transmitted)
received = dut.expect(r'(\d+) packets received', timeout=30)[1].decode() received = dut.expect(r'(\d+) packets received', timeout=60)[1].decode()
rx_count = int(received) rx_count = int(received)
return tx_count, rx_count return tx_count, rx_count
def ping_and_check(dut: IdfDut, target: str, tx_total: int = 10, timeout: int = 6, pass_rate: float = 0.8) -> None:
tx_count = 0
rx_count = 0
for _ in range(tx_total):
tx, rx = ot_ping(dut, target, timeout=timeout, count=1, size=10, interval=6)
tx_count += tx
rx_count += rx
assert tx_count == tx_total
assert rx_count > tx_total * pass_rate
def reset_host_interface() -> None: def reset_host_interface() -> None:
interface_name = get_host_interface_name() interface_name = get_host_interface_name()
flag = False flag = False
@ -242,13 +281,19 @@ def init_interface_ipv6_address() -> None:
interface_name = get_host_interface_name() interface_name = get_host_interface_name()
flag = False flag = False
try: try:
command = 'ip -6 route | grep ' + interface_name + " | grep ra | awk {'print $1'} | xargs -I {} ip -6 route del {}" command = (
'ip -6 route | grep ' + interface_name + " | grep ra | awk {'print $1'} | xargs -I {} ip -6 route del {}"
)
subprocess.call(command, shell=True, timeout=5) subprocess.call(command, shell=True, timeout=5)
time.sleep(0.5) time.sleep(0.5)
subprocess.call(command, shell=True, timeout=5) subprocess.call(command, shell=True, timeout=5)
time.sleep(1) time.sleep(1)
command = 'ip -6 address show dev ' + interface_name + \ command = (
" scope global | grep 'inet6' | awk {'print $2'} | xargs -I {} ip -6 addr del {} dev " + interface_name 'ip -6 address show dev '
+ interface_name
+ " scope global | grep 'inet6' | awk {'print $2'} | xargs -I {} ip -6 addr del {} dev "
+ interface_name
)
subprocess.call(command, shell=True, timeout=5) subprocess.call(command, shell=True, timeout=5)
time.sleep(1) time.sleep(1)
flag = True flag = True
@ -284,12 +329,12 @@ def get_host_interface_name() -> str:
raise Exception('Warning: No valid network interface detected. Please check your configuration.') raise Exception('Warning: No valid network interface detected. Please check your configuration.')
def clean_buffer(dut:IdfDut) -> None: def clean_buffer(dut: IdfDut) -> None:
str_length = str(len(dut.expect(pexpect.TIMEOUT, timeout=0.1))) str_length = str(len(dut.expect(pexpect.TIMEOUT, timeout=0.1)))
dut.expect(r'[\s\S]{%s}' % str(str_length), timeout=10) dut.expect(r'[\s\S]{%s}' % str(str_length), timeout=10)
def check_if_host_receive_ra(br:IdfDut) -> bool: def check_if_host_receive_ra(br: IdfDut) -> bool:
interface_name = get_host_interface_name() interface_name = get_host_interface_name()
clean_buffer(br) clean_buffer(br)
omrprefix = get_omrprefix(br) omrprefix = get_omrprefix(br)
@ -306,21 +351,21 @@ def host_connect_wifi() -> None:
time.sleep(5) time.sleep(5)
def is_joined_wifi_network(br:IdfDut) -> bool: def is_joined_wifi_network(br: IdfDut) -> bool:
return check_if_host_receive_ra(br) return check_if_host_receive_ra(br)
thread_ipv6_group = 'ff04:0:0:0:0:0:0:125' thread_ipv6_group = 'ff04:0:0:0:0:0:0:125'
def check_ipmaddr(dut:IdfDut) -> bool: def check_ipmaddr(dut: IdfDut) -> bool:
info = get_ouput_string(dut, 'ipmaddr', 2) info = get_ouput_string(dut, 'ipmaddr', 2)
if thread_ipv6_group in str(info): if thread_ipv6_group in str(info):
return True return True
return False return False
def thread_is_joined_group(dut:IdfDut) -> bool: def thread_is_joined_group(dut: IdfDut) -> bool:
command = 'mcast join ' + thread_ipv6_group command = 'mcast join ' + thread_ipv6_group
execute_command(dut, command) execute_command(dut, command)
dut.expect('Done', timeout=5) dut.expect('Done', timeout=5)
@ -335,8 +380,16 @@ def thread_is_joined_group(dut:IdfDut) -> bool:
class udp_parameter: class udp_parameter:
def __init__(
def __init__(self, udp_type:str='', addr:str='::', port:int=5090, group:str='', init_flag:bool=False, timeout:float=15.0, udp_bytes:bytes=b''): self,
udp_type: str = '',
addr: str = '::',
port: int = 5090,
group: str = '',
init_flag: bool = False,
timeout: float = 15.0,
udp_bytes: bytes = b'',
):
self.udp_type = udp_type self.udp_type = udp_type
self.addr = addr self.addr = addr
self.port = port self.port = port
@ -346,7 +399,7 @@ class udp_parameter:
self.udp_bytes = udp_bytes self.udp_bytes = udp_bytes
def create_host_udp_server(myudp:udp_parameter) -> None: def create_host_udp_server(myudp: udp_parameter) -> None:
interface_name = get_host_interface_name() interface_name = get_host_interface_name()
try: try:
if myudp.udp_type == 'INET6': if myudp.udp_type == 'INET6':
@ -360,9 +413,10 @@ def create_host_udp_server(myudp:udp_parameter) -> None:
if myudp.udp_type == 'INET6' and myudp.group != '': if myudp.udp_type == 'INET6' and myudp.group != '':
sock.setsockopt( sock.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, socket.IPPROTO_IPV6,
struct.pack('16si', socket.inet_pton(socket.AF_INET6, myudp.group), socket.IPV6_JOIN_GROUP,
if_index)) struct.pack('16si', socket.inet_pton(socket.AF_INET6, myudp.group), if_index),
)
sock.settimeout(myudp.timeout) sock.settimeout(myudp.timeout)
myudp.init_flag = True myudp.init_flag = True
print('The host start to receive message!') print('The host start to receive message!')
@ -375,7 +429,7 @@ def create_host_udp_server(myudp:udp_parameter) -> None:
sock.close() sock.close()
def host_udp_send_message(udp_target:udp_parameter) -> None: def host_udp_send_message(udp_target: udp_parameter) -> None:
interface_name = get_host_interface_name() interface_name = get_host_interface_name()
try: try:
if udp_target.udp_type == 'INET6': if udp_target.udp_type == 'INET6':
@ -394,7 +448,7 @@ def host_udp_send_message(udp_target:udp_parameter) -> None:
sock.close() sock.close()
def wait(dut:IdfDut, wait_time:float) -> None: def wait(dut: IdfDut, wait_time: float) -> None:
dut.expect(pexpect.TIMEOUT, timeout=wait_time) dut.expect(pexpect.TIMEOUT, timeout=wait_time)
@ -497,8 +551,16 @@ def flush_ipv6_addr_by_interface() -> None:
class tcp_parameter: class tcp_parameter:
def __init__(
def __init__(self, tcp_type:str='', addr:str='::', port:int=12345, listen_flag:bool=False, recv_flag:bool=False, timeout:float=15.0, tcp_bytes:bytes=b''): self,
tcp_type: str = '',
addr: str = '::',
port: int = 12345,
listen_flag: bool = False,
recv_flag: bool = False,
timeout: float = 15.0,
tcp_bytes: bytes = b'',
):
self.tcp_type = tcp_type self.tcp_type = tcp_type
self.addr = addr self.addr = addr
self.port = port self.port = port
@ -508,7 +570,7 @@ class tcp_parameter:
self.tcp_bytes = tcp_bytes self.tcp_bytes = tcp_bytes
def create_host_tcp_server(mytcp:tcp_parameter) -> None: def create_host_tcp_server(mytcp: tcp_parameter) -> None:
try: try:
if mytcp.tcp_type == 'INET6': if mytcp.tcp_type == 'INET6':
AF_INET = socket.AF_INET6 AF_INET = socket.AF_INET6
@ -522,8 +584,8 @@ def create_host_tcp_server(mytcp:tcp_parameter) -> None:
print('The tcp server is waiting for connection!') print('The tcp server is waiting for connection!')
sock.settimeout(mytcp.timeout) sock.settimeout(mytcp.timeout)
connfd,addr = sock.accept() connfd, addr = sock.accept()
print('The tcp server connected with ',addr) print('The tcp server connected with ', addr)
mytcp.recv_flag = True mytcp.recv_flag = True
mytcp.tcp_bytes = connfd.recv(1024) mytcp.tcp_bytes = connfd.recv(1024)
@ -539,7 +601,7 @@ def create_host_tcp_server(mytcp:tcp_parameter) -> None:
sock.close() sock.close()
def get_ipv6_from_ipv4(ipv4_address:str, br:IdfDut) -> str: def get_ipv6_from_ipv4(ipv4_address: str, br: IdfDut) -> str:
clean_buffer(br) clean_buffer(br)
nat64prefix = get_nat64prefix(br) nat64prefix = get_nat64prefix(br)
ipv4_find = re.findall(r'\d+', ipv4_address) ipv4_find = re.findall(r'\d+', ipv4_address)
@ -549,36 +611,36 @@ def get_ipv6_from_ipv4(ipv4_address:str, br:IdfDut) -> str:
return str(ipv6_get_from_ipv4) return str(ipv6_get_from_ipv4)
def decimal_to_hex(decimal_str:str) -> str: def decimal_to_hex(decimal_str: str) -> str:
decimal_int = int(decimal_str) decimal_int = int(decimal_str)
hex_str = hex(decimal_int)[2:] hex_str = hex(decimal_int)[2:]
return hex_str return hex_str
def get_omrprefix(br:IdfDut) -> str: def get_omrprefix(br: IdfDut) -> str:
execute_command(br, 'br omrprefix') execute_command(br, 'br omrprefix')
omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(omrprefix) return str(omrprefix)
def get_onlinkprefix(br:IdfDut) -> str: def get_onlinkprefix(br: IdfDut) -> str:
execute_command(br, 'br onlinkprefix') execute_command(br, 'br onlinkprefix')
onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(onlinkprefix) return str(onlinkprefix)
def get_nat64prefix(br:IdfDut) -> str: def get_nat64prefix(br: IdfDut) -> str:
execute_command(br, 'br nat64prefix') execute_command(br, 'br nat64prefix')
nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode() nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode()
return str(nat64prefix) return str(nat64prefix)
def execute_command(dut:IdfDut, command:str) -> None: def execute_command(dut: IdfDut, command: str) -> None:
clean_buffer(dut) clean_buffer(dut)
dut.write(command) dut.write(command)
def get_ouput_string(dut:IdfDut, command:str, wait_time:int) -> str: def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str:
execute_command(dut, command) execute_command(dut, command)
tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time) tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time)
clean_buffer(dut) clean_buffer(dut)

View File

@ -0,0 +1,9 @@
CONFIG_OPENTHREAD_NETWORK_CHANNEL=12
CONFIG_OPENTHREAD_NETWORK_MASTERKEY="aabbccddeeff00112233445566778899"
CONFIG_ESP_SLEEP_DEBUG=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
CONFIG_ESP_SLEEP_CACHE_SAFE_ASSERTION=y
CONFIG_OPENTHREAD_CSL_ENABLE=y
CONFIG_OPENTHREAD_CSL_ACCURACY=50
CONFIG_OPENTHREAD_CSL_UNCERTAIN=0
CONFIG_RTC_CLK_SRC_EXT_CRYS=y

View File

@ -3,6 +3,7 @@
# !/usr/bin/env python3 # !/usr/bin/env python3
import copy import copy
import os.path import os.path
import random
import re import re
import secrets import secrets
import subprocess import subprocess
@ -18,7 +19,8 @@ from pytest_embedded_idf.dut import IdfDut
# This file contains the test scripts for Thread: # This file contains the test scripts for Thread:
# Case 1: Thread network formation and attaching # Case 1: Thread network formation and attaching
# A Thread Border Router forms a Thread network, Thread devices attach to it, then test ping connection between them. # A Thread Border Router forms a Thread network, Thread devices attach to it, then test ping
# connection between them.
# Case 2: Bidirectional IPv6 connectivity # Case 2: Bidirectional IPv6 connectivity
# Test IPv6 ping connection between Thread device and Linux Host (via Thread Border Router). # Test IPv6 ping connection between Thread device and Linux Host (via Thread Border Router).
@ -51,13 +53,16 @@ from pytest_embedded_idf.dut import IdfDut
# Test the basic startup and network formation of a Thread device. # Test the basic startup and network formation of a Thread device.
# Case 12: Curl a website via DNS and NAT64 # Case 12: Curl a website via DNS and NAT64
# A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl a website. # A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl
# a website.
# Case 13: Meshcop discovery of Border Router # Case 13: Meshcop discovery of Border Router
# A border router joins a Wi-Fi network, forms a Thread network and publish a meshcop service. Linux Host device discover the mescop service. # A border router joins a Wi-Fi network, forms a Thread network and publish a meshcop service. Linux Host device
# discover the mescop service.
# Case 14: Curl a website over HTTPS via DNS and NAT64 # Case 14: Curl a website over HTTPS via DNS and NAT64
# A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl a https website. # A border router joins a Wi-Fi network and forms a Thread network, a Thread devices attached to it and curl
# a https website.
# Case 15: Thread network formation and attaching with TREL # Case 15: Thread network formation and attaching with TREL
# A TREL device forms a Thread network, other TREL devices attach to it, then test ping connection between them. # A TREL device forms a Thread network, other TREL devices attach to it, then test ping connection between them.
@ -65,6 +70,9 @@ from pytest_embedded_idf.dut import IdfDut
# Case 16: Thread network BR lib check # Case 16: Thread network BR lib check
# Check BR library compatibility # Check BR library compatibility
# Case 17: Synchronized sleepy end device (SSED) test
# Start a Thread ssed device, wait it join the Thread network and check related flags.
@pytest.fixture(scope='module', name='Init_avahi') @pytest.fixture(scope='module', name='Init_avahi')
def fixture_Init_avahi() -> bool: def fixture_Init_avahi() -> bool:
@ -151,9 +159,9 @@ def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut]) -> None:
for cli in cli_list: for cli in cli_list:
cli_mleid_addr = ocf.get_mleid_addr(cli) cli_mleid_addr = ocf.get_mleid_addr(cli)
br_mleid_addr = ocf.get_mleid_addr(br) br_mleid_addr = ocf.get_mleid_addr(br)
rx_nums = ocf.ot_ping(cli, br_mleid_addr, 5)[1] rx_nums = ocf.ot_ping(cli, br_mleid_addr, count=5)[1]
assert rx_nums == 5 assert rx_nums == 5
rx_nums = ocf.ot_ping(br, cli_mleid_addr, 5)[1] rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1]
assert rx_nums == 5 assert rx_nums == 5
finally: finally:
ocf.execute_command(br, 'factoryreset') ocf.execute_command(br, 'factoryreset')
@ -226,7 +234,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut,
host_global_unicast_addr = re.findall(r'\W+(%s(?:\w+:){3}\w+)\W+' % onlinkprefix, str(out_str)) host_global_unicast_addr = re.findall(r'\W+(%s(?:\w+:){3}\w+)\W+' % onlinkprefix, str(out_str))
rx_nums = 0 rx_nums = 0
for ip_addr in host_global_unicast_addr: for ip_addr in host_global_unicast_addr:
txrx_nums = ocf.ot_ping(cli, str(ip_addr), 5) txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5)
rx_nums = rx_nums + int(txrx_nums[1]) rx_nums = rx_nums + int(txrx_nums[1])
assert rx_nums != 0 assert rx_nums != 0
finally: finally:
@ -504,7 +512,7 @@ def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
host_ipv4_address = ocf.get_host_ipv4_address() host_ipv4_address = ocf.get_host_ipv4_address()
print('host_ipv4_address: ', host_ipv4_address) print('host_ipv4_address: ', host_ipv4_address)
rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), 5)[1] rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1]
assert rx_nums != 0 assert rx_nums != 0
finally: finally:
ocf.execute_command(br, 'factoryreset') ocf.execute_command(br, 'factoryreset')
@ -962,3 +970,75 @@ def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None:
finally: finally:
ocf.execute_command(br, 'factoryreset') ocf.execute_command(br, 'factoryreset')
time.sleep(3) time.sleep(3)
# Case 17: SSED test
@pytest.mark.openthread_sleep
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
pytest.param(
'cli|ssed',
2,
f'{os.path.join(os.path.dirname(__file__), "ot_cli")}'
f'|{os.path.join(os.path.dirname(__file__), "ot_sleepy_device/light_sleep")}',
'esp32h2|esp32c6',
f'{ESPPORT1}|{ESPPORT3}',
id='h2-c6',
),
pytest.param(
'cli|ssed',
2,
f'{os.path.join(os.path.dirname(__file__), "ot_cli")}'
f'|{os.path.join(os.path.dirname(__file__), "ot_sleepy_device/light_sleep")}',
'esp32c6|esp32h2',
f'{ESPPORT3}|{ESPPORT1}',
id='c6-h2',
),
],
indirect=True,
)
def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None:
leader = dut[0]
ssed_device = dut[1]
try:
# CI device must have external XTAL to run SSED case, we will check this here first
ssed_device.expect('32k XTAL in use', timeout=10)
ocf.init_thread(leader)
time.sleep(3)
leader_para = ocf.thread_parameter('leader', '', '12', '7766554433221100', False)
ocf.joinThreadNetwork(leader, leader_para)
ocf.wait(leader, 5)
ocf.execute_command(leader, 'networkkey')
dataset = ocf.getDataset(leader)
ocf.execute_command(ssed_device, 'dataset set active ' + dataset)
ssed_device.expect('Done', timeout=5)
ocf.execute_command(ssed_device, 'mode -')
ssed_device.expect('Done', timeout=5)
ocf.execute_command(ssed_device, 'csl period 3000000')
ssed_device.expect('Done', timeout=5)
ocf.execute_command(ssed_device, 'csl channel 12')
ssed_device.expect('Done', timeout=5)
ocf.execute_command(ssed_device, 'ifconfig up')
ssed_device.expect('Done', timeout=5)
ocf.execute_command(ssed_device, 'thread start')
ssed_device.expect(r'(.+)detached -> child', timeout=20)
# add a sleep to wait ssed ready
time.sleep(3)
ssed_device.expect('PMU_SLEEP_PD_TOP: True', timeout=5)
ssed_device.expect('PMU_SLEEP_PD_MODEM: True', timeout=5)
ocf.execute_command(leader, 'child table')
pattern = r'\|\s+\d+\s+\|\s+(0x\w{4})\s+\|.*\|\s+(\w{16})\s+\|'
result = leader.expect(pattern)
rloc16_decode_from_leader = result[1].decode()[2:]
cli_rloc_addr = ':'.join(ocf.get_rloc_addr(leader).split(':')[:-1])
ssed_address = f'{cli_rloc_addr}:{rloc16_decode_from_leader}'
ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6)
time.sleep(random.randint(5, 20))
ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6)
finally:
ocf.execute_command(leader, 'factoryreset')
time.sleep(3)