diff --git a/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py b/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py index ff248219e5..632da99dbc 100644 --- a/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py +++ b/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py @@ -7,11 +7,13 @@ import subprocess import time import pexpect -from idf_iperf_test_util import LineChart from pytest_embedded import Dut +from idf_iperf_test_util import LineChart + try: - from typing import Any, Tuple + from typing import Any + from typing import Tuple except ImportError: # Only used for type annotations pass @@ -30,7 +32,7 @@ PC_IPERF_TEMP_LOG_FILE = '.tmp_iperf.log' class TestResult(object): - """ record, analysis test result and convert data to output format """ + """record, analysis test result and convert data to output format""" PC_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+\.\d+)\s*-\s*(\d+.\d+)\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits\/sec') DUT_BANDWIDTH_LOG_PATTERN = re.compile(r'([\d.]+)\s*-\s*([\d.]+)\s+sec\s+([\d.]+)\s+Mbits/sec') @@ -47,7 +49,7 @@ class TestResult(object): RSSI_RANGE = [-x for x in range(10, 100)] ATT_RANGE = [x for x in range(0, 64)] - def __init__(self, proto:str, direction:str, config_name:str) -> None: + def __init__(self, proto: str, direction: str, config_name: str) -> None: self.proto = proto self.direction = direction self.config_name = config_name @@ -57,7 +59,7 @@ class TestResult(object): self.heap_size = INVALID_HEAP_SIZE self.error_list = [] # type: list[str] - def _save_result(self, throughput:float, ap_ssid:str, att:int, rssi:int, heap_size:str) -> None: + def _save_result(self, throughput: float, ap_ssid: str, att: int, rssi: int, heap_size: str) -> None: """ save the test results: @@ -72,7 +74,7 @@ class TestResult(object): self.att_rssi_map[ap_ssid][att] = rssi - def record_throughput(database:dict, key_value:int) -> None: + def record_throughput(database: dict, key_value: int) -> None: try: # we save the larger value for same att if throughput > database[ap_ssid][key_value]: @@ -86,7 +88,7 @@ class TestResult(object): if int(heap_size) < self.heap_size: self.heap_size = int(heap_size) - def add_result(self, raw_data:str, ap_ssid:str, att:int, rssi:int, heap_size:str) -> float: + def add_result(self, raw_data: str, ap_ssid: str, att: int, rssi: int, heap_size: str) -> float: """ add result for one test @@ -114,21 +116,27 @@ class TestResult(object): continue throughput_list.append(throughput) max_throughput = max(max_throughput, throughput) - if throughput == 0 and rssi > self.ZERO_POINT_THRESHOLD \ - and fall_to_0_recorded < 1: + if throughput == 0 and rssi > self.ZERO_POINT_THRESHOLD and fall_to_0_recorded < 1: # throughput fall to 0 error. we only record 1 records for one test - self.error_list.append('[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}' - .format(ap_ssid, att, rssi, result[0], result[1])) + self.error_list.append( + '[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}'.format( + ap_ssid, att, rssi, result[0], result[1] + ) + ) fall_to_0_recorded += 1 if len(throughput_list) < self.THROUGHPUT_QUALIFY_COUNT: - self.error_list.append('[Error][Fatal][{}][att: {}][rssi: {}]: Only {} throughput values found, expected at least {}' - .format(ap_ssid, att, rssi, len(throughput_list), self.THROUGHPUT_QUALIFY_COUNT)) + self.error_list.append( + '[Error][Fatal][{}][att: {}][rssi: {}]: Only {} throughput values found, expected at least {}'.format( + ap_ssid, att, rssi, len(throughput_list), self.THROUGHPUT_QUALIFY_COUNT + ) + ) max_throughput = 0.0 if max_throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD: - self.error_list.append('[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found' - .format(ap_ssid, att, rssi)) + self.error_list.append( + '[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found'.format(ap_ssid, att, rssi) + ) self._save_result(max_throughput, ap_ssid, att, rssi, heap_size) @@ -141,7 +149,8 @@ class TestResult(object): 1. throughput value 30% worse than the next point with lower RSSI 2. throughput value 30% worse than the next point with larger attenuate """ - def analysis_bad_point(data:dict, index_type:str) -> None: + + def analysis_bad_point(data: dict, index_type: str) -> None: for ap_ssid in data: result_dict = data[ap_ssid] index_list = list(result_dict.keys()) @@ -150,19 +159,23 @@ class TestResult(object): index_list.reverse() for i, index_value in enumerate(index_list[1:]): - if index_value < self.BAD_POINT_RSSI_THRESHOLD or \ - result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD: + if ( + index_value < self.BAD_POINT_RSSI_THRESHOLD + or result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD + ): continue _percentage = result_dict[index_value] / result_dict[index_list[i]] if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD: - self.error_list.append('[Error][Bad point][{}][{}: {}]: drop {:.02f}%' - .format(ap_ssid, index_type, index_value, - (1 - _percentage) * 100)) + self.error_list.append( + '[Error][Bad point][{}][{}: {}]: drop {:.02f}%'.format( + ap_ssid, index_type, index_value, (1 - _percentage) * 100 + ) + ) analysis_bad_point(self.throughput_by_rssi, 'rssi') analysis_bad_point(self.throughput_by_att, 'att') - def draw_throughput_figure(self, path:str, ap_ssid:str, draw_type:str) -> str: + def draw_throughput_figure(self, path: str, ap_ssid: str, draw_type: str) -> str: """ :param path: folder to save figure. make sure the folder is already created. :param ap_ssid: ap ssid string or a list of ap ssid string @@ -184,14 +197,17 @@ class TestResult(object): else: file_name = 'ThroughputVs{}_{}_{}.html'.format(type_name, self.proto, self.direction) - LineChart.draw_line_chart(os.path.join(path, file_name), - 'Throughput Vs {} ({} {})'.format(type_name, self.proto, self.direction), - '{} (dbm)'.format(type_name), - 'Throughput (Mbps)', - data, range_list) + LineChart.draw_line_chart( + os.path.join(path, file_name), + 'Throughput Vs {} ({} {})'.format(type_name, self.proto, self.direction), + '{} (dbm)'.format(type_name), + 'Throughput (Mbps)', + data, + range_list, + ) return file_name - def draw_rssi_vs_att_figure(self, path:str, ap_ssid:str) -> str: + def draw_rssi_vs_att_figure(self, path: str, ap_ssid: str) -> str: """ :param path: folder to save figure. make sure the folder is already created. :param ap_ssid: ap to use @@ -201,18 +217,14 @@ class TestResult(object): file_name = 'AttVsRSSI.html' else: file_name = 'AttVsRSSI.html' - LineChart.draw_line_chart(os.path.join(path, file_name), - 'Att Vs RSSI', - 'Att (dbm)', - 'RSSI (dbm)', - self.att_rssi_map, - self.ATT_RANGE) + LineChart.draw_line_chart( + os.path.join(path, file_name), 'Att Vs RSSI', 'Att (dbm)', 'RSSI (dbm)', self.att_rssi_map, self.ATT_RANGE + ) return file_name def get_best_throughput(self) -> Any: - """ get the best throughput during test """ - best_for_aps = [max(self.throughput_by_att[ap_ssid].values()) - for ap_ssid in self.throughput_by_att] + """get the best throughput during test""" + best_for_aps = [max(self.throughput_by_att[ap_ssid].values()) for ap_ssid in self.throughput_by_att] return max(best_for_aps) def __str__(self) -> str: @@ -224,8 +236,9 @@ class TestResult(object): 3. min free heap size during test """ if self.throughput_by_att: - ret = '[{}_{}][{}]: {}\r\n\r\n'.format(self.proto, self.direction, self.config_name, - 'Fail' if self.error_list else 'Success') + ret = '[{}_{}][{}]: {}\r\n\r\n'.format( + self.proto, self.direction, self.config_name, 'Fail' if self.error_list else 'Success' + ) ret += 'Performance for each AP:\r\n' for ap_ssid in self.throughput_by_att: ret += '[{}]: {:.02f} Mbps\r\n'.format(ap_ssid, max(self.throughput_by_att[ap_ssid].values())) @@ -237,10 +250,18 @@ class TestResult(object): class IperfTestUtility(object): - """ iperf test implementation """ + """iperf test implementation""" - def __init__(self, dut:Dut, config_name:str, ap_ssid:str, ap_password:str, - pc_nic_ip:str, pc_iperf_log_file:str, test_result:Any=None) -> None: + def __init__( + self, + dut: Dut, + config_name: str, + ap_ssid: str, + ap_password: str, + pc_nic_ip: str, + pc_iperf_log_file: str, + test_result: Any = None, + ) -> None: self.config_name = config_name self.dut = dut @@ -261,7 +282,7 @@ class IperfTestUtility(object): 'udp_rx': TestResult('udp', 'rx', config_name), } - def setup(self) -> Tuple[str,int]: + def setup(self) -> Tuple[str, int]: """ setup iperf test: @@ -281,8 +302,7 @@ class IperfTestUtility(object): self.dut.write('sta_scan {}'.format(self.ap_ssid)) for _ in range(SCAN_RETRY_COUNT): try: - rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid), - timeout=SCAN_TIMEOUT).group(1)) + rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid), timeout=SCAN_TIMEOUT).group(1)) break except pexpect.TIMEOUT: continue @@ -292,11 +312,11 @@ class IperfTestUtility(object): dut_ip = self.dut.expect(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)').group(1) return dut_ip, rssi - def _save_test_result(self, test_case:str, raw_data:str, att:int, rssi:int, heap_size:int) -> Any: + def _save_test_result(self, test_case: str, raw_data: str, att: int, rssi: int, heap_size: int) -> Any: return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size) - def _test_once(self, proto:str, direction:str, bw_limit:int) -> Tuple[str, int, int]: - """ do measure once for one type """ + def _test_once(self, proto: str, direction: str, bw_limit: int) -> Tuple[str, int, int]: + """do measure once for one type""" # connect and scan to get RSSI dut_ip, rssi = self.setup() @@ -307,17 +327,21 @@ class IperfTestUtility(object): if direction == 'tx': with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f: if proto == 'tcp': - process = subprocess.Popen(['iperf', '-s', '-B', self.pc_nic_ip, - '-t', str(TEST_TIME), '-i', '1', '-f', 'm'], - stdout=f, stderr=f) + process = subprocess.Popen( + ['iperf', '-s', '-B', self.pc_nic_ip, '-t', str(TEST_TIME), '-i', '1', '-f', 'm'], + stdout=f, + stderr=f, + ) if bw_limit > 0: self.dut.write('iperf -c {} -i 1 -t {} -b {}'.format(self.pc_nic_ip, TEST_TIME, bw_limit)) else: self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME)) else: - process = subprocess.Popen(['iperf', '-s', '-u', '-B', self.pc_nic_ip, - '-t', str(TEST_TIME), '-i', '1', '-f', 'm'], - stdout=f, stderr=f) + process = subprocess.Popen( + ['iperf', '-s', '-u', '-B', self.pc_nic_ip, '-t', str(TEST_TIME), '-i', '1', '-f', 'm'], + stdout=f, + stderr=f, + ) if bw_limit > 0: self.dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(self.pc_nic_ip, TEST_TIME, bw_limit)) else: @@ -343,11 +367,15 @@ class IperfTestUtility(object): # compatible with old iperf example binary logging.info('create iperf tcp server fail') if bw_limit > 0: - process = subprocess.Popen(['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm', - '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f) + process = subprocess.Popen( + ['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm', '-t', str(TEST_TIME), '-f', 'm'], + stdout=f, + stderr=f, + ) else: - process = subprocess.Popen(['iperf', '-c', dut_ip, - '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f) + process = subprocess.Popen( + ['iperf', '-c', dut_ip, '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f + ) for _ in range(TEST_TIMEOUT): if process.poll() is not None: break @@ -363,8 +391,11 @@ class IperfTestUtility(object): except pexpect.TIMEOUT: # compatible with old iperf example binary logging.info('create iperf udp server fail') - process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm', - '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f) + process = subprocess.Popen( + ['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm', '-t', str(TEST_TIME), '-f', 'm'], + stdout=f, + stderr=f, + ) for _ in range(TEST_TIMEOUT): if process.poll() is not None: break @@ -376,7 +407,9 @@ class IperfTestUtility(object): stop_bw = 100 n = 10 step = int((stop_bw - start_bw) / n) - self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME + 4 * (n + 1))) # 4 sec for each bw step instance start/stop + self.dut.write( + 'iperf -s -u -i 1 -t {}'.format(TEST_TIME + 4 * (n + 1)) + ) # 4 sec for each bw step instance start/stop # wait until DUT TCP server created try: self.dut.expect('Socket bound', timeout=5) @@ -384,8 +417,22 @@ class IperfTestUtility(object): # compatible with old iperf example binary logging.info('create iperf udp server fail') for bandwidth in range(start_bw, stop_bw, step): - process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bandwidth) + 'm', - '-t', str(TEST_TIME / (n + 1)), '-f', 'm'], stdout=f, stderr=f) + process = subprocess.Popen( + [ + 'iperf', + '-c', + dut_ip, + '-u', + '-b', + str(bandwidth) + 'm', + '-t', + str(TEST_TIME / (n + 1)), + '-f', + 'm', + ], + stdout=f, + stderr=f, + ) for _ in range(TEST_TIMEOUT): if process.poll() is not None: break @@ -402,18 +449,21 @@ class IperfTestUtility(object): # save PC iperf logs to console with open(self.pc_iperf_log_file, 'a+') as f: - f.write('## [{}] `{}`\r\n##### {}' - .format(self.config_name, - '{}_{}'.format(proto, direction), - time.strftime('%m-%d %H:%M:%S', time.localtime(time.time())))) + f.write( + '## [{}] `{}`\r\n##### {}'.format( + self.config_name, + '{}_{}'.format(proto, direction), + time.strftime('%m-%d %H:%M:%S', time.localtime(time.time())), + ) + ) f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n') self.dut.write('heap') - heap_size = self.dut.expect(r'min heap size: (\d+)\D').group(1) + heap_size = self.dut.expect(r'min heap size: (\d+)\D', timeout=120).group(1) # return server raw data (for parsing test results) and RSSI return server_raw_data, rssi, heap_size - def run_test(self, proto:str, direction:str, atten_val:int, bw_limit:int) -> None: + def run_test(self, proto: str, direction: str, atten_val: int, bw_limit: int) -> None: """ run test for one type, with specified atten_value and save the test result @@ -426,11 +476,14 @@ class IperfTestUtility(object): heap_size = INVALID_HEAP_SIZE try: server_raw_data, rssi, heap_size = self._test_once(proto, direction, bw_limit) - throughput = self._save_test_result('{}_{}'.format(proto, direction), - server_raw_data, atten_val, - rssi, heap_size) - logging.info('[{}][{}_{}][{}][{}]: {:.02f}' - .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput)) + throughput = self._save_test_result( + '{}_{}'.format(proto, direction), server_raw_data, atten_val, rssi, heap_size + ) + logging.info( + '[{}][{}_{}][{}][{}]: {:.02f}'.format( + self.config_name, proto, direction, rssi, self.ap_ssid, throughput + ) + ) self.lowest_rssi_scanned = min(self.lowest_rssi_scanned, rssi) except (ValueError, IndexError): self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size) @@ -439,7 +492,7 @@ class IperfTestUtility(object): self.fail_to_scan += 1 logging.info('Fail to scan AP.') - def run_all_cases(self, atten_val:int, bw_limit:int) -> None: + def run_all_cases(self, atten_val: int, bw_limit: int) -> None: """ run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx). @@ -452,7 +505,8 @@ class IperfTestUtility(object): self.run_test('udp', 'rx', atten_val, bw_limit) if self.fail_to_scan > 10: logging.info( - 'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned)) + 'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned) + ) raise AssertionError def wait_ap_power_on(self) -> bool: @@ -467,8 +521,7 @@ class IperfTestUtility(object): for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT): try: self.dut.write('scan {}'.format(self.ap_ssid)) - self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid), - timeout=SCAN_TIMEOUT) + self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid), timeout=SCAN_TIMEOUT) ret = True break except pexpect.TIMEOUT: