feat(openthread): examples adaptation of new APIs

This commit is contained in:
Xu Si Yu
2025-09-16 19:39:30 +08:00
parent 41dbd0d03a
commit 4876f122cc
38 changed files with 281 additions and 815 deletions

View File

@@ -8,9 +8,8 @@ import socket
import struct
import subprocess
import time
from collections.abc import Callable
from functools import wraps
from typing import Callable
from typing import Tuple
import netifaces
import pexpect
@@ -133,7 +132,7 @@ def wait_for_join(dut: IdfDut, role: str) -> bool:
return False
def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> Tuple[str, int]:
def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> tuple[str, int]:
clean_buffer(dut)
ip_address = ''
for order in range(1, wifi.retry_times):
@@ -168,7 +167,7 @@ def getDataset(dut: IdfDut) -> str:
def init_thread(dut: IdfDut) -> None:
dut.expect('>', timeout=10)
dut.expect('OpenThread attached to netif', timeout=10)
wait(dut, 3)
reset_thread(dut)
@@ -176,7 +175,6 @@ def init_thread(dut: IdfDut) -> None:
def reset_thread(dut: IdfDut) -> None:
execute_command(dut, 'factoryreset')
dut.expect('OpenThread attached to netif', timeout=20)
dut.expect('>', timeout=10)
wait(dut, 3)
clean_buffer(dut)
@@ -186,7 +184,7 @@ def get_mleid_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr mleid')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return dut_adress
return str(dut_adress)
# get the rloc address of the thread
@@ -194,7 +192,7 @@ def get_rloc_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr rloc')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return dut_adress
return str(dut_adress)
# get the linklocal address of the thread
@@ -202,7 +200,7 @@ def get_linklocal_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr linklocal')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return dut_adress
return str(dut_adress)
# get the global unicast address of the thread:
@@ -211,8 +209,8 @@ def get_global_unicast_addr(dut: IdfDut, br: IdfDut) -> str:
clean_buffer(br)
omrprefix = get_omrprefix(br)
execute_command(dut, 'ipaddr')
dut_adress = dut.expect(r'(%s(?:\w+:){3}\w+)\r' % str(omrprefix), timeout=5)[1].decode()
return dut_adress
dut_adress = dut.expect(rf'({omrprefix}(?:\w+:){{3}}\w+)\r', timeout=5)[1].decode()
return str(dut_adress)
@extract_address('rloc16', r'(\w{4})')
@@ -223,7 +221,7 @@ def get_rloc16_addr(rloc16: str) -> str:
# ping of thread
def ot_ping(
dut: IdfDut, target: str, timeout: int = 5, count: int = 1, size: int = 56, interval: int = 1, hoplimit: int = 64
) -> Tuple[int, int]:
) -> tuple[int, int]:
command = f'ping {str(target)} {size} {count} {interval} {hoplimit} {str(timeout)}'
execute_command(dut, command)
transmitted = dut.expect(r'(\d+) packets transmitted', timeout=60)[1].decode()
@@ -307,7 +305,7 @@ def get_host_interface_name() -> str:
config_path = os.path.join(home_dir, 'config', 'env_config.yml')
try:
if os.path.exists(config_path):
with open(config_path, 'r') as file:
with open(config_path) as file:
config = yaml.safe_load(file)
interface_name = config.get('interface_name')
if interface_name:
@@ -331,7 +329,7 @@ def get_host_interface_name() -> str:
def clean_buffer(dut: IdfDut) -> None:
str_length = str(len(dut.expect(pexpect.TIMEOUT, timeout=0.1)))
dut.expect(r'[\s\S]{%s}' % str(str_length), timeout=10)
dut.expect(rf'[\s\S]{{{str_length}}}', timeout=10)
def check_if_host_receive_ra(br: IdfDut) -> bool:
@@ -422,7 +420,7 @@ def create_host_udp_server(myudp: udp_parameter) -> None:
print('The host start to receive message!')
myudp.udp_bytes = (sock.recvfrom(1024))[0]
print('The host has received message: ', myudp.udp_bytes)
except socket.error:
except OSError:
print('The host did not receive message!')
finally:
print('Close the socket.')
@@ -442,7 +440,7 @@ def host_udp_send_message(udp_target: udp_parameter) -> None:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 32)
print('Host is sending message')
sock.sendto(udp_target.udp_bytes, (udp_target.addr, udp_target.port))
except socket.error:
except OSError:
print('Host cannot send message')
finally:
sock.close()
@@ -459,7 +457,7 @@ def get_host_ipv4_address() -> str:
out_str = out_bytes.decode('utf-8')
host_ipv4_address = ''
host_ipv4_address = re.findall(r'((?:\d+.){3}\d+)', str(out_str))[0]
return host_ipv4_address
return str(host_ipv4_address)
def restart_avahi() -> None:
@@ -592,7 +590,7 @@ def create_host_tcp_server(mytcp: tcp_parameter) -> None:
mytcp.tcp_bytes = connfd.recv(1024)
print('The tcp server has received message: ', mytcp.tcp_bytes)
except socket.error:
except OSError:
if mytcp.recv_flag:
print('The tcp server did not receive message!')
else:
@@ -636,9 +634,9 @@ def get_nat64prefix(br: IdfDut) -> str:
return str(nat64prefix)
def execute_command(dut: IdfDut, command: str) -> None:
def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None:
clean_buffer(dut)
dut.write(command)
dut.write(prefix + command)
def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str: