diff --git a/examples/openthread/ot_br/main/idf_component.yml b/examples/openthread/ot_br/main/idf_component.yml index 2f9c0f6440..c5955f8e3b 100644 --- a/examples/openthread/ot_br/main/idf_component.yml +++ b/examples/openthread/ot_br/main/idf_component.yml @@ -1,6 +1,6 @@ ## IDF Component Manager Manifest File dependencies: - espressif/esp_ot_cli_extension: "~0.1.0" + espressif/esp_ot_cli_extension: "~0.2.0" espressif/mdns: "^1.0.3" ## Required IDF version idf: diff --git a/examples/openthread/ot_ci_function.py b/examples/openthread/ot_ci_function.py index ac49307dd8..7fdc35a0e2 100644 --- a/examples/openthread/ot_ci_function.py +++ b/examples/openthread/ot_ci_function.py @@ -30,20 +30,20 @@ def reset_thread(dut:IdfDut) -> None: def config_thread(dut:IdfDut, model:str, dataset:str='0') -> Union[str, None]: if model == 'random': dut.write('dataset init new') - dut.expect('Done', timeout=2) + dut.expect('Done', timeout=5) dut.write('dataset commit active') - dut.expect('Done', timeout=2) + dut.expect('Done', timeout=5) dut.write('ifconfig up') - dut.expect('Done', timeout=2) + dut.expect('Done', timeout=5) dut.write('dataset active -x') # get dataset dut_data = dut.expect(r'\n(\w{212})\r', timeout=5)[1].decode() return str(dut_data) if model == 'appointed': tmp = 'dataset set active ' + str(dataset) dut.write(tmp) - dut.expect('Done', timeout=2) + dut.expect('Done', timeout=5) dut.write('ifconfig up') - dut.expect('Done', timeout=2) + dut.expect('Done', timeout=5) return None return None @@ -121,9 +121,9 @@ def form_network_using_manual_configuration(leader:IdfDut, child:IdfDut, leader_ reset_thread(child) clean_buffer(child) leader.write('channel 12') - leader.expect('Done', timeout=2) + leader.expect('Done', timeout=5) child.write('channel 12') - child.expect('Done', timeout=2) + child.expect('Done', timeout=5) res = '0000' if wifi_psk != '0000': res = connect_wifi(wifi, wifi_ssid, wifi_psk, 10)[0] @@ -134,7 +134,7 @@ def form_network_using_manual_configuration(leader:IdfDut, child:IdfDut, leader_ config_thread(leader, 'appointed', thread_dataset) if leader_name == 'br': leader.write('bbr enable') - leader.expect('Done', timeout=2) + leader.expect('Done', timeout=5) role = start_thread(leader) assert role == 'leader' if thread_dataset_model == 'random': @@ -143,7 +143,7 @@ def form_network_using_manual_configuration(leader:IdfDut, child:IdfDut, leader_ config_thread(child, 'appointed', thread_dataset) if leader_name != 'br': child.write('bbr enable') - child.expect('Done', timeout=2) + child.expect('Done', timeout=5) role = start_thread(child) assert role == 'child' wait(leader, 10) @@ -168,10 +168,10 @@ def connect_wifi(dut:IdfDut, ssid:str, psk:str, nums:int) -> Tuple[str, int]: information = '' for order in range(1, nums): dut.write('wifi connect -s ' + str(ssid) + ' -p ' + str(psk)) - tmp = dut.expect(pexpect.TIMEOUT, timeout=5) + tmp = dut.expect(pexpect.TIMEOUT, timeout=10) if 'sta ip' in str(tmp): ip_address = re.findall(r'sta ip: (\w+.\w+.\w+.\w+),', str(tmp))[0] - information = dut.expect(r'wifi sta (\w+ \w+ \w+)\W', timeout=5)[1].decode() + information = dut.expect(r'wifi sta (\w+ \w+ \w+)\W', timeout=20)[1].decode() if information == 'is connected successfully': break assert information == 'is connected successfully' @@ -277,7 +277,7 @@ def check_ipmaddr(dut:IdfDut) -> bool: def thread_is_joined_group(dut:IdfDut) -> bool: command = 'mcast join ' + thread_ipv6_group dut.write(command) - dut.expect('Done', timeout=2) + dut.expect('Done', timeout=5) order = 0 while order < 3: if check_ipmaddr(dut): @@ -288,22 +288,22 @@ def thread_is_joined_group(dut:IdfDut) -> bool: return False -host_ipv6_group = 'ff04::125' - - -def host_joined_group() -> bool: +def host_joined_group(group:str='') -> bool: interface_name = get_host_interface_name() command = 'netstat -g | grep ' + str(interface_name) out_str = subprocess.getoutput(command) print('groups:\n', str(out_str)) - return host_ipv6_group in str(out_str) + return group in str(out_str) class udp_parameter: - def __init__(self, group:str='', try_join_udp_group:bool=False, timeout:float=15.0, udp_bytes:bytes=b''): + def __init__(self, udp_type:str='', addr:str='::', port:int=5090, group:str='', init_flag:bool=False, timeout:float=15.0, udp_bytes:bytes=b''): + self.udp_type = udp_type + self.addr = addr + self.port = port self.group = group - self.try_join_udp_group = try_join_udp_group + self.init_flag = init_flag self.timeout = timeout self.udp_bytes = udp_bytes @@ -311,16 +311,22 @@ class udp_parameter: def create_host_udp_server(myudp:udp_parameter) -> None: interface_name = get_host_interface_name() try: + if myudp.udp_type == 'INET6': + AF_INET = socket.AF_INET6 + else: + AF_INET = socket.AF_INET print('The host start to create udp server!') if_index = socket.if_nametoindex(interface_name) - sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - sock.bind(('::', 5090)) - sock.setsockopt( - socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, - struct.pack('16si', socket.inet_pton(socket.AF_INET6, myudp.group), - if_index)) - myudp.try_join_udp_group = True + sock = socket.socket(AF_INET, socket.SOCK_DGRAM) + sock.bind((myudp.addr, myudp.port)) + + if myudp.udp_type == 'INET6' and myudp.group != '': + sock.setsockopt( + socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, + struct.pack('16si', socket.inet_pton(socket.AF_INET6, myudp.group), + if_index)) sock.settimeout(myudp.timeout) + myudp.init_flag = True print('The host start to receive message!') myudp.udp_bytes = (sock.recvfrom(1024))[0] print('The host has received message: ', myudp.udp_bytes) @@ -333,3 +339,137 @@ def create_host_udp_server(myudp:udp_parameter) -> None: def wait(dut:IdfDut, wait_time:float) -> None: dut.expect(pexpect.TIMEOUT, timeout=wait_time) + + +def get_host_ipv4_address() -> str: + interface_name = get_host_interface_name() + command = 'ifconfig ' + interface_name + " | grep -w 'inet' | awk '{print $2}'" + out_bytes = subprocess.check_output(command, shell=True, timeout=5) + out_str = out_bytes.decode('utf-8') + host_ipv4_address = '' + host_ipv4_address = re.findall(r'((?:\d+.){3}\d+)', str(out_str))[0] + return host_ipv4_address + + +def start_avahi() -> None: + time.sleep(1) + command = '/etc/init.d/dbus start' + subprocess.Popen(command, shell=True) + time.sleep(5) + command = 'avahi-daemon' + subprocess.Popen(command, shell=True) + time.sleep(5) + + +def host_publish_service() -> None: + command = 'avahi-publish-service testxxx _testxxx._udp 12347 test=1235 dn="for_ci_br_test"' + subprocess.Popen(command, shell=True) + time.sleep(2) + + +def host_close_service() -> None: + command = "ps | grep avahi-publish-s | awk '{print $1}'" + out_bytes = subprocess.check_output(command, shell=True, timeout=5) + out_str = out_bytes.decode('utf-8') + the_pid = re.findall(r'(\d+)\n', str(out_str)) + for pid in the_pid: + command = 'kill -9 ' + pid + subprocess.call(command, shell=True, timeout=5) + time.sleep(1) + + +def close_host_interface() -> None: + interface_name = get_host_interface_name() + flag = False + try: + command = 'ifconfig ' + interface_name + ' down' + subprocess.call(command, shell=True, timeout=5) + time.sleep(1) + flag = True + finally: + time.sleep(1) + assert flag + + +def open_host_interface() -> None: + interface_name = get_host_interface_name() + flag = False + try: + command = 'ifconfig ' + interface_name + ' up' + subprocess.call(command, shell=True, timeout=5) + time.sleep(1) + flag = True + finally: + time.sleep(1) + assert flag + + +def get_domain() -> str: + hostname = socket.gethostname() + print('hostname is: ', hostname) + command = 'ps -aux | grep avahi-daemon | grep running' + out_str = subprocess.getoutput(command) + print('avahi status:\n', out_str) + role = re.findall(r'\[([\w\W]+)\.local\]', str(out_str))[0] + print('active host is: ', role) + return str(role) + + +class tcp_parameter: + + def __init__(self, tcp_type:str='', addr:str='::', port:int=12345, listen_flag:bool=False, recv_flag:bool=False, timeout:float=15.0, tcp_bytes:bytes=b''): + self.tcp_type = tcp_type + self.addr = addr + self.port = port + self.listen_flag = listen_flag + self.recv_flag = recv_flag + self.timeout = timeout + self.tcp_bytes = tcp_bytes + + +def create_host_tcp_server(mytcp:tcp_parameter) -> None: + try: + if mytcp.tcp_type == 'INET6': + AF_INET = socket.AF_INET6 + else: + AF_INET = socket.AF_INET + print('The host start to create a tcp server!') + sock = socket.socket(AF_INET, socket.SOCK_STREAM) + sock.bind((mytcp.addr, mytcp.port)) + sock.listen(5) + mytcp.listen_flag = True + + print('The tcp server is waiting for connection!') + sock.settimeout(mytcp.timeout) + connfd,addr = sock.accept() + print('The tcp server connected with ',addr) + mytcp.recv_flag = True + + mytcp.tcp_bytes = connfd.recv(1024) + print('The tcp server has received message: ', mytcp.tcp_bytes) + + except socket.error: + if mytcp.recv_flag: + print('The tcp server did not receive message!') + else: + print('The tcp server fail to connect!') + finally: + print('Close the socket.') + sock.close() + + +def get_ipv6_from_ipv4(ipv4_address:str, br:IdfDut) -> str: + clean_buffer(br) + br.write('br nat64prefix') + omrprefix = br.expect(r'\n((?:\w+:){6}):/\d+', timeout=5)[1].decode() + ipv4_find = re.findall(r'\d+', ipv4_address) + ipv6_16_1 = decimal_to_hex(ipv4_find[0]) + decimal_to_hex(ipv4_find[1]) + ipv6_16_2 = decimal_to_hex(ipv4_find[2]) + decimal_to_hex(ipv4_find[3]) + ipv6_get_from_ipv4 = omrprefix + ':' + ipv6_16_1 + ':' + ipv6_16_2 + return str(ipv6_get_from_ipv4) + + +def decimal_to_hex(decimal_str:str) -> str: + decimal_int = int(decimal_str) + hex_str = hex(decimal_int)[2:] + return hex_str diff --git a/examples/openthread/ot_cli/main/idf_component.yml b/examples/openthread/ot_cli/main/idf_component.yml index 2246fc9a0e..768f5e8b22 100644 --- a/examples/openthread/ot_cli/main/idf_component.yml +++ b/examples/openthread/ot_cli/main/idf_component.yml @@ -1,5 +1,5 @@ ## IDF Component Manager Manifest File dependencies: - espressif/esp_ot_cli_extension: "~0.1.0" + espressif/esp_ot_cli_extension: "~0.2.0" idf: version: ">=4.1.0" diff --git a/examples/openthread/pytest_otbr.py b/examples/openthread/pytest_otbr.py index ea012241d3..939590faeb 100644 --- a/examples/openthread/pytest_otbr.py +++ b/examples/openthread/pytest_otbr.py @@ -11,6 +11,7 @@ import time from typing import Tuple import ot_ci_function as ocf +import pexpect import pytest from pytest_embedded_idf.dut import IdfDut @@ -28,9 +29,33 @@ from pytest_embedded_idf.dut import IdfDut # Case 4: Multicast forwarding from Thread to Wi-Fi network # Linux Host joins the multicast group, test group communication from Thread to Wi-Fi network. +# Case 5: discover Serice published by Thread device +# Thread device publishes the service, Linux Host discovers the service on Wi-Fi network. + +# Case 6: discover Serice published by W-Fi device +# Linux Host device publishes the service on Wi-Fi network, Thread device discovers the service. + +# Case 7: ICMP communication via NAT64 +# Thread device (IPV6) ping the host device (IPV4) via NAT64. + +# Case 8: UDP communication via NAT64 +# Thread device (IPV6) send udp message to the host device (IPV4) via NAT64. + +# Case 9: TCP communication via NAT64 +# Thread device (IPV6) send tcp message to the host device (IPV4) via NAT64. + + +@pytest.fixture(scope='module', name='Init_avahi') +def fixture_Init_avahi() -> bool: + print('Init Avahi') + ocf.start_avahi() + time.sleep(10) + return True + @pytest.fixture(name='Init_interface') def fixture_Init_interface() -> bool: + print('Init interface') ocf.init_interface_ipv6_address() ocf.reset_host_interface() time.sleep(30) @@ -45,8 +70,8 @@ wifi_psk = 'otcitest888' # Case 1: Thread network formation and attaching @pytest.mark.esp32s3 @pytest.mark.esp32h4 -@pytest.mark.timeout(40 * 60) @pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.parametrize( 'port, config, count, app_path, beta_target, target', [ ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, @@ -60,7 +85,7 @@ wifi_psk = 'otcitest888' def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] - dut[0].close() + dut[0].serial.stop_redirect_thread() dataset = '-1' ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, '0000') @@ -83,8 +108,8 @@ def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut]) -> None: # Case 2: Bidirectional IPv6 connectivity @pytest.mark.esp32s3 @pytest.mark.esp32h4 -@pytest.mark.timeout(40 * 60) @pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.parametrize( 'port, config, count, app_path, beta_target, target', [ ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, @@ -99,7 +124,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, br = dut[2] cli = dut[1] assert Init_interface - dut[0].close() + dut[0].serial.stop_redirect_thread() dataset = '-1' ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) @@ -134,8 +159,8 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, # Case 3: Multicast forwarding from Wi-Fi to Thread network @pytest.mark.esp32s3 @pytest.mark.esp32h4 -@pytest.mark.timeout(40 * 60) @pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.parametrize( 'port, config, count, app_path, beta_target, target', [ ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, @@ -150,7 +175,7 @@ def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, br = dut[2] cli = dut[1] assert Init_interface - dut[0].close() + dut[0].serial.stop_redirect_thread() dataset = '-1' ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) @@ -158,7 +183,7 @@ def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, try: assert ocf.is_joined_wifi_network(br) br.write('bbr') - br.expect('server16', timeout=2) + br.expect('server16', timeout=5) assert ocf.thread_is_joined_group(cli) interface_name = ocf.get_host_interface_name() command = 'ping -I ' + str(interface_name) + ' -t 64 ff04::125 -c 10' @@ -177,8 +202,8 @@ def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, # Case 4: Multicast forwarding from Thread to Wi-Fi network @pytest.mark.esp32s3 @pytest.mark.esp32h4 -@pytest.mark.timeout(40 * 60) @pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.parametrize( 'port, config, count, app_path, beta_target, target', [ ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, @@ -193,31 +218,29 @@ def test_multicast_forwarding_B(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, br = dut[2] cli = dut[1] assert Init_interface - dut[0].close() + dut[0].serial.stop_redirect_thread() dataset = '-1' ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) try: assert ocf.is_joined_wifi_network(br) br.write('bbr') - br.expect('server16', timeout=2) + br.expect('server16', timeout=5) cli.write('udp open') - cli.expect('Done', timeout=2) - ocf.wait(cli, 1) - cli.write('udp open') - cli.expect('Already', timeout=2) - myudp = ocf.udp_parameter('ff04::125', False, 15.0, b'') + cli.expect('Done', timeout=5) + ocf.wait(cli, 3) + myudp = ocf.udp_parameter('INET6', '::', 5090, 'ff04::125', False, 15.0, b'') udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp, )) udp_mission.start() start_time = time.time() - while not myudp.try_join_udp_group: + while not myudp.init_flag: if (time.time() - start_time) > 10: assert False - assert ocf.host_joined_group() + assert ocf.host_joined_group('ff04::125') for num in range(0, 3): command = 'udp send ff04::125 5090 hello' + str(num) cli.write(command) - cli.expect('Done', timeout=2) + cli.expect('Done', timeout=5) ocf.wait(cli, 0.5) while udp_mission.is_alive(): time.sleep(1) @@ -226,3 +249,283 @@ def test_multicast_forwarding_B(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, cli.write('factoryreset') time.sleep(3) assert b'hello' in myudp.udp_bytes + + +# Case 5: discover dervice published by Thread device +@pytest.mark.esp32s3 +@pytest.mark.esp32h4 +@pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.parametrize( + 'port, config, count, app_path, beta_target, target', [ + ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, + f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_br")}', + 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'), + ], + indirect=True, +) +def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: + br = dut[2] + cli = dut[1] + assert Init_interface + assert Init_avahi + dut[0].serial.stop_redirect_thread() + + dataset = '-1' + ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) + ocf.wait(cli, 3) + flag = False + try: + assert ocf.is_joined_wifi_network(br) + command = 'avahi-browse -rt _testyyy._udp' + out_str = subprocess.getoutput(command) + print('avahi-browse:\n', str(out_str)) + assert 'myTest' not in str(out_str) + hostname = 'myTest' + command = 'srp client host name ' + hostname + cli.write(command) + cli.expect('Done', timeout=5) + cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br) + print('cli_global_unicast_addr', cli_global_unicast_addr) + command = 'srp client host address ' + str(cli_global_unicast_addr) + cli.write(command) + cli.expect('Done', timeout=5) + port = '12348' + command = 'srp client service add my-service _testyyy._udp ' + port + cli.write(command) + cli.expect('Done', timeout=5) + cli.write('srp client autostart enable') + cli.expect('Done', timeout=5) + ocf.wait(cli, 3) + command = 'avahi-browse -rt _testyyy._udp' + out_str = subprocess.getoutput(command) + print('avahi-browse:\n', str(out_str)) + assert 'myTest' in str(out_str) + flag = True + finally: + br.write('factoryreset') + cli.write('factoryreset') + time.sleep(3) + assert flag + + +# Case 6: discover dervice published by Wi-Fi device +@pytest.mark.esp32s3 +@pytest.mark.esp32h4 +@pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.parametrize( + 'port, config, count, app_path, beta_target, target', [ + ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, + f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_br")}', + 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'), + ], + indirect=True, +) +def test_service_discovery_of_WiFi_device(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: + br = dut[2] + cli = dut[1] + assert Init_interface + assert Init_avahi + dut[0].serial.stop_redirect_thread() + + dataset = '-1' + ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) + ocf.wait(cli, 3) + flag = False + try: + assert ocf.is_joined_wifi_network(br) + br_global_unicast_addr = ocf.get_global_unicast_addr(br, br) + command = 'dns config ' + br_global_unicast_addr + ocf.clean_buffer(cli) + cli.write(command) + cli.expect('Done', timeout=5) + ocf.wait(cli, 1) + command = 'dns resolve FA000123.default.service.arpa.' + ocf.clean_buffer(cli) + cli.write(command) + cli.expect('Error', timeout=15) + domain_name = ocf.get_domain() + print('domain name is: ', domain_name) + command = 'dns resolve ' + domain_name + '.default.service.arpa.' + ocf.clean_buffer(cli) + cli.write(command) + cli.expect('TTL', timeout=10) + cli.expect('Done', timeout=10) + ocf.clean_buffer(cli) + cli.write('dns browse _testxxx._udp.default.service.arpa') + tmp = cli.expect(pexpect.TIMEOUT, timeout=5) + assert 'Port:12347' not in str(tmp) + ocf.host_publish_service() + ocf.wait(cli, 5) + ocf.clean_buffer(cli) + cli.write('dns browse _testxxx._udp.default.service.arpa') + tmp = cli.expect(pexpect.TIMEOUT, timeout=5) + assert 'response for _testxxx' in str(tmp) + assert 'Port:12347' in str(tmp) + ocf.clean_buffer(cli) + cli.write('dns service testxxx _testxxx._udp.default.service.arpa.') + tmp = cli.expect(pexpect.TIMEOUT, timeout=5) + assert 'response for testxxx' in str(tmp) + assert 'Port:12347' in str(tmp) + flag = True + finally: + ocf.host_close_service() + br.write('factoryreset') + cli.write('factoryreset') + time.sleep(3) + assert flag + + +# Case 7: ICMP communication via NAT64 +@pytest.mark.esp32s3 +@pytest.mark.esp32h4 +@pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.parametrize( + 'port, config, count, app_path, beta_target, target', [ + ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, + f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_br")}', + 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'), + ], + indirect=True, +) +def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: + br = dut[2] + cli = dut[1] + assert Init_interface + dut[0].serial.stop_redirect_thread() + + dataset = '-1' + ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) + flag = False + try: + assert ocf.is_joined_wifi_network(br) + host_ipv4_address = ocf.get_host_ipv4_address() + print('host_ipv4_address: ', host_ipv4_address) + rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), 5)[1] + assert rx_nums != 0 + flag = True + finally: + br.write('factoryreset') + cli.write('factoryreset') + time.sleep(3) + assert flag + + +# Case 8: UDP communication via NAT64 +@pytest.mark.esp32s3 +@pytest.mark.esp32h4 +@pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.parametrize( + 'port, config, count, app_path, beta_target, target', [ + ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, + f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_br")}', + 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'), + ], + indirect=True, +) +def test_UDP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: + br = dut[2] + cli = dut[1] + assert Init_interface + dut[0].serial.stop_redirect_thread() + + dataset = '-1' + ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) + try: + assert ocf.is_joined_wifi_network(br) + br.write('bbr') + br.expect('server16', timeout=5) + cli.write('udp open') + cli.expect('Done', timeout=5) + ocf.wait(cli, 3) + host_ipv4_address = ocf.get_host_ipv4_address() + print('host_ipv4_address: ', host_ipv4_address) + myudp = ocf.udp_parameter('INET4', host_ipv4_address, 5090, '', False, 15.0, b'') + udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp, )) + udp_mission.start() + start_time = time.time() + while not myudp.init_flag: + if (time.time() - start_time) > 10: + assert False + for num in range(0, 3): + command = 'udp send ' + host_ipv4_address + ' 5090 hello' + str(num) + cli.write(command) + cli.expect('Done', timeout=5) + ocf.wait(cli, 0.5) + while udp_mission.is_alive(): + time.sleep(1) + finally: + br.write('factoryreset') + cli.write('factoryreset') + time.sleep(3) + assert b'hello' in myudp.udp_bytes + + +# Case 9: TCP communication via NAT64 +@pytest.mark.esp32s3 +@pytest.mark.esp32h4 +@pytest.mark.i154_multi_dut +@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.parametrize( + 'port, config, count, app_path, beta_target, target', [ + ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, + f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}' + f'|{os.path.join(os.path.dirname(__file__), "ot_br")}', + 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'), + ], + indirect=True, +) +def test_TCP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: + br = dut[2] + cli = dut[1] + assert Init_interface + dut[0].serial.stop_redirect_thread() + + dataset = '-1' + ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk) + try: + assert ocf.is_joined_wifi_network(br) + br.write('bbr') + br.expect('server16', timeout=5) + cli.write('tcpsockclient open') + cli.expect('Done', timeout=5) + ocf.wait(cli, 3) + host_ipv4_address = ocf.get_host_ipv4_address() + connect_address = ocf.get_ipv6_from_ipv4(host_ipv4_address, br) + print('connect_address is: ', connect_address) + mytcp = ocf.tcp_parameter('INET4', host_ipv4_address, 12345, False, False, 15.0, b'') + tcp_mission = threading.Thread(target=ocf.create_host_tcp_server, args=(mytcp, )) + tcp_mission.start() + start_time = time.time() + while not mytcp.listen_flag: + if (time.time() - start_time) > 10: + assert False + command = 'tcpsockclient connect ' + connect_address + ' 12345' + cli.write(command) + cli.expect('Successfully connected', timeout=10) + start_time = time.time() + while not mytcp.recv_flag: + if (time.time() - start_time) > 10: + assert False + command = 'tcpsockclient send hello' + cli.write(command) + cli.expect('Done', timeout=5) + while tcp_mission.is_alive(): + time.sleep(1) + finally: + br.write('factoryreset') + cli.write('factoryreset') + time.sleep(3) + assert b'hello' in mytcp.tcp_bytes