mirror of
				https://github.com/espressif/esp-idf.git
				synced 2025-11-04 00:51:42 +01:00 
			
		
		
		
	CI: add vlan example to ethernet patterns Included a Pytest for the vlan_support example, which focuses on testing the NAPT module in lwip by forwarding packets between different VLAN interfaces.
		
			
				
	
	
		
			323 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			323 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
 | 
						|
# SPDX-License-Identifier: CC0-1.0
 | 
						|
 | 
						|
import ipaddress
 | 
						|
import subprocess
 | 
						|
import threading
 | 
						|
import time
 | 
						|
from typing import Dict, Union
 | 
						|
 | 
						|
import pytest
 | 
						|
from pytest_embedded import Dut
 | 
						|
from scapy import layers
 | 
						|
from scapy.all import ICMP, IP, TCP, UDP, AsyncSniffer
 | 
						|
 | 
						|
udp_port = 1234
 | 
						|
tcp_port = 4321
 | 
						|
 | 
						|
 | 
						|
def run_cmd(command: str, secure: bool=False) -> str:
 | 
						|
    if secure is False:
 | 
						|
        print(f'Running: {command}')
 | 
						|
 | 
						|
    cmd = command.strip().split(' ')
 | 
						|
 | 
						|
    # Run the command
 | 
						|
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 | 
						|
 | 
						|
    # Send su password if secured
 | 
						|
    if secure is True:
 | 
						|
        proc.communicate(input=b'Esp@32')
 | 
						|
 | 
						|
    # Get the output
 | 
						|
    stdout, stderr = proc.communicate()
 | 
						|
 | 
						|
    # Print the output
 | 
						|
    if secure is False:
 | 
						|
        if len(stdout.decode('utf-8')) > 0:
 | 
						|
            print('Output: ', stdout.decode('utf-8'))
 | 
						|
 | 
						|
    if len(stderr.decode('utf-8')) > 0:
 | 
						|
        print('Error: ', stderr.decode('utf-8'))
 | 
						|
 | 
						|
    return stdout.decode('utf-8')
 | 
						|
 | 
						|
 | 
						|
def run_cmd_sec(command: str) -> str:
 | 
						|
    print(f'Running secured: {command}')
 | 
						|
    return run_cmd(f'sudo -S -k {command}', secure=True)
 | 
						|
 | 
						|
 | 
						|
def clear_network(config: dict) -> None:
 | 
						|
    # delete route
 | 
						|
    for each_cmd in config['delete_route_cmd_l']:
 | 
						|
        run_cmd_sec(each_cmd)
 | 
						|
 | 
						|
    # destroy Vlan interfaces
 | 
						|
    for each_cmd in config['vlan_destroy_cmd_l']:
 | 
						|
        run_cmd_sec(each_cmd)
 | 
						|
 | 
						|
 | 
						|
def setup_network(config: dict) -> None:
 | 
						|
    # Clear network before setting it up
 | 
						|
    clear_network(config)
 | 
						|
 | 
						|
    # Create Vlan interfaces
 | 
						|
    for each_cmd in config['vlan_create_cmd_l']:
 | 
						|
        run_cmd_sec(each_cmd)
 | 
						|
 | 
						|
    # set route
 | 
						|
    for each_cmd in config['set_route_cmd_l']:
 | 
						|
        run_cmd_sec(each_cmd)
 | 
						|
 | 
						|
 | 
						|
