mirror of
				https://github.com/espressif/esp-protocols.git
				synced 2025-11-04 00:21:37 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			99 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			99 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
 | 
						|
# SPDX-License-Identifier: Unlicense OR CC0-1.0
 | 
						|
from __future__ import print_function, unicode_literals
 | 
						|
 | 
						|
import subprocess
 | 
						|
import time
 | 
						|
from threading import Event, Thread
 | 
						|
 | 
						|
import netifaces
 | 
						|
 | 
						|
 | 
						|
def is_esp32(port):
 | 
						|
    """
 | 
						|
    Check if the given port is connected to an ESP32 using esptool.
 | 
						|
    """
 | 
						|
    try:
 | 
						|
        result = subprocess.run(
 | 
						|
            ['esptool.py', '--port', port, 'chip_id'],
 | 
						|
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True
 | 
						|
        )
 | 
						|
        return 'ESP32' in result.stdout
 | 
						|
    except subprocess.CalledProcessError:
 | 
						|
        return False
 | 
						|
 | 
						|
 | 
						|
def run_server(server_stop, port):
 | 
						|
    print('Launching SLIP netif on port: {}'.format(port))
 | 
						|
    try:
 | 
						|
        arg_list = ['sudo', 'slattach', '-v', '-L', '-s', '115200','-p', 'slip', port]
 | 
						|
        p = subprocess.Popen(arg_list, stdout=subprocess.PIPE, bufsize=1)
 | 
						|
        while not server_stop.is_set():
 | 
						|
            if p.poll() is not None:
 | 
						|
                if p.poll() == 16:
 | 
						|
                    print('[SLIP:] Terminated: hang-up received')
 | 
						|
                    break
 | 
						|
                else:
 | 
						|
                    raise ValueError(
 | 
						|
                        'ENV_TEST_FAILURE: SLIP terminated unexpectedly with {}'
 | 
						|
                        .format(p.poll()))
 | 
						|
            time.sleep(0.1)
 | 
						|
    except Exception as e:
 | 
						|
        print(e)
 | 
						|
        raise ValueError('ENV_TEST_FAILURE: Error running SLIP netif')
 | 
						|
    finally:
 | 
						|
        p.terminate()
 | 
						|
    print('SLIP netif stopped')
 | 
						|
 | 
						|
 | 
						|
def test_examples_protocol_slip(dut):
 | 
						|
 | 
						|
    # the PPP test env is reused for SLIP test and it uses three ttyUSB's: two for ESP32 board and another one for the ppp server
 | 
						|
    server_port = None
 | 
						|
    for i in ['/dev/ttyUSB0', '/dev/ttyUSB1', '/dev/ttyUSB2']:
 | 
						|
        if i == dut.serial.port:
 | 
						|
            print(f'DUT port: {i}')
 | 
						|
        elif is_esp32(i):
 | 
						|
            print(f'Some other ESP32: {i}')
 | 
						|
        else:
 | 
						|
            print(f'Port for SLIP: {i}')
 | 
						|
            server_port = i
 | 
						|
    if server_port is None:
 | 
						|
        print('ENV_TEST_FAILURE: Cannot locate SLIP port')
 | 
						|
        raise
 | 
						|
    # Attach to the SLI netif
 | 
						|
    server_stop = Event()
 | 
						|
    t = Thread(target=run_server, args=(server_stop, server_port))
 | 
						|
    t.start()
 | 
						|
    try:
 | 
						|
        # Setups the SLIP interface
 | 
						|
        ppp_server_timeout = time.time() + 30
 | 
						|
        while 'sl0' not in netifaces.interfaces():
 | 
						|
            print(
 | 
						|
                "SLIP netif hasn't appear yet, list of active netifs:{}"
 | 
						|
                .format(netifaces.interfaces()))
 | 
						|
            time.sleep(0.5)
 | 
						|
            if time.time() > ppp_server_timeout:
 | 
						|
                raise ValueError(
 | 
						|
                    'ENV_TEST_FAILURE: SLIP netif failed to setup sl0 interface within timeout'
 | 
						|
                )
 | 
						|
        # Configure the SLIP interface with IP addresses of both ends
 | 
						|
        cmd = ['sudo', 'ifconfig', 'sl0', '10.0.0.1', 'dstaddr', '10.0.0.2']
 | 
						|
        try:
 | 
						|
            subprocess.run(cmd, check=True)
 | 
						|
            print('SLIP interface configured successfully.')
 | 
						|
        except subprocess.CalledProcessError as e:
 | 
						|
            print(f'Failed to configure SLIP interface: {e}')
 | 
						|
        # Ping the SLIP interface with 5 packets and 10 seconds timeout
 | 
						|
        cmd = ['ping', '10.0.0.2', '-c', '5', '-W', '10']
 | 
						|
        try:
 | 
						|
            subprocess.run(cmd, check=True)
 | 
						|
            print('Pinging SLIP interface successfully.')
 | 
						|
        except subprocess.CalledProcessError as e:
 | 
						|
            print(f'Failed to ping SLIP interface: {e}')
 | 
						|
            raise
 | 
						|
 | 
						|
    finally:
 | 
						|
        server_stop.set()
 | 
						|
        t.join()
 |