forked from espressif/esp-idf
CI: fix ipv6 test failed on some runners
This commit is contained in:
@@ -15,9 +15,9 @@ import sys
|
|||||||
from builtins import input
|
from builtins import input
|
||||||
from threading import Event, Thread
|
from threading import Event, Thread
|
||||||
|
|
||||||
import netifaces
|
|
||||||
import ttfw_idf
|
import ttfw_idf
|
||||||
from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip
|
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
|
||||||
|
get_my_interface_by_dest_ip)
|
||||||
|
|
||||||
# ----------- Config ----------
|
# ----------- Config ----------
|
||||||
PORT = 3333
|
PORT = 3333
|
||||||
@@ -111,13 +111,13 @@ def test_examples_protocol_socket_tcpclient(env, extra_data):
|
|||||||
my_interface = get_my_interface_by_dest_ip(ipv4)
|
my_interface = get_my_interface_by_dest_ip(ipv4)
|
||||||
# test IPv4
|
# test IPv4
|
||||||
with TcpServer(PORT, socket.AF_INET):
|
with TcpServer(PORT, socket.AF_INET):
|
||||||
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET)
|
server_ip = get_host_ip4_by_dest_ip(ipv4)
|
||||||
print('Connect tcp client to server IP={}'.format(server_ip))
|
print('Connect tcp client to server IP={}'.format(server_ip))
|
||||||
dut1.write(server_ip)
|
dut1.write(server_ip)
|
||||||
dut1.expect(re.compile(r'OK: Message from ESP32'))
|
dut1.expect(re.compile(r'OK: Message from ESP32'))
|
||||||
# test IPv6
|
# test IPv6
|
||||||
with TcpServer(PORT, socket.AF_INET6):
|
with TcpServer(PORT, socket.AF_INET6):
|
||||||
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6)
|
server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
|
||||||
print('Connect tcp client to server IP={}'.format(server_ip))
|
print('Connect tcp client to server IP={}'.format(server_ip))
|
||||||
dut1.write(server_ip)
|
dut1.write(server_ip)
|
||||||
dut1.expect(re.compile(r'OK: Message from ESP32'))
|
dut1.expect(re.compile(r'OK: Message from ESP32'))
|
||||||
|
@@ -15,9 +15,9 @@ import sys
|
|||||||
from builtins import input
|
from builtins import input
|
||||||
from threading import Event, Thread
|
from threading import Event, Thread
|
||||||
|
|
||||||
import netifaces
|
|
||||||
import ttfw_idf
|
import ttfw_idf
|
||||||
from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip
|
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
|
||||||
|
get_my_interface_by_dest_ip)
|
||||||
from tiny_test_fw.DUT import ExpectTimeout
|
from tiny_test_fw.DUT import ExpectTimeout
|
||||||
|
|
||||||
# ----------- Config ----------
|
# ----------- Config ----------
|
||||||
@@ -106,7 +106,7 @@ def test_examples_protocol_socket_udpclient(env, extra_data):
|
|||||||
my_interface = get_my_interface_by_dest_ip(ipv4)
|
my_interface = get_my_interface_by_dest_ip(ipv4)
|
||||||
# test IPv4
|
# test IPv4
|
||||||
with UdpServer(PORT, socket.AF_INET):
|
with UdpServer(PORT, socket.AF_INET):
|
||||||
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET)
|
server_ip = get_host_ip4_by_dest_ip(ipv4)
|
||||||
print('Connect udp client to server IP={}'.format(server_ip))
|
print('Connect udp client to server IP={}'.format(server_ip))
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
try:
|
try:
|
||||||
@@ -119,7 +119,7 @@ def test_examples_protocol_socket_udpclient(env, extra_data):
|
|||||||
raise ValueError('Failed to send/recv udp packets.')
|
raise ValueError('Failed to send/recv udp packets.')
|
||||||
# test IPv6
|
# test IPv6
|
||||||
with UdpServer(PORT, socket.AF_INET6):
|
with UdpServer(PORT, socket.AF_INET6):
|
||||||
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6)
|
server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
|
||||||
print('Connect udp client to server IP={}'.format(server_ip))
|
print('Connect udp client to server IP={}'.format(server_ip))
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
try:
|
try:
|
||||||
|
@@ -4,7 +4,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
from typing import Any
|
from typing import Any, List
|
||||||
|
|
||||||
import netifaces
|
import netifaces
|
||||||
import yaml
|
import yaml
|
||||||
@@ -26,10 +26,23 @@ $IDF_PATH/EnvConfig.yml:
|
|||||||
|
|
||||||
|
|
||||||
def get_host_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str:
|
def get_host_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str:
|
||||||
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
|
if ip_type == netifaces.AF_INET:
|
||||||
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
|
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
|
||||||
assert isinstance(host_ip, str)
|
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
|
||||||
return host_ip
|
assert isinstance(host_ip, str)
|
||||||
|
return host_ip
|
||||||
|
elif ip_type == netifaces.AF_INET6:
|
||||||
|
ip6_addrs: List[str] = []
|
||||||
|
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
|
||||||
|
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
|
||||||
|
assert isinstance(host_ip, str)
|
||||||
|
# prefer to use link local address due to example settings
|
||||||
|
if host_ip.startswith('FE80::'):
|
||||||
|
ip6_addrs.insert(0, host_ip)
|
||||||
|
else:
|
||||||
|
ip6_addrs.append(host_ip)
|
||||||
|
if ip6_addrs:
|
||||||
|
return ip6_addrs[0]
|
||||||
return ''
|
return ''
|
||||||
|
|
||||||
|
|
||||||
@@ -45,6 +58,17 @@ def get_host_ip4_by_dest_ip(dest_ip: str = '') -> str:
|
|||||||
return host_ip
|
return host_ip
|
||||||
|
|
||||||
|
|
||||||
|
def get_host_ip6_by_dest_ip(dest_ip: str, interface: str) -> str:
|
||||||
|
addr_info = socket.getaddrinfo(f'{dest_ip}%{interface}', 80, socket.AF_INET6, socket.SOCK_DGRAM)
|
||||||
|
s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
||||||
|
s1.connect(addr_info[0][-1])
|
||||||
|
host_ip = s1.getsockname()[0]
|
||||||
|
s1.close()
|
||||||
|
assert isinstance(host_ip, str)
|
||||||
|
print(f'Using host ip: {host_ip}')
|
||||||
|
return host_ip
|
||||||
|
|
||||||
|
|
||||||
def get_my_interface_by_dest_ip(dest_ip: str = '') -> str:
|
def get_my_interface_by_dest_ip(dest_ip: str = '') -> str:
|
||||||
my_ip = get_host_ip4_by_dest_ip(dest_ip)
|
my_ip = get_host_ip4_by_dest_ip(dest_ip)
|
||||||
interfaces = netifaces.interfaces()
|
interfaces = netifaces.interfaces()
|
||||||
|
Reference in New Issue
Block a user