def create_config(dut: Dut) -> dict:
 | 
						|
    pc_iface = dut.app.sdkconfig.get('EXAMPLE_VLAN_PYTEST_PC_IFACE')
 | 
						|
    vlanClient_conf = {'id': str(dut.app.sdkconfig.get('EXAMPLE_ETHERNET_VLAN_ID')),
 | 
						|
                       'name': 'vlanClient',
 | 
						|
                       'ip': dut.app.sdkconfig.get('EXAMPLE_VLAN_STATIC_ADDR_DEF_GW')}
 | 
						|
    vlanServer_conf = {'id': str(dut.app.sdkconfig.get('EXAMPLE_EXTRA_ETHERNET_VLAN_ID')),
 | 
						|
                       'name': 'vlanServer',
 | 
						|
                       'ip': dut.app.sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW')}
 | 
						|
    esp_vlanClient_ip = dut.app.sdkconfig.get('EXAMPLE_VLAN_STATIC_IPV4_ADDR')
 | 
						|
    esp_vlanServer_ip = dut.app.sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_IPV4_ADDR')
 | 
						|
 | 
						|
    subnet_mask = ipaddress.IPv4Address('255.255.255.0')
 | 
						|
    tmp_ip = ipaddress.IPv4Address(vlanServer_conf['ip'])
 | 
						|
    vlanServer_net_addr = ipaddress.IPv4Address(int(tmp_ip) & int(subnet_mask))
 | 
						|
 | 
						|
    config: Dict[str, Union[str, dict, dict, str, str, list, list, list, list]] = {
 | 
						|
        # Basic Configurations
 | 
						|
        'pc_iface': pc_iface,
 | 
						|
 | 
						|
        'vlanClient': vlanClient_conf,
 | 
						|
        'vlanServer': vlanServer_conf,
 | 
						|
 | 
						|
        'esp_vlanClient_ip': esp_vlanClient_ip,
 | 
						|
        'esp_vlanServer_ip': esp_vlanServer_ip,
 | 
						|
 | 
						|
        'vlan_create_cmd_l': [f'ip netns add ns_vlanClient',
 | 
						|
                              f"ip link add link {pc_iface} name {vlanClient_conf['name']} type vlan id {vlanClient_conf['id']}",
 | 
						|
                              f"ip link set {vlanClient_conf['name']} netns ns_vlanClient",
 | 
						|
                              f"ip netns exec ns_vlanClient ip addr add {vlanClient_conf['ip']}/255.255.255.0 dev {vlanClient_conf['name']}",
 | 
						|
                              f"ip netns exec ns_vlanClient ip link set dev {vlanClient_conf['name']} up",
 | 
						|
                              f"ip link add link {pc_iface} name {vlanServer_conf['name']} type vlan id {vlanServer_conf['id']}",
 | 
						|
                              f"ip addr add {vlanServer_conf['ip']}/255.255.255.0 dev {vlanServer_conf['name']}",
 | 
						|
                              f"ip link set dev {vlanServer_conf['name']} up"],
 | 
						|
 | 
						|
        'vlan_destroy_cmd_l': [f"ip netns exec ns_vlanClient ip link set dev {vlanClient_conf['name']} down",
 | 
						|
                               f"ip netns exec ns_vlanClient ip link delete {vlanClient_conf['name']}",
 | 
						|
                               f"ip link set dev {vlanServer_conf['name']} down",
 | 
						|
                               f"ip link delete {vlanServer_conf['name']}",
 | 
						|
                               f'ip netns delete ns_vlanClient'],
 | 
						|
 | 
						|
        'set_route_cmd_l': [f'ip netns exec ns_vlanClient ip route add {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'],
 | 
						|
 | 
						|
        'delete_route_cmd_l': [f'ip netns exec ns_vlanClient ip route delete {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'],
 | 
						|
    }
 | 
						|
 | 
						|
    return config
 | 
						|
 | 
						|
 | 
						|
# Ping Test
 | 
						|
def ping_test(config: dict) -> None:
 | 
						|
 | 
						|
    setup_network(config)
 | 
						|
 | 
						|
    capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='icmp', count=10)
 | 
						|
 | 
						|
    # Start sniffing
 | 
						|
    capture.start()
 | 
						|
 | 
						|
    time.sleep(1)
 | 
						|
    # Run network test commands here
 | 
						|
    print(run_cmd_sec(f"ip netns exec ns_vlanClient ping -I {config['vlanClient']['ip']} {config['vlanServer']['ip']} -c 10"))
 | 
						|
 | 
						|
    # Stop sniffing
 | 
						|
    capture.join(timeout=20)
 | 
						|
    vlanServer_pkt_list = capture.results
 | 
						|
 | 
						|
    clear_network(config)
 | 
						|
 | 
						|
    # Test Validation
 | 
						|
    vlanServer_forward_flag = False
 | 
						|
    vlanServer_return_flag = False
 | 
						|
 | 
						|
    if vlanServer_pkt_list is None:
 | 
						|
        print('Failure: No packets captured')
 | 
						|
        assert False
 | 
						|
 | 
						|
    print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
 | 
						|
    for pkt in vlanServer_pkt_list:
 | 
						|
        print('Summary: ', pkt.summary())
 | 
						|
        if pkt[ICMP].type == 8 and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
 | 
						|
            vlanServer_forward_flag = True
 | 
						|
        if pkt[ICMP].type == 0 and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
 | 
						|
            vlanServer_return_flag = True
 | 
						|
 | 
						|
    assert vlanServer_forward_flag and vlanServer_return_flag
 | 
						|
 | 
						|
 | 
						|
