diff --git a/examples/protocols/.build-test-rules.yml b/examples/protocols/.build-test-rules.yml index 35295523a4..1efc6a7495 100644 --- a/examples/protocols/.build-test-rules.yml +++ b/examples/protocols/.build-test-rules.yml @@ -10,6 +10,13 @@ - mbedtls - protocol_examples_common +.mqtt_dependencies: &mqtt_dependencies + <<: *default_dependencies + depends_filepatterns: + - components/mqtt/**/* + depends_components+: + - mqtt + examples/protocols/esp_http_client: <<: *default_dependencies enable: @@ -134,51 +141,55 @@ examples/protocols/modbus: - examples/common_components/protocol_examples_common/**/* - examples/protocols/modbus/mb_example_common/**/* - -examples/protocols/mqtt: - <<: *default_dependencies - depends_filepatterns: - - components/mqtt/**/* - examples/protocols/mqtt/custom_outbox: - <<: *default_dependencies + <<: *mqtt_dependencies examples/protocols/mqtt/ssl: - <<: *default_dependencies + <<: *mqtt_dependencies disable_test: - if: IDF_TARGET != "esp32" reason: only test on esp32 examples/protocols/mqtt/ssl_ds: - <<: *default_dependencies + <<: *mqtt_dependencies disable: - if: SOC_DIG_SIGN_SUPPORTED != 1 temporary: false reason: DS not present + depends_filepatterns+: + - examples/protocols/mqtt/ssl_ds/**/* examples/protocols/mqtt/tcp: - <<: *default_dependencies + <<: *mqtt_dependencies disable_test: - if: IDF_TARGET != "esp32" reason: only test on esp32 + depends_filepatterns+: + - examples/protocols/mqtt/tcp/**/* examples/protocols/mqtt/ws: - <<: *default_dependencies + <<: *mqtt_dependencies disable_test: - if: IDF_TARGET != "esp32" reason: only test on esp32 + depends_filepatterns+: + - examples/protocols/mqtt/ws/**/* examples/protocols/mqtt/wss: - <<: *default_dependencies + <<: *mqtt_dependencies disable_test: - if: IDF_TARGET != "esp32" reason: only test on esp32 + depends_filepatterns+: + - examples/protocols/mqtt/wss/**/* examples/protocols/mqtt5: - <<: *default_dependencies + <<: *mqtt_dependencies disable_test: - if: IDF_TARGET != "esp32" reason: only test on esp32 + depends_filepatterns+: + - examples/protocols/mqtt5/**/* examples/protocols/smtp_client: <<: *default_dependencies diff --git a/tools/ci/dynamic_pipelines/constants.py b/tools/ci/dynamic_pipelines/constants.py index c0c90c4d27..d724cbfe15 100644 --- a/tools/ci/dynamic_pipelines/constants.py +++ b/tools/ci/dynamic_pipelines/constants.py @@ -20,6 +20,9 @@ DEFAULT_TARGET_TEST_CHILD_PIPELINE_FILEPATH = os.path.join(IDF_PATH, 'target_tes DEFAULT_BUILD_CHILD_PIPELINE_NAME = 'Build Child Pipeline' DEFAULT_TARGET_TEST_CHILD_PIPELINE_NAME = 'Target Test Child Pipeline' +DEFAULT_TARGET_TEST_JOB_TEMPLATE_NAME = '.dynamic_target_test_template' +TIMEOUT_4H_TEMPLATE_NAME = '.timeout_4h_template' + TEST_RELATED_BUILD_JOB_NAME = 'build_test_related_apps' NON_TEST_RELATED_BUILD_JOB_NAME = 'build_non_test_related_apps' diff --git a/tools/ci/dynamic_pipelines/scripts/generate_target_test_child_pipeline.py b/tools/ci/dynamic_pipelines/scripts/generate_target_test_child_pipeline.py index 019bc789a6..d845311170 100644 --- a/tools/ci/dynamic_pipelines/scripts/generate_target_test_child_pipeline.py +++ b/tools/ci/dynamic_pipelines/scripts/generate_target_test_child_pipeline.py @@ -19,16 +19,19 @@ from dynamic_pipelines.constants import BUILD_ONLY_LABEL from dynamic_pipelines.constants import DEFAULT_CASES_TEST_PER_JOB from dynamic_pipelines.constants import DEFAULT_TARGET_TEST_CHILD_PIPELINE_FILEPATH from dynamic_pipelines.constants import DEFAULT_TARGET_TEST_CHILD_PIPELINE_NAME +from dynamic_pipelines.constants import DEFAULT_TARGET_TEST_JOB_TEMPLATE_NAME from dynamic_pipelines.constants import DEFAULT_TEST_PATHS from dynamic_pipelines.constants import ( KNOWN_GENERATE_TEST_CHILD_PIPELINE_WARNINGS_FILEPATH, ) +from dynamic_pipelines.constants import TIMEOUT_4H_TEMPLATE_NAME from dynamic_pipelines.models import EmptyJob from dynamic_pipelines.models import Job from dynamic_pipelines.models import TargetTestJob from dynamic_pipelines.utils import dump_jobs_to_yaml from idf_build_apps import App from idf_ci.app import import_apps_from_txt +from idf_pytest.constants import TIMEOUT_4H_MARKERS from idf_pytest.script import get_pytest_cases @@ -82,7 +85,13 @@ def get_target_test_jobs( print('WARNING: excluding test cases with runner tags:', runner_tags) continue + _extends = [DEFAULT_TARGET_TEST_JOB_TEMPLATE_NAME] + for timeout_4h_marker in TIMEOUT_4H_MARKERS: + if timeout_4h_marker in env_markers: + _extends.append(TIMEOUT_4H_TEMPLATE_NAME) + target_test_job = TargetTestJob( + extends=_extends, name=f'{target_selector} - {",".join(env_markers)}', tags=runner_tags, parallel=len(cases) // DEFAULT_CASES_TEST_PER_JOB + 1, diff --git a/tools/ci/dynamic_pipelines/templates/.dynamic_jobs.yml b/tools/ci/dynamic_pipelines/templates/.dynamic_jobs.yml index 4d78137e60..06a63fc0bd 100644 --- a/tools/ci/dynamic_pipelines/templates/.dynamic_jobs.yml +++ b/tools/ci/dynamic_pipelines/templates/.dynamic_jobs.yml @@ -94,3 +94,6 @@ - section_start "upload_junit_reports" - run_cmd python tools/ci/artifacts_handler.py upload --type logs junit_reports - section_end "upload_junit_reports" + +.timeout_4h_template: + timeout: 4 hours diff --git a/tools/ci/idf_pytest/constants.py b/tools/ci/idf_pytest/constants.py index 1aa138b87b..b0dbcaf5f0 100644 --- a/tools/ci/idf_pytest/constants.py +++ b/tools/ci/idf_pytest/constants.py @@ -75,6 +75,7 @@ ENV_MARKERS = { 'twai_transceiver': 'runners with a TWAI PHY transceiver', 'flash_encryption_wifi_high_traffic': 'Flash Encryption runners with wifi high traffic support', 'ethernet': 'ethernet runner', + 'ethernet_stress': 'ethernet runner with stress test', 'ethernet_flash_8m': 'ethernet runner with 8mb flash', 'ethernet_router': 'both the runner and dut connect to the same router through ethernet NIC', 'ethernet_vlan': 'ethernet runner GARM-32-SH-1-R16S5N3', @@ -127,6 +128,11 @@ ENV_MARKERS = { 'ram_app': 'ram_app runners', } +# by default the timeout is 1h, for some special cases we need to extend it +TIMEOUT_4H_MARKERS = [ + 'ethernet_stress', +] + DEFAULT_CONFIG_RULES_STR = ['sdkconfig.ci=default', 'sdkconfig.ci.*=', '=default'] DEFAULT_IGNORE_WARNING_FILEPATH = os.path.join(IDF_PATH, 'tools', 'ci', 'ignore_build_warnings.txt') DEFAULT_BUILD_TEST_RULES_FILEPATH = os.path.join(IDF_PATH, '.gitlab', 'ci', 'default-build-test-rules.yml') diff --git a/tools/test_apps/protocols/.build-test-rules.yml b/tools/test_apps/protocols/.build-test-rules.yml index 81198acd88..21d8dfa455 100644 --- a/tools/test_apps/protocols/.build-test-rules.yml +++ b/tools/test_apps/protocols/.build-test-rules.yml @@ -16,7 +16,7 @@ tools/test_apps/protocols/mqtt/publish_connect_test: temporary: true reason: lack of runners depends_components: - - esp_eth + - mqtt depends_filepatterns: - tools/ci/python_packages/common_test_methods.py - examples/common_components/**/* diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild b/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild index 8e4cfe56e6..dce9ce2530 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild @@ -24,18 +24,6 @@ menu "Example Configuration" help URL of an mqtt broker for wss transport - config EXAMPLE_PUBLISH_TOPIC - string "publish topic" - default "/topic/publish/esp2py" - help - topic to which esp32 client publishes - - config EXAMPLE_SUBSCRIBE_TOPIC - string "subscribe topic" - default "/topic/subscribe/py2esp" - help - topic to which esp32 client subscribes (and expects data) - config EXAMPLE_BROKER_CERTIFICATE_OVERRIDE string "Broker certificate override" default "" diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c index 23a58a6750..d720a6d26f 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c @@ -152,6 +152,8 @@ static int do_publish_setup(int argc, char **argv) { command_context.data = calloc(1, sizeof(publish_context_t)); ((publish_context_t*)command_context.data)->pattern = strdup(*publish_setup_args.pattern->sval); ((publish_context_t*)command_context.data)->pattern_repetitions = *publish_setup_args.pattern_repetitions->ival; + ((publish_context_t*)command_context.data)->subscribe_to = strdup(*publish_setup_args.subscribe_to->sval); + ((publish_context_t*)command_context.data)->publish_to = strdup(*publish_setup_args.publish_to->sval); publish_setup(&command_context, *publish_setup_args.transport->sval); return 0; } @@ -210,6 +212,8 @@ void register_common_commands(void) { } void register_publish_commands(void) { publish_setup_args.transport = arg_str1(NULL,NULL,"", "Selected transport to test"); + publish_setup_args.publish_to = arg_str1(NULL,NULL,"", "Selected publish_to to publish"); + publish_setup_args.subscribe_to = arg_str1(NULL,NULL,"", "Selected subscribe_to to publish"); publish_setup_args.pattern = arg_str1(NULL,NULL,"", "Message pattern repeated to build big messages"); publish_setup_args.pattern_repetitions = arg_int1(NULL,NULL,"", "How many times the pattern is repeated"); publish_setup_args.end = arg_end(1); @@ -220,7 +224,7 @@ void register_publish_commands(void) { publish_args.end = arg_end(1); const esp_console_cmd_t publish_setup = { .command = "publish_setup", - .help = "Run publish test\n", + .help = "Set publish test parameters\n", .hint = NULL, .func = &do_publish_setup, .argtable = &publish_setup_args diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h index 454e82e53f..c5311ecc24 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h @@ -17,6 +17,8 @@ typedef struct { typedef struct { transport_t selected_transport; char *pattern; + char *subscribe_to; + char *publish_to; int pattern_repetitions; int qos; char *expected; @@ -41,6 +43,8 @@ typedef struct { typedef struct { struct arg_str *transport; + struct arg_str *subscribe_to; + struct arg_str *publish_to; struct arg_str *pattern; struct arg_int *pattern_repetitions; struct arg_end *end; diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c index 3bdefb61af..7d11ae31e3 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c @@ -14,6 +14,7 @@ #include "freertos/FreeRTOS.h" #include #include "esp_system.h" +#include "esp_random.h" #include "esp_log.h" #include "mqtt_client.h" @@ -24,7 +25,7 @@ static const char *TAG = "publish_test"; static EventGroupHandle_t mqtt_event_group; const static int CONNECTED_BIT = BIT0; - +#define CLIENT_ID_SUFFIX_SIZE 12 #if CONFIG_EXAMPLE_BROKER_CERTIFICATE_OVERRIDDEN == 1 static const uint8_t mqtt_eclipseprojects_io_pem_start[] = "-----BEGIN CERTIFICATE-----\n" CONFIG_EXAMPLE_BROKER_CERTIFICATE_OVERRIDE "\n-----END CERTIFICATE-----"; #else @@ -45,8 +46,8 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_ case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); xEventGroupSetBits(mqtt_event_group, CONNECTED_BIT); - msg_id = esp_mqtt_client_subscribe(client, CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, test_data->qos); - ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, msg_id); + msg_id = esp_mqtt_client_subscribe(client, test_data->subscribe_to, test_data->qos); + ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", test_data->subscribe_to, msg_id); break; case MQTT_EVENT_DISCONNECTED: @@ -101,8 +102,6 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_ } } - - void test_init(void) { ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); @@ -169,6 +168,10 @@ static void configure_client(command_context_t * ctx, const char *transport) ESP_LOGI(TAG, "Set certificate"); config.broker.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start; } + // Generate a random client id for each iteration + char client_id[CLIENT_ID_SUFFIX_SIZE] = {0}; + snprintf(client_id, sizeof(client_id), "esp32-%08X", esp_random()); + config.credentials.client_id = client_id; esp_mqtt_set_config(ctx->mqtt_client, &config); } } @@ -200,9 +203,9 @@ void publish_test(command_context_t * ctx, int expect_to_publish, int qos, bool for (int i = 0; i < data->nr_of_msg_expected; i++) { int msg_id; if (enqueue) { - msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0, true); + msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0, true); } else { - msg_id = esp_mqtt_client_publish(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0); + msg_id = esp_mqtt_client_publish(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0); if(msg_id < 0) { ESP_LOGE(TAG, "Failed to publish"); break; diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py b/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py index 763574c13e..b2df800002 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py @@ -29,24 +29,30 @@ DEFAULT_MSG_SIZE = 16 # Publisher class creating a python client to send/receive published data from esp-mqtt client class MqttPublisher(mqtt.Client): - def __init__(self, repeat, published, publish_cfg, log_details=False): # type: (MqttPublisher, int, int, dict, bool) -> None - self.sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)) + def __init__(self, config, log_details=False): # type: (MqttPublisher, dict, bool) -> None self.log_details = log_details - self.repeat = repeat - self.publish_cfg = publish_cfg - self.expected_data = f'{self.sample_string * self.repeat}' - self.published = published + self.config = config + self.expected_data = f'{config["pattern"] * config["scenario"]["msg_len"]}' self.received = 0 + self.subscribe_mid = 0 self.lock = Lock() self.event_client_connected = Event() + self.event_client_subscribed = Event() self.event_client_got_all = Event() - transport = 'websockets' if self.publish_cfg['transport'] in ['ws', 'wss'] else 'tcp' - super().__init__('MqttTestRunner', userdata=0, transport=transport) + transport = 'websockets' if self.config['transport'] in ['ws', 'wss'] else 'tcp' + client_id = 'MqttTestRunner' + ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(5)) + super().__init__(client_id, userdata=0, transport=transport) def print_details(self, text): # type: (str) -> None if self.log_details: logging.info(text) + def on_subscribe(self, client: Any, userdata: Any, mid: Any, granted_qos: Any) -> None: + """Verify successful subscription.""" + if mid == self.subscribe_mid: + logging.info(f'Subscribed to {self.config["subscribe_topic"]} successfully with QoS: {granted_qos}') + self.event_client_subscribed.set() + def on_connect(self, mqttc: Any, obj: Any, flags: Any, rc:int) -> None: self.event_client_connected.set() @@ -56,31 +62,29 @@ class MqttPublisher(mqtt.Client): def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None: payload = msg.payload.decode('utf-8') if payload == self.expected_data: - userdata += 1 - self.user_data_set(userdata) - self.received = userdata - if userdata == self.published: + self.received += 1 + if self.received == self.config['scenario']['nr_of_msgs']: self.event_client_got_all.set() else: differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data)))) logging.error(f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:' f'{len(self.expected_data)}') - logging.info(f'Repetitions: {payload.count(self.sample_string)}') - logging.info(f'Pattern: {self.sample_string}') - logging.info(f'First : {payload[:DEFAULT_MSG_SIZE]}') - logging.info(f'Last : {payload[-DEFAULT_MSG_SIZE:]}') + logging.info(f'Repetitions: {payload.count(self.config["pattern"])}') + logging.info(f'Pattern: {self.config["pattern"]}') + logging.info(f'First: {payload[:DEFAULT_MSG_SIZE]}') + logging.info(f'Last: {payload[-DEFAULT_MSG_SIZE:]}') matcher = difflib.SequenceMatcher(a=payload, b=self.expected_data) for match in matcher.get_matching_blocks(): logging.info(f'Match: {match}') def __enter__(self) -> Any: - qos = self.publish_cfg['qos'] - broker_host = self.publish_cfg['broker_host_' + self.publish_cfg['transport']] - broker_port = self.publish_cfg['broker_port_' + self.publish_cfg['transport']] + qos = self.config['qos'] + broker_host = self.config['broker_host_' + self.config['transport']] + broker_port = self.config['broker_port_' + self.config['transport']] try: self.print_details('Connecting...') - if self.publish_cfg['transport'] in ['ssl', 'wss']: + if self.config['transport'] in ['ssl', 'wss']: self.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) self.tls_insecure_set(True) self.event_client_connected.clear() @@ -94,7 +98,8 @@ class MqttPublisher(mqtt.Client): if not self.event_client_connected.wait(timeout=30): raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}') self.event_client_got_all.clear() - self.subscribe(self.publish_cfg['subscribe_topic'], qos) + result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos) + assert result == 0 return self def __exit__(self, exc_type, exc_value, traceback): # type: (MqttPublisher, str, str, dict) -> None @@ -102,38 +107,47 @@ class MqttPublisher(mqtt.Client): self.loop_stop() -def get_configurations(dut: Dut) -> Dict[str,Any]: +def get_configurations(dut: Dut, test_case: Any) -> Dict[str,Any]: publish_cfg = {} try: @no_type_check - def get_broker_from_dut(dut, config_option): + def get_config_from_dut(dut, config_option): # logging.info('Option:', config_option, dut.app.sdkconfig.get(config_option)) value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get(config_option)) if value is None: return None, None return value.group(1), int(value.group(2)) # Get publish test configuration - publish_cfg['publish_topic'] = dut.app.sdkconfig.get('EXAMPLE_SUBSCRIBE_TOPIC').replace('"','') - publish_cfg['subscribe_topic'] = dut.app.sdkconfig.get('EXAMPLE_PUBLISH_TOPIC').replace('"','') - publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI') - publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI') - publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WS_URI') - publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI') + publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI') + publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI') + publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WS_URI') + publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI') except Exception: logging.info('ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig') raise + transport, qos, enqueue, scenario = test_case + if publish_cfg['broker_host_' + transport] is None: + pytest.skip(f'Skipping transport: {transport}...') + publish_cfg['scenario'] = scenario + publish_cfg['qos'] = qos + publish_cfg['enqueue'] = enqueue + publish_cfg['transport'] = transport + publish_cfg['pattern'] = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)) + publish_cfg['test_timeout'] = get_timeout(test_case) + unique_topic = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE)) + publish_cfg['subscribe_topic'] = 'test/subscribe_to/' + unique_topic + publish_cfg['publish_topic'] = 'test/subscribe_to/' + unique_topic logging.info(f'configuration: {publish_cfg}') return publish_cfg @contextlib.contextmanager -def connected_and_subscribed(dut:Dut, transport:str, pattern:str, pattern_repetitions:int) -> Any: - dut.write(f'publish_setup {transport} {pattern} {pattern_repetitions}') - dut.write(f'start') +def connected_and_subscribed(dut:Dut) -> Any: + dut.write('start') dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60) yield - dut.write(f'stop') + dut.write('stop') def get_scenarios() -> List[Dict[str, int]]: @@ -147,112 +161,112 @@ def get_scenarios() -> List[Dict[str, int]]: continue break if not scenarios: # No message sizes present in the env - set defaults - scenarios = [{'len':0, 'repeat':5}, # zero-sized messages - {'len':2, 'repeat':5}, # short messages - {'len':200, 'repeat':3}, # long messages + scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages + {'msg_len':2, 'nr_of_msgs':5}, # short messages + {'msg_len':200, 'nr_of_msgs':3}, # long messages ] return scenarios def get_timeout(test_case: Any) -> int: transport, qos, enqueue, scenario = test_case - if transport in ['ws', 'wss'] or qos == 2: - return 120 - return 60 + timeout = int(scenario['nr_of_msgs'] * 20) + if qos == 1: + timeout += 30 + if qos == 2: + timeout += 45 + if transport in ['ws', 'wss']: + timeout += 30 + return timeout -def run_publish_test_case(dut: Dut, test_case: Any, publish_cfg: Any) -> None: - transport, qos, enqueue, scenario = test_case - if publish_cfg['broker_host_' + transport] is None: - pytest.skip(f'Skipping transport: {transport}...') - repeat = scenario['len'] - published = scenario['repeat'] - publish_cfg['qos'] = qos - publish_cfg['queue'] = enqueue - publish_cfg['transport'] = transport - test_timeout = get_timeout(test_case) - logging.info(f'Starting Publish test: transport:{transport}, qos:{qos}, nr_of_msgs:{published},' - f' msg_size:{repeat*DEFAULT_MSG_SIZE}, enqueue:{enqueue}') - with MqttPublisher(repeat, published, publish_cfg) as publisher, connected_and_subscribed(dut, transport, publisher.sample_string, scenario['len']): +def run_publish_test_case(dut: Dut, config: Any) -> None: + logging.info(f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},' + f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},' + f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}') + dut.write(f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}') + with MqttPublisher(config) as publisher, connected_and_subscribed(dut): + assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe' msgs_published: List[mqtt.MQTTMessageInfo] = [] - dut.write(f'publish {publisher.published} {qos} {enqueue}') - assert publisher.event_client_got_all.wait(timeout=test_timeout), (f'Not all data received from ESP32: {transport} ' - f'qos={qos} received: {publisher.received} ' - f'expected: {publisher.published}') + dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}') + assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (f'Not all data received from ESP32: {config["transport"]} ' + f'qos={config["qos"]} received: {publisher.received} ' + f'expected: {config["scenario"]["nr_of_msgs"]}') logging.info(' - all data received from ESP32') - payload = publisher.sample_string * publisher.repeat - for _ in range(publisher.published): + payload = config['pattern'] * config['scenario']['msg_len'] + for _ in range(config['scenario']['nr_of_msgs']): with publisher.lock: - msg = publisher.publish(topic=publisher.publish_cfg['publish_topic'], payload=payload, qos=qos) - if qos > 0: + msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos']) + if config['qos'] > 0: msgs_published.append(msg) logging.info(f'Published: {len(msgs_published)}') while msgs_published: - msgs_published = [msg for msg in msgs_published if msg.is_published()] + msgs_published = [msg for msg in msgs_published if not msg.is_published()] + + logging.info('All messages from runner published') try: - dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=test_timeout) + dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout']) except pexpect.exceptions.ExceptionPexpect: - dut.write(f'publish_report') + dut.write('publish_report') dut.expect(re.compile(rb'Test Report'), timeout=30) raise logging.info('ESP32 received all data from runner') -stress_scenarios = [{'len':20, 'repeat':50}] # many medium sized +stress_scenarios = [{'msg_len':20, 'nr_of_msgs':30}] # many medium sized transport_cases = ['tcp', 'ws', 'wss', 'ssl'] qos_cases = [0, 1, 2] enqueue_cases = [0, 1] local_broker_supported_transports = ['tcp'] -local_broker_scenarios = [{'len':0, 'repeat':5}, # zero-sized messages - {'len':5, 'repeat':20}, # short messages - {'len':500, 'repeat':10}, # long messages - {'len':20, 'repeat':20}] # many medium sized +local_broker_scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages + {'msg_len':5, 'nr_of_msgs':20}, # short messages + {'msg_len':500, 'nr_of_msgs':10}, # long messages + {'msg_len':20, 'nr_of_msgs':20}] # many medium sized -def make_cases(scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]: - return [test_case for test_case in product(transport_cases, qos_cases, enqueue_cases, scenarios)] +def make_cases(transport: Any, scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]: + return [test_case for test_case in product(transport, qos_cases, enqueue_cases, scenarios)] -test_cases = make_cases(get_scenarios()) -stress_test_cases = make_cases(stress_scenarios) +test_cases = make_cases(transport_cases, get_scenarios()) +stress_test_cases = make_cases(transport_cases, stress_scenarios) @pytest.mark.esp32 @pytest.mark.ethernet -@pytest.mark.nightly_run @pytest.mark.parametrize('test_case', test_cases) @pytest.mark.parametrize('config', ['default'], indirect=True) def test_mqtt_publish(dut: Dut, test_case: Any) -> None: - publish_cfg = get_configurations(dut) + publish_cfg = get_configurations(dut, test_case) dut.expect(re.compile(rb'mqtt>'), timeout=30) dut.confirm_write('init', expect_pattern='init', timeout=30) - run_publish_test_case(dut, test_case, publish_cfg) + run_publish_test_case(dut, publish_cfg) @pytest.mark.esp32 -@pytest.mark.ethernet +@pytest.mark.ethernet_stress @pytest.mark.nightly_run @pytest.mark.parametrize('test_case', stress_test_cases) @pytest.mark.parametrize('config', ['default'], indirect=True) def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None: - publish_cfg = get_configurations(dut) + publish_cfg = get_configurations(dut, test_case) dut.expect(re.compile(rb'mqtt>'), timeout=30) dut.write('init') - run_publish_test_case(dut, test_case, publish_cfg) + run_publish_test_case(dut, publish_cfg) @pytest.mark.esp32 @pytest.mark.ethernet -@pytest.mark.parametrize('test_case', make_cases(local_broker_scenarios)) +@pytest.mark.parametrize('test_case', make_cases(local_broker_supported_transports, local_broker_scenarios)) @pytest.mark.parametrize('config', ['local_broker'], indirect=True) -def test_mqtt_publish_lcoal(dut: Dut, test_case: Any) -> None: +def test_mqtt_publish_local(dut: Dut, test_case: Any) -> None: if test_case[0] not in local_broker_supported_transports: pytest.skip(f'Skipping transport: {test_case[0]}...') dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1) - publish_cfg = get_configurations(dut) + publish_cfg = get_configurations(dut, test_case) publish_cfg['broker_host_tcp'] = dut_ip publish_cfg['broker_port_tcp'] = 1234 dut.expect(re.compile(rb'mqtt>'), timeout=30) dut.confirm_write('init', expect_pattern='init', timeout=30) - run_publish_test_case(dut, test_case, publish_cfg) + run_publish_test_case(dut, publish_cfg)