From d74eaadd564a8ff99b742177df26ae73334642b5 Mon Sep 17 00:00:00 2001 From: Yinling He Date: Thu, 10 Dec 2020 20:33:11 +0800 Subject: [PATCH] test: add softap iperf test --- examples/wifi/iperf/components/iperf/iperf.c | 1 + examples/wifi/iperf/iperf_test.py | 141 ++++++++++++++++++- 2 files changed, 141 insertions(+), 1 deletion(-) diff --git a/examples/wifi/iperf/components/iperf/iperf.c b/examples/wifi/iperf/components/iperf/iperf.c index 3b8402a9b0..03687fc955 100644 --- a/examples/wifi/iperf/components/iperf/iperf.c +++ b/examples/wifi/iperf/components/iperf/iperf.c @@ -149,6 +149,7 @@ static esp_err_t IRAM_ATTR iperf_run_tcp_server(void) return ESP_FAIL; } + printf("iperf tcp server create successfully\n"); buffer = s_iperf_ctrl.buffer; want_recv = s_iperf_ctrl.buffer_len; while (!s_iperf_ctrl.finish) { diff --git a/examples/wifi/iperf/iperf_test.py b/examples/wifi/iperf/iperf_test.py index 852cfc2dba..b669057c1d 100644 --- a/examples/wifi/iperf/iperf_test.py +++ b/examples/wifi/iperf/iperf_test.py @@ -348,6 +348,12 @@ class IperfTestUtility(object): with open(PC_IPERF_TEMP_LOG_FILE, "w") as f: if proto == "tcp": self.dut.write("iperf -s -i 1 -t {}".format(TEST_TIME)) + # wait until DUT TCP server created + try: + self.dut.expect("iperf tcp server create successfully", timeout=1) + except DUT.ExpectTimeout: + # compatible with old iperf example binary + pass process = subprocess.Popen(["iperf", "-c", dut_ip, "-t", str(TEST_TIME), "-f", "m"], stdout=f, stderr=f) @@ -436,6 +442,91 @@ class IperfTestUtility(object): return ret +class IperfTestUtilitySoftap(IperfTestUtility): + """ iperf test implementation """ + def __init__(self, dut, softap_dut, config_name, test_result=None): + super(IperfTestUtility, self).__init__(dut, config_name, 'softap', '1234567890', None, None, test_result=None) + self.softap_dut = softap_dut + self.softap_ip = '192.168.4.1' + + def setup(self): + """ + setup iperf test: + + 1. kill current iperf process + 2. reboot DUT (currently iperf is not very robust, need to reboot DUT) + 3. scan to get AP RSSI + 4. connect to AP + """ + self.softap_dut.write("restart") + self.softap_dut.expect_any("iperf>", "esp32>", timeout=30) + self.softap_dut.write("ap {} {}".format(self.ap_ssid, self.ap_password)) + self.dut.write("restart") + self.dut.expect_any("iperf>", "esp32>", timeout=30) + self.dut.write("scan {}".format(self.ap_ssid)) + for _ in range(SCAN_RETRY_COUNT): + try: + rssi = int(self.dut.expect(re.compile(r"\[{}]\[rssi=(-\d+)]".format(self.ap_ssid)), + timeout=SCAN_TIMEOUT)[0]) + break + except DUT.ExpectTimeout: + continue + else: + raise AssertionError("Failed to scan AP") + self.dut.write("sta {} {}".format(self.ap_ssid, self.ap_password)) + dut_ip = self.dut.expect(re.compile(r"sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)"))[0] + return dut_ip, rssi + + def _test_once(self, proto, direction): + """ do measure once for one type """ + # connect and scan to get RSSI + dut_ip, rssi = self.setup() + + assert direction in ["rx", "tx"] + assert proto in ["tcp", "udp"] + + # run iperf test + if direction == "tx": + if proto == "tcp": + self.softap_dut.write("iperf -s -i 1 -t {}".format(TEST_TIME)) + # wait until DUT TCP server created + try: + self.softap_dut.expect("iperf tcp server create successfully", timeout=1) + except DUT.ExpectTimeout: + # compatible with old iperf example binary + pass + self.dut.write("iperf -c {} -i 1 -t {}".format(self.softap_ip, TEST_TIME)) + else: + self.softap_dut.write("iperf -s -u -i 1 -t {}".format(TEST_TIME)) + self.dut.write("iperf -c {} -u -i 1 -t {}".format(self.softap_ip, TEST_TIME)) + else: + if proto == "tcp": + self.dut.write("iperf -s -i 1 -t {}".format(TEST_TIME)) + # wait until DUT TCP server created + try: + self.dut.expect("iperf tcp server create successfully", timeout=1) + except DUT.ExpectTimeout: + # compatible with old iperf example binary + pass + self.softap_dut.write("iperf -c {} -i 1 -t {}".format(dut_ip, TEST_TIME)) + else: + self.dut.write("iperf -s -u -i 1 -t {}".format(TEST_TIME)) + self.softap_dut.write("iperf -c {} -u -i 1 -t {}".format(dut_ip, TEST_TIME)) + time.sleep(60) + + if direction == "tx": + server_raw_data = self.dut.read() + else: + server_raw_data = self.softap_dut.read() + self.dut.write("iperf -a") + self.softap_dut.write("iperf -a") + self.dut.write("heap") + heap_size = self.dut.expect(re.compile(r"min heap size: (\d+)\D"))[0] + + # return server raw data (for parsing test results) and RSSI + return server_raw_data, rssi, heap_size + + @ttfw_idf.idf_example_test(env_tag="Example_ShieldBox_Basic", category="stress") def test_wifi_throughput_with_different_configs(env, extra_data): """ @@ -547,7 +638,7 @@ def test_wifi_throughput_vs_rssi(env, extra_data): env.close_dut("iperf") # 4. generate report - report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, "ThroughputVsRssiReport"), + report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, "STAThroughputVsRssiReport"), test_result) report.generate_report() @@ -605,7 +696,55 @@ def test_wifi_throughput_basic(env, extra_data): env.close_dut("iperf") +@ttfw_idf.idf_example_test(env_tag="Example_ShieldBox2", category="stress") +def test_softap_throughput_vs_rssi(env, extra_data): + """ + steps: | + 1. build with best performance config + 2. switch on one router + 3. set attenuator value from 0-60 for each router + 4. test TCP tx rx and UDP tx rx throughput + """ + att_port = env.get_variable("attenuator_port") + + test_result = { + "tcp_tx": TestResult("tcp", "tx", BEST_PERFORMANCE_CONFIG), + "tcp_rx": TestResult("tcp", "rx", BEST_PERFORMANCE_CONFIG), + "udp_tx": TestResult("udp", "tx", BEST_PERFORMANCE_CONFIG), + "udp_rx": TestResult("udp", "rx", BEST_PERFORMANCE_CONFIG), + } + + # 1. get DUT and download + softap_dut = env.get_dut("softap_iperf", "examples/wifi/iperf", dut_class=ttfw_idf.ESP32DUT, + app_config_name=BEST_PERFORMANCE_CONFIG) + softap_dut.start_app() + softap_dut.expect_any("iperf>", "esp32>") + + sta_dut = env.get_dut("sta_iperf", "examples/wifi/iperf", dut_class=ttfw_idf.ESP32DUT, + app_config_name=BEST_PERFORMANCE_CONFIG) + sta_dut.start_app() + sta_dut.expect_any("iperf>", "esp32>") + + # 2. run test for each required att value + test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result) + + Attenuator.set_att(att_port, 0) + + for atten_val in ATTEN_VALUE_LIST: + assert Attenuator.set_att(att_port, atten_val) is True + test_utility.run_all_cases(atten_val) + + env.close_dut("softap_iperf") + env.close_dut("sta_iperf") + + # 3. generate report + report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, "SoftAPThroughputVsRssiReport"), + test_result) + report.generate_report() + + if __name__ == '__main__': test_wifi_throughput_basic(env_config_file="EnvConfig.yml") test_wifi_throughput_with_different_configs(env_config_file="EnvConfig.yml") test_wifi_throughput_vs_rssi(env_config_file="EnvConfig.yml") + test_softap_throughput_vs_rssi(env_config_file="EnvConfig.yml")