# UDP Test
 | 
						|
def udp_server(serverip: str, port: int) -> None:
 | 
						|
    print(f'UDP server listening on IP: {serverip} port: {port}')
 | 
						|
    run_cmd(f'timeout 10s iperf3 -s -p {port}')
 | 
						|
 | 
						|
 | 
						|
def udp_client(serverip: str, port: int) -> None:
 | 
						|
    time.sleep(1)
 | 
						|
    print(run_cmd_sec(f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -u -p {port} --bidir -k 20'))
 | 
						|
 | 
						|
 | 
						|
def udp_server_client_comm(config: dict, port: int) -> None:
 | 
						|
    server_thread = threading.Thread(target=udp_server, args=(config['vlanServer']['ip'], port,))
 | 
						|
    client_thread = threading.Thread(target=udp_client, args=(config['vlanServer']['ip'], port))
 | 
						|
 | 
						|
    server_thread.start()
 | 
						|
    client_thread.start()
 | 
						|
 | 
						|
    server_thread.join()
 | 
						|
    client_thread.join()
 | 
						|
    print('UDP Test Done')
 | 
						|
 | 
						|
 | 
						|
def udp_lfilter(packet: layers.l2.Ether) -> layers.l2.Ether:
 | 
						|
    return UDP in packet and (packet[UDP].dport == udp_port or packet[UDP].sport == udp_port)
 | 
						|
 | 
						|
 | 
						|
def udp_test(config: dict) -> None:
 | 
						|
 | 
						|
    setup_network(config)
 | 
						|
 | 
						|
    capture = AsyncSniffer(iface=config['vlanServer']['name'],
 | 
						|
                           filter='udp',
 | 
						|
                           lfilter=udp_lfilter,
 | 
						|
                           count=10)
 | 
						|
 | 
						|
    # Start sniffing
 | 
						|
    capture.start()
 | 
						|
 | 
						|
    time.sleep(1)
 | 
						|
 | 
						|
    # Run network test commands here
 | 
						|
    udp_server_client_comm(config, udp_port)
 | 
						|
 | 
						|
    # Stop sniffing
 | 
						|
    capture.join(timeout=20)
 | 
						|
    vlanServer_pkt_list = capture.results
 | 
						|
 | 
						|
    clear_network(config)
 | 
						|
 | 
						|
    # Test Validation
 | 
						|
    vlanServer_forward_flag = False
 | 
						|
    vlanServer_return_flag = False
 | 
						|
 | 
						|
    if vlanServer_pkt_list is None:
 | 
						|
        print('Failure: No packets captured')
 | 
						|
        assert False
 | 
						|
 | 
						|
    print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
 | 
						|
    for pkt in vlanServer_pkt_list:
 | 
						|
        print('Summary: ', pkt.summary())
 | 
						|
        if UDP in pkt:
 | 
						|
            if pkt[UDP].dport == udp_port and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
 | 
						|
                vlanServer_forward_flag = True
 | 
						|
            if pkt[UDP].sport == udp_port and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
 | 
						|
                vlanServer_return_flag = True
 | 
						|
 | 
						|
    assert vlanServer_forward_flag and vlanServer_return_flag
 | 
						|
 | 
						|
 | 
						|
# TCP Test
 | 
						|
def tcp_server(serverip: str, port: int) -> None:
 | 
						|
    print(f'TCP server listening on IP: {serverip} port: {port}')
 | 
						|
    run_cmd(f'timeout 10s iperf3 -s -p {port}')
 | 
						|
 | 
						|
 | 
						|
def tcp_client(serverip: str, port: int) -> None:
 | 
						|
    time.sleep(1)
 | 
						|
    print(run_cmd_sec(f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -p {port} --bidir -k 20'))
 | 
						|
 | 
						|
 | 
						|
def tcp_server_client_comm(config: dict, port: int) -> None:
 | 
						|
    server_thread = threading.Thread(target=tcp_server, args=(config['vlanServer']['ip'], port,))
 | 
						|
    client_thread = threading.Thread(target=tcp_client, args=(config['vlanServer']['ip'], port))
 | 
						|
 | 
						|
    server_thread.start()
 | 
						|
    client_thread.start()
 | 
						|
 | 
						|
    server_thread.join()
 | 
						|
    client_thread.join()
 | 
						|
    print('TCP Test Done')
 | 
						|
 | 
						|
 | 
						|
def tcp_lfilter(packet: layers.l2.Ether) -> layers.l2.Ether:
 | 
						|
    return TCP in packet and (packet[TCP].dport == tcp_port or packet[TCP].sport == tcp_port)
 | 
						|
 | 
						|
 | 
						|
def tcp_test(config: dict) -> None:
 | 
						|
 | 
						|
    setup_network(config)
 | 
						|
 | 
						|
    capture = AsyncSniffer(iface=config['vlanServer']['name'],
 | 
						|
                           filter='tcp',
 | 
						|
                           lfilter=tcp_lfilter,
 | 
						|
                           count=10)
 | 
						|
 | 
						|
    # Start sniffing
 | 
						|
    capture.start()
 | 
						|
 | 
						|
    time.sleep(1)
 | 
						|
 | 
						|
    # Run network test commands here
 | 
						|
    tcp_server_client_comm(config, tcp_port)
 | 
						|
 | 
						|
    # Stop sniffing
 | 
						|
    capture.join(timeout=20)
 | 
						|
    vlanServer_pkt_list = capture.results
 | 
						|
 | 
						|
    clear_network(config)
 | 
						|
 | 
						|
    # Test Validation
 | 
						|
    vlanServer_forward_flag = False
 | 
						|
    vlanServer_return_flag = False
 | 
						|
 | 
						|
    if vlanServer_pkt_list is None:
 | 
						|
        print('Failure: No packets captured')
 | 
						|
        assert False
 | 
						|
 | 
						|
    print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
 | 
						|
    for pkt in vlanServer_pkt_list:
 | 
						|
        print('Summary: ', pkt.summary())
 | 
						|
        if TCP in pkt:
 | 
						|
            if pkt[TCP].dport == tcp_port and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
 | 
						|
                vlanServer_forward_flag = True
 | 
						|
            if pkt[TCP].sport == tcp_port and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
 | 
						|
                vlanServer_return_flag = True
 | 
						|
 | 
						|
    assert vlanServer_forward_flag and vlanServer_return_flag
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.esp32
 | 
						|
@pytest.mark.ethernet_vlan
 | 
						|
def test_vlan_napt_pingtest(dut: Dut) -> None:
 | 
						|
    dut.expect('main_task: Returned from app_main()')
 | 
						|
    test_conf = create_config(dut)
 | 
						|
    ping_test(test_conf)
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.esp32
 | 
						|
@pytest.mark.ethernet_vlan
 | 
						|
def test_vlan_napt_udptest(dut: Dut) -> None:
 | 
						|
    dut.expect('main_task: Returned from app_main()')
 | 
						|
    test_conf = create_config(dut)
 | 
						|
    udp_test(test_conf)
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.esp32
 | 
						|
@pytest.mark.ethernet_vlan
 | 
						|
def test_vlan_napt_tcptest(dut: Dut) -> None:
 | 
						|
    dut.expect('main_task: Returned from app_main()')
 | 
						|
    test_conf = create_config(dut)
 | 
						|
    tcp_test(test_conf)
 |