"""
 | 
						|
Test case for iperf example.
 | 
						|
 | 
						|
This test case might have problem running on windows:
 | 
						|
 | 
						|
1. direct use of `make`
 | 
						|
2. use `sudo killall iperf` to force kill iperf, didn't implement windows version
 | 
						|
 | 
						|
The test env Example_ShieldBox do need the following config::
 | 
						|
 | 
						|
  Example_ShieldBox:
 | 
						|
    ap_list:
 | 
						|
      - ssid: "ssid"
 | 
						|
        password: "password"
 | 
						|
        outlet: 1
 | 
						|
    apc_ip: "192.168.1.88"
 | 
						|
    attenuator_port: "/dev/ttyUSB0"
 | 
						|
    iperf: "/dev/ttyUSB1"
 | 
						|
    apc_ip: "192.168.1.88"
 | 
						|
    pc_nic: "eth0"
 | 
						|
"""
from __future__ import division, unicode_literals

import os
import re
import subprocess
import time
from builtins import object, range, str

import ttfw_idf
from idf_iperf_test_util import Attenuator, LineChart, PowerControl, TestReport
from tiny_test_fw import DUT, TinyFW, Utility

# configurations
TEST_TIME = TEST_TIMEOUT = 60
WAIT_AP_POWER_ON_TIMEOUT = 90
SCAN_TIMEOUT = 3
SCAN_RETRY_COUNT = 3
RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
ATTEN_VALUE_LIST = range(0, 60, 2)

# constants
FAILED_TO_SCAN_RSSI = -97
INVALID_HEAP_SIZE = 0xFFFFFFFF

PC_IPERF_TEMP_LOG_FILE = '.tmp_iperf.log'
CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)')

# We need to auto-compare the differences between adjacent configs (01 -> 00, 02 -> 01, ...) and put them into reports.
# Using numbers for the config names makes this easy.
# Use the default value `99` for the config with the best performance.
BEST_PERFORMANCE_CONFIG = '99'


class TestResult(object):
    """ record and analyse test results, and convert data to output format """

    PC_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec')
    DUT_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+)-\s+(\d+)\s+sec\s+([\d.]+)\s+Mbits/sec')
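    # The two patterns above capture per-second interval lines from the PC-side iperf and
    # from the DUT iperf example respectively. Illustrative samples (not real captures):
    #   PC:   [  3]  2.0- 3.0 sec  5.25 MBytes  44.0 Mbits/sec
    #   DUT:  2-   3 sec       40.50 Mbits/sec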

    ZERO_POINT_THRESHOLD = -88  # RSSI, dBm
    ZERO_THROUGHPUT_THRESHOLD = -92  # RSSI, dBm
    BAD_POINT_RSSI_THRESHOLD = -75  # RSSI, dBm
    BAD_POINT_MIN_THRESHOLD = 10  # Mbps
    BAD_POINT_PERCENTAGE_THRESHOLD = 0.3

    # we need at least 1/2 valid points to qualify the test result
    THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2

    RSSI_RANGE = [-x for x in range(10, 100)]
    ATT_RANGE = [x for x in range(0, 64)]

    def __init__(self, proto, direction, config_name):
        self.proto = proto
        self.direction = direction
        self.config_name = config_name
        self.throughput_by_rssi = dict()
        self.throughput_by_att = dict()
        self.att_rssi_map = dict()
        self.heap_size = INVALID_HEAP_SIZE
        self.error_list = []

    def _save_result(self, throughput, ap_ssid, att, rssi, heap_size):
        """
        save the test results:

        * record the better throughput if att/rssi is the same.
        * record the min heap size.
        """
        if ap_ssid not in self.att_rssi_map:
            # for a new AP, create empty dicts
            self.throughput_by_att[ap_ssid] = dict()
            self.throughput_by_rssi[ap_ssid] = dict()
            self.att_rssi_map[ap_ssid] = dict()

        self.att_rssi_map[ap_ssid][att] = rssi

        def record_throughput(database, key_value):
            try:
                # keep the larger throughput for the same att/rssi key
                if throughput > database[ap_ssid][key_value]:
                    database[ap_ssid][key_value] = throughput
            except KeyError:
                database[ap_ssid][key_value] = throughput

        record_throughput(self.throughput_by_att, att)
        record_throughput(self.throughput_by_rssi, rssi)

        if int(heap_size) < self.heap_size:
            self.heap_size = int(heap_size)

    def add_result(self, raw_data, ap_ssid, att, rssi, heap_size):
        """
        add result for one test

        :param raw_data: iperf raw data
        :param ap_ssid: ssid of the tested AP
        :param att: attenuation value
        :param rssi: AP RSSI
        :param heap_size: min heap size during test
        :return: throughput
        """
        fall_to_0_recorded = 0
        throughput_list = []
        result_list = self.PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
        if not result_list:
            # failed to parse raw data with the PC pattern, try the DUT pattern
            result_list = self.DUT_BANDWIDTH_LOG_PATTERN.findall(raw_data)

        for result in result_list:
            if int(result[1]) - int(result[0]) != 1:
                # this could be a summary line, ignore it
                continue
            throughput_list.append(float(result[2]))
            if float(result[2]) == 0 and rssi > self.ZERO_POINT_THRESHOLD \
                    and fall_to_0_recorded < 1:
                # throughput fell to 0; we only record one such error per test
                self.error_list.append('[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}'
                                       .format(ap_ssid, att, rssi, result[0], result[1]))
                fall_to_0_recorded += 1

        if len(throughput_list) > self.THROUGHPUT_QUALIFY_COUNT:
            throughput = sum(throughput_list) / len(throughput_list)
        else:
            throughput = 0.0

        if throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD:
            self.error_list.append('[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found'
                                   .format(ap_ssid, att, rssi))

        self._save_result(throughput, ap_ssid, att, rssi, heap_size)

        return throughput

    def post_analysis(self):
        """
        some rules need to be checked after all raw test data has been collected:

        1. throughput value is 30% worse than the next point with lower RSSI
        2. throughput value is 30% worse than the next point with larger attenuation
        """
        def analysis_bad_point(data, index_type):
            for ap_ssid in data:
                result_dict = data[ap_ssid]
                index_list = list(result_dict.keys())
                index_list.sort()
                if index_type == 'att':
                    index_list.reverse()

                for i, index_value in enumerate(index_list[1:]):
                    if index_value < self.BAD_POINT_RSSI_THRESHOLD or \
                            result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD:
                        continue
                    _percentage = result_dict[index_value] / result_dict[index_list[i]]
                    if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD:
                        self.error_list.append('[Error][Bad point][{}][{}: {}]: drop {:.02f}%'
                                               .format(ap_ssid, index_type, index_value,
                                                       (1 - _percentage) * 100))

        analysis_bad_point(self.throughput_by_rssi, 'rssi')
        analysis_bad_point(self.throughput_by_att, 'att')

    def draw_throughput_figure(self, path, ap_ssid, draw_type):
        """
        :param path: folder to save the figure. make sure the folder is already created.
        :param ap_ssid: AP ssid string or a list of AP ssid strings
        :param draw_type: "att" or "rssi"
        :return: file_name
        """
        if draw_type == 'rssi':
            type_name = 'RSSI'
            data = self.throughput_by_rssi
            range_list = self.RSSI_RANGE
        elif draw_type == 'att':
            type_name = 'Att'
            data = self.throughput_by_att
            range_list = self.ATT_RANGE
        else:
            raise AssertionError('draw type not supported')
        if isinstance(ap_ssid, list):
            file_name = 'ThroughputVs{}_{}_{}_{}.html'.format(type_name, self.proto, self.direction,
                                                              str(hash(tuple(ap_ssid)))[:6])
        else:
            file_name = 'ThroughputVs{}_{}_{}_{}.html'.format(type_name, self.proto, self.direction, ap_ssid)

        LineChart.draw_line_chart(os.path.join(path, file_name),
                                  'Throughput Vs {} ({} {})'.format(type_name, self.proto, self.direction),
                                  '{} (dBm)'.format(type_name),
                                  'Throughput (Mbps)',
                                  data, range_list)
        return file_name

    def draw_rssi_vs_att_figure(self, path, ap_ssid):
        """
        :param path: folder to save the figure. make sure the folder is already created.
        :param ap_ssid: AP to use
        :return: file_name
        """
        if isinstance(ap_ssid, list):
            file_name = 'AttVsRSSI_{}.html'.format(str(hash(tuple(ap_ssid)))[:6])
        else:
            file_name = 'AttVsRSSI_{}.html'.format(ap_ssid)
        LineChart.draw_line_chart(os.path.join(path, file_name),
                                  'Att Vs RSSI',
                                  'Att (dB)',
                                  'RSSI (dBm)',
                                  self.att_rssi_map,
                                  self.ATT_RANGE)
        return file_name

    def get_best_throughput(self):
        """ get the best throughput during test """
        best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
                        for ap_ssid in self.throughput_by_att]
        return max(best_for_aps)

    def __str__(self):
        """
        returns summary for this test:

        1. test result (success or fail)
        2. best performance for each AP
        3. min free heap size during test
        """
        if self.throughput_by_att:
            ret = '[{}_{}][{}]: {}\r\n\r\n'.format(self.proto, self.direction, self.config_name,
                                                   'Fail' if self.error_list else 'Success')
            ret += 'Performance for each AP:\r\n'
            for ap_ssid in self.throughput_by_att:
                ret += '[{}]: {:.02f} Mbps\r\n'.format(ap_ssid, max(self.throughput_by_att[ap_ssid].values()))
            if self.heap_size != INVALID_HEAP_SIZE:
                ret += 'Minimum heap size: {}'.format(self.heap_size)
        else:
            ret = ''
        return ret


class IperfTestUtility(object):
    """ iperf test implementation """

    def __init__(self, dut, config_name, ap_ssid, ap_password,
                 pc_nic_ip, pc_iperf_log_file, test_result=None):
        self.config_name = config_name
        self.dut = dut

        self.pc_iperf_log_file = pc_iperf_log_file
        self.ap_ssid = ap_ssid
        self.ap_password = ap_password
        self.pc_nic_ip = pc_nic_ip

        if test_result:
            self.test_result = test_result
        else:
            self.test_result = {
                'tcp_tx': TestResult('tcp', 'tx', config_name),
                'tcp_rx': TestResult('tcp', 'rx', config_name),
                'udp_tx': TestResult('udp', 'tx', config_name),
                'udp_rx': TestResult('udp', 'rx', config_name),
            }

    def setup(self):
        """
        setup iperf test:

        1. kill the current iperf process
        2. reboot DUT (currently iperf is not very robust, so we need to reboot the DUT)
        3. scan to get AP RSSI
        4. connect to AP
        """
        try:
            subprocess.check_output('sudo killall iperf 2>&1 > /dev/null', shell=True)
        except subprocess.CalledProcessError:
            pass
        self.dut.write('restart')
        self.dut.expect_any('iperf>', 'esp32>')
        self.dut.write('scan {}'.format(self.ap_ssid))
        for _ in range(SCAN_RETRY_COUNT):
            try:
                rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
                                           timeout=SCAN_TIMEOUT)[0])
                break
            except DUT.ExpectTimeout:
                continue
        else:
            raise AssertionError('Failed to scan AP')
        self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
        dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
        return dut_ip, rssi

    def _save_test_result(self, test_case, raw_data, att, rssi, heap_size):
        return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)

    def _test_once(self, proto, direction):
        """ do one measurement for one test type """
        # connect and scan to get RSSI
        dut_ip, rssi = self.setup()

        assert direction in ['rx', 'tx']
        assert proto in ['tcp', 'udp']

        # run iperf test
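        # For 'tx' the DUT is the iperf client and the PC runs the iperf server (whose log is
        # parsed for throughput); for 'rx' the DUT runs the iperf server and the PC is the client.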
        if direction == 'tx':
            with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
                if proto == 'tcp':
                    process = subprocess.Popen(['iperf', '-s', '-B', self.pc_nic_ip,
                                                '-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
                                               stdout=f, stderr=f)
                    self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
                else:
                    process = subprocess.Popen(['iperf', '-s', '-u', '-B', self.pc_nic_ip,
                                                '-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
                                               stdout=f, stderr=f)
                    self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))

                for _ in range(TEST_TIMEOUT):
                    if process.poll() is not None:
                        break
                    time.sleep(1)
                else:
                    process.terminate()

            with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
                pc_raw_data = server_raw_data = f.read()
        else:
            with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
                if proto == 'tcp':
                    self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
                    # wait until the DUT TCP server is created
                    try:
                        self.dut.expect('iperf tcp server create successfully', timeout=1)
                    except DUT.ExpectTimeout:
                        # compatible with old iperf example binary
                        pass
                    process = subprocess.Popen(['iperf', '-c', dut_ip,
                                                '-t', str(TEST_TIME), '-f', 'm'],
                                               stdout=f, stderr=f)
                else:
                    self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
                    process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', '100M',
                                                '-t', str(TEST_TIME), '-f', 'm'],
                                               stdout=f, stderr=f)

                for _ in range(TEST_TIMEOUT):
                    if process.poll() is not None:
                        break
                    time.sleep(1)
                else:
                    process.terminate()

            server_raw_data = self.dut.read()
            with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
                pc_raw_data = f.read()

        # append PC iperf logs to the markdown log file
        with open(self.pc_iperf_log_file, 'a+') as f:
            f.write('## [{}] `{}`\r\n##### {}'
                    .format(self.config_name,
                            '{}_{}'.format(proto, direction),
                            time.strftime('%m-%d %H:%M:%S', time.localtime(time.time()))))
            f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
        self.dut.write('heap')
        heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]

        # return server raw data (for parsing test results) and RSSI
        return server_raw_data, rssi, heap_size

    def run_test(self, proto, direction, atten_val):
        """
        run the test for one type with the specified attenuation value and save the test result

        :param proto: tcp or udp
        :param direction: tx or rx
        :param atten_val: attenuation value
        """
        rssi = FAILED_TO_SCAN_RSSI
        heap_size = INVALID_HEAP_SIZE
        try:
            server_raw_data, rssi, heap_size = self._test_once(proto, direction)
            throughput = self._save_test_result('{}_{}'.format(proto, direction),
                                                server_raw_data, atten_val,
                                                rssi, heap_size)
            Utility.console_log('[{}][{}_{}][{}][{}]: {:.02f}'
                                .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
        except Exception as e:
            self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size)
            Utility.console_log('Failed during test: {}'.format(e))

    def run_all_cases(self, atten_val):
        """
        run tests for all types (tcp_tx, tcp_rx, udp_tx, udp_rx).

        :param atten_val: attenuation value
        """
        self.run_test('tcp', 'tx', atten_val)
        self.run_test('tcp', 'rx', atten_val)
        self.run_test('udp', 'tx', atten_val)
        self.run_test('udp', 'rx', atten_val)

    def wait_ap_power_on(self):
        """
        The AP needs some time to power on, and this varies between APs.
        This method scans to check whether the AP has powered on.

        :return: True or False
        """
        self.dut.write('restart')
        self.dut.expect_any('iperf>', 'esp32>')
        for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
            try:
                self.dut.write('scan {}'.format(self.ap_ssid))
                self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
                                timeout=SCAN_TIMEOUT)
                ret = True
                break
            except DUT.ExpectTimeout:
                pass
        else:
            ret = False
        return ret


class IperfTestUtilitySoftap(IperfTestUtility):
    """ iperf test implementation for the SoftAP (DUT to DUT) case """
    def __init__(self, dut, softap_dut, config_name, test_result=None):
        super(IperfTestUtilitySoftap, self).__init__(
            dut, config_name, 'softap', '1234567890', None, None, test_result=test_result)
        self.softap_dut = softap_dut
        self.softap_ip = '192.168.4.1'

    def setup(self):
        """
        setup the SoftAP iperf test:

        1. reboot the SoftAP DUT and start the AP
        2. reboot the station DUT (currently iperf is not very robust, so we need to reboot the DUT)
        3. scan to get AP RSSI
        4. connect to AP
        """
        self.softap_dut.write('restart')
        self.softap_dut.expect_any('iperf>', 'esp32>', timeout=30)
        self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password))
        self.dut.write('restart')
        self.dut.expect_any('iperf>', 'esp32>', timeout=30)
        self.dut.write('scan {}'.format(self.ap_ssid))
        for _ in range(SCAN_RETRY_COUNT):
            try:
                rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
                                           timeout=SCAN_TIMEOUT)[0])
                break
            except DUT.ExpectTimeout:
                continue
        else:
            raise AssertionError('Failed to scan AP')
        self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
        dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
        return dut_ip, rssi

    def _test_once(self, proto, direction):
        """ do one measurement for one test type """
        # connect and scan to get RSSI
        dut_ip, rssi = self.setup()

        assert direction in ['rx', 'tx']
        assert proto in ['tcp', 'udp']

        # run iperf test
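        # For 'tx' the station DUT is the iperf client sending to the SoftAP DUT's server;
        # for 'rx' the SoftAP DUT is the client and the station DUT runs the server.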
        if direction == 'tx':
            if proto == 'tcp':
                self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
                # wait until the TCP server is created
                try:
                    self.softap_dut.expect('iperf tcp server create successfully', timeout=1)
                except DUT.ExpectTimeout:
                    # compatible with old iperf example binary
                    pass
                self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
            else:
                self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
                self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
        else:
            if proto == 'tcp':
                self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
                # wait until the TCP server is created
                try:
                    self.dut.expect('iperf tcp server create successfully', timeout=1)
                except DUT.ExpectTimeout:
                    # compatible with old iperf example binary
                    pass
                self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME))
            else:
                self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
                self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME))
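        # let the fixed-length iperf run finish; the hard-coded 60 s matches TEST_TIME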
        time.sleep(60)

        if direction == 'tx':
            server_raw_data = self.dut.read()
        else:
            server_raw_data = self.softap_dut.read()
        self.dut.write('iperf -a')
        self.softap_dut.write('iperf -a')
        self.dut.write('heap')
        heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]

        # return server raw data (for parsing test results) and RSSI
        return server_raw_data, rssi, heap_size


@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', target=['ESP32', 'ESP32S2', 'ESP32C3'], category='stress')
def test_wifi_throughput_with_different_configs(env, extra_data):
    """
    steps: |
      1. build iperf with specified configs
      2. test throughput for all routers
    """
    pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
    pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
    ap_info = {
        'ssid': env.get_variable('ap_ssid'),
        'password': env.get_variable('ap_password'),
    }

    config_names_raw = subprocess.check_output(['ls', os.path.dirname(os.path.abspath(__file__))]).decode('utf-8')
    config_names = CONFIG_NAME_PATTERN.findall(config_names_raw)
    if not config_names:
        raise ValueError('no configs found in {}'.format(os.path.dirname(__file__)))

    test_result = dict()
    sdkconfig_files = dict()

    for config_name in config_names:
        # 1. get the config
        sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__),
                                                    'sdkconfig.ci.{}'.format(config_name))

        # 2. get DUT and download
        dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=config_name)
        dut.start_app()
        dut.expect_any('iperf>', 'esp32>')

        # 3. run test for each required att value
        test_result[config_name] = {
            'tcp_tx': TestResult('tcp', 'tx', config_name),
            'tcp_rx': TestResult('tcp', 'rx', config_name),
            'udp_tx': TestResult('udp', 'tx', config_name),
            'udp_rx': TestResult('udp', 'rx', config_name),
        }

        test_utility = IperfTestUtility(dut, config_name, ap_info['ssid'],
                                        ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result[config_name])

        for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
            test_utility.run_all_cases(0)

        for result_type in test_result[config_name]:
            summary = str(test_result[config_name][result_type])
            if summary:
                Utility.console_log(summary, color='orange')

        # 4. check test results
        env.close_dut('iperf')

    # 5. generate report
    report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, 'ThroughputForConfigsReport'),
                                                   ap_info['ssid'], test_result, sdkconfig_files)
    report.generate_report()


@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox', target=['ESP32', 'ESP32S2', 'ESP32C3'], category='stress')
def test_wifi_throughput_vs_rssi(env, extra_data):
    """
    steps: |
      1. build with best performance config
      2. switch on one router
      3. set attenuator value from 0-60 for each router
      4. test TCP tx rx and UDP tx rx throughput
    """
    att_port = env.get_variable('attenuator_port')
    ap_list = env.get_variable('ap_list')
    pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
    apc_ip = env.get_variable('apc_ip')
    pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')

    test_result = {
        'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
        'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
        'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
        'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
    }

    # 1. get DUT and download
    dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
    dut.start_app()
    dut.expect_any('iperf>', 'esp32>')

    # 2. run test for each required att value
    for ap_info in ap_list:
        test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
                                        pc_nic_ip, pc_iperf_log_file, test_result)

        PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF')
        PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'})
        Attenuator.set_att(att_port, 0)

        if not test_utility.wait_ap_power_on():
            Utility.console_log('[{}] failed to power on, skip testing this AP'
                                .format(ap_info['ssid']), color='red')
            continue

        for atten_val in ATTEN_VALUE_LIST:
            assert Attenuator.set_att(att_port, atten_val) is True
            test_utility.run_all_cases(atten_val)

    # 3. check test results
    env.close_dut('iperf')

    # 4. generate report
    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'STAThroughputVsRssiReport'),
                                               test_result)
    report.generate_report()


@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic',
                           target=['ESP32', 'ESP32S2', 'ESP32C3'], ci_target=['ESP32'])
def test_wifi_throughput_basic(env, extra_data):
    """
    steps: |
      1. test TCP tx rx and UDP tx rx throughput
      2. compare with the pre-defined pass standard
    """
    pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
    pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
    ap_info = {
        'ssid': env.get_variable('ap_ssid'),
        'password': env.get_variable('ap_password'),
    }

    # 1. get DUT
    dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
    dut.start_app()
    dut.expect_any('iperf>', 'esp32>')

    # 2. preparation
    test_result = {
        'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
        'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
        'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
        'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
    }

    test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
                                    ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)

    # 3. run test for TCP Tx, Rx and UDP Tx, Rx
    for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
        test_utility.run_all_cases(0)

    # 4. log performance and compare with pass standard
    performance_items = []
    for throughput_type in test_result:
        ttfw_idf.log_performance('{}_throughput'.format(throughput_type),
                                 '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
        performance_items.append(['{}_throughput'.format(throughput_type),
                                  '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())])

    # 5. save to report
    TinyFW.JunitReport.update_performance(performance_items)
    # do the check after logging, otherwise the test exits immediately if a check fails and
    # some performance values would not be logged.
    for throughput_type in test_result:
        ttfw_idf.check_performance('{}_throughput'.format(throughput_type),
                                   test_result[throughput_type].get_best_throughput(), dut.TARGET)

    env.close_dut('iperf')


@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox2', target=['ESP32', 'ESP32S2', 'ESP32C3'], category='stress')
def test_softap_throughput_vs_rssi(env, extra_data):
    """
    steps: |
      1. build with best performance config
      2. set attenuator value from 0-60
      3. test TCP tx rx and UDP tx rx throughput between the SoftAP DUT and the station DUT
    """
    att_port = env.get_variable('attenuator_port')

    test_result = {
        'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
        'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
        'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
        'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
    }

    # 1. get DUT and download
    softap_dut = env.get_dut('softap_iperf', 'examples/wifi/iperf')
    softap_dut.start_app()
    softap_dut.expect_any('iperf>', 'esp32>')

    sta_dut = env.get_dut('sta_iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
    sta_dut.start_app()
    sta_dut.expect_any('iperf>', 'esp32>')

    # 2. run test for each required att value
    test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result)

    Attenuator.set_att(att_port, 0)

    for atten_val in ATTEN_VALUE_LIST:
        assert Attenuator.set_att(att_port, atten_val) is True
        test_utility.run_all_cases(atten_val)

    env.close_dut('softap_iperf')
    env.close_dut('sta_iperf')

    # 3. generate report
    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'SoftAPThroughputVsRssiReport'),
                                               test_result)
    report.generate_report()


if __name__ == '__main__':
    # test_wifi_throughput_basic(env_config_file='EnvConfig.yml')
    # test_wifi_throughput_with_different_configs(env_config_file='EnvConfig.yml')
    test_wifi_throughput_vs_rssi(env_config_file='EnvConfig.yml', target='ESP32C3')
    test_softap_throughput_vs_rssi(env_config_file='EnvConfig.yml')