diff --git a/examples/network/bridge/pytest_example_bridge.py b/examples/network/bridge/pytest_example_bridge.py index 007f4c2d2b..aa0108de7c 100644 --- a/examples/network/bridge/pytest_example_bridge.py +++ b/examples/network/bridge/pytest_example_bridge.py @@ -1,7 +1,10 @@ # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: CC0-1.0 +import base64 +import io import ipaddress import logging +import os import re import socket import subprocess @@ -9,6 +12,7 @@ import time from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor from typing import List +from typing import Optional from typing import Union import netifaces @@ -18,8 +22,11 @@ from common_test_methods import get_host_ip_by_interface from netmiko import ConnectHandler from pytest_embedded import Dut from pytest_embedded_idf.utils import idf_parametrize + # Testbed configuration +ETHVM_ENDNODE_USER = 'ci.ethvm' + BR_PORTS_NUM = 2 IPERF_BW_LIM = 6 MIN_UDP_THROUGHPUT = 5 @@ -27,11 +34,17 @@ MIN_TCP_THROUGHPUT = 4 class EndnodeSsh: - def __init__(self, host_ip: str, usr: str, passwd: str): + def __init__(self, host_ip: str, usr: str, passwd: Optional[str] = None): + key_string = os.getenv('CI_ETHVM_KEY') + key = None + if key_string: + decoded_key_string = base64.b64decode(key_string) + key = paramiko.Ed25519Key.from_private_key(io.StringIO(decoded_key_string.decode('utf-8'))) + self.host_ip = host_ip self.ssh_client = paramiko.SSHClient() self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.ssh_client.connect(hostname=self.host_ip, username=usr, password=passwd) + self.ssh_client.connect(hostname=self.host_ip, username=usr, pkey=key, password=passwd) self.executor: ThreadPoolExecutor self.async_result: Future @@ -281,14 +294,16 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: port_num = int(sw_info.group(2)) port_num_endnode = int(port_num) + 1 # endnode address is always + 1 to the host - endnode = EndnodeSsh(f'10.10.{sw_num}.{port_num_endnode}', dev_user, dev_password) + endnode = EndnodeSsh(f'10.10.{sw_num}.{port_num_endnode}', ETHVM_ENDNODE_USER) switch1 = SwitchSsh(f'10.10.{sw_num}.100', dev_user, dev_password, SwitchSsh.EDGE_SWITCH_10XP) # Collect all addresses in our network # ------------------------------------ # Bridge (DUT) MAC br_mac = dut.expect( - r'esp_netif_br_glue: ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})' + r'esp_netif_br_glue: ' + r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:' + r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})' ) br_mac = br_mac.group(1).decode('utf-8') logging.info('ESP Bridge MAC %s', br_mac) @@ -387,7 +402,8 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: bandwidth_udp = run_iperf('udp', endnode, host_ip, IPERF_BW_LIM, 5) if bandwidth_udp < MIN_UDP_THROUGHPUT: logging.warning( - 'Unicast UDP bandwidth was less than expected. Trying again over longer period to compensate transient drops.' + 'Unicast UDP bandwidth was less than expected. ' + 'Trying again over longer period to compensate transient drops.' ) bandwidth_udp = run_iperf('udp', endnode, host_ip, IPERF_BW_LIM, 60) logging.info('Unicast UDP average bandwidth: %s Mbits/s', bandwidth_udp) @@ -396,7 +412,8 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: bandwidth_tcp = run_iperf('tcp', endnode, host_ip, IPERF_BW_LIM, 5) if bandwidth_tcp < MIN_TCP_THROUGHPUT: logging.warning( - 'Unicast TCP bandwidth was less than expected. Trying again over longer period to compensate transient drops.' + 'Unicast TCP bandwidth was less than expected. ' + 'Trying again over longer period to compensate transient drops.' ) bandwidth_tcp = run_iperf('tcp', endnode, host_ip, IPERF_BW_LIM, 60) logging.info('Unicast TCP average bandwidth: %s Mbits/s', bandwidth_tcp) @@ -405,7 +422,8 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: bandwidth_mcast_udp = run_iperf('udp', endnode, '224.0.1.4', IPERF_BW_LIM, 5, host_if, endnode_if) if bandwidth_mcast_udp < MIN_UDP_THROUGHPUT: logging.warning( - 'Multicast UDP bandwidth was less than expected. Trying again over longer period to compensate transient drops.' + 'Multicast UDP bandwidth was less than expected. ' + 'Trying again over longer period to compensate transient drops.' ) bandwidth_mcast_udp = run_iperf('udp', endnode, '224.0.1.4', IPERF_BW_LIM, 60, host_if, endnode_if) logging.info('Multicast UDP average bandwidth: %s Mbits/s', bandwidth_mcast_udp)