diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild b/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild index 8e4cfe56e6..dce9ce2530 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/Kconfig.projbuild @@ -24,18 +24,6 @@ menu "Example Configuration" help URL of an mqtt broker for wss transport - config EXAMPLE_PUBLISH_TOPIC - string "publish topic" - default "/topic/publish/esp2py" - help - topic to which esp32 client publishes - - config EXAMPLE_SUBSCRIBE_TOPIC - string "subscribe topic" - default "/topic/subscribe/py2esp" - help - topic to which esp32 client subscribes (and expects data) - config EXAMPLE_BROKER_CERTIFICATE_OVERRIDE string "Broker certificate override" default "" diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c index 23a58a6750..d720a6d26f 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.c @@ -152,6 +152,8 @@ static int do_publish_setup(int argc, char **argv) { command_context.data = calloc(1, sizeof(publish_context_t)); ((publish_context_t*)command_context.data)->pattern = strdup(*publish_setup_args.pattern->sval); ((publish_context_t*)command_context.data)->pattern_repetitions = *publish_setup_args.pattern_repetitions->ival; + ((publish_context_t*)command_context.data)->subscribe_to = strdup(*publish_setup_args.subscribe_to->sval); + ((publish_context_t*)command_context.data)->publish_to = strdup(*publish_setup_args.publish_to->sval); publish_setup(&command_context, *publish_setup_args.transport->sval); return 0; } @@ -210,6 +212,8 @@ void register_common_commands(void) { } void register_publish_commands(void) { publish_setup_args.transport = arg_str1(NULL,NULL,"", "Selected transport to test"); + publish_setup_args.publish_to = arg_str1(NULL,NULL,"", "Selected publish_to to publish"); + publish_setup_args.subscribe_to = arg_str1(NULL,NULL,"", "Selected subscribe_to to publish"); publish_setup_args.pattern = arg_str1(NULL,NULL,"", "Message pattern repeated to build big messages"); publish_setup_args.pattern_repetitions = arg_int1(NULL,NULL,"", "How many times the pattern is repeated"); publish_setup_args.end = arg_end(1); @@ -220,7 +224,7 @@ void register_publish_commands(void) { publish_args.end = arg_end(1); const esp_console_cmd_t publish_setup = { .command = "publish_setup", - .help = "Run publish test\n", + .help = "Set publish test parameters\n", .hint = NULL, .func = &do_publish_setup, .argtable = &publish_setup_args diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h index 454e82e53f..c5311ecc24 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_connect_test.h @@ -17,6 +17,8 @@ typedef struct { typedef struct { transport_t selected_transport; char *pattern; + char *subscribe_to; + char *publish_to; int pattern_repetitions; int qos; char *expected; @@ -41,6 +43,8 @@ typedef struct { typedef struct { struct arg_str *transport; + struct arg_str *subscribe_to; + struct arg_str *publish_to; struct arg_str *pattern; struct arg_int *pattern_repetitions; struct arg_end *end; diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c index 861f917428..7d11ae31e3 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c @@ -46,8 +46,8 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_ case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); xEventGroupSetBits(mqtt_event_group, CONNECTED_BIT); - msg_id = esp_mqtt_client_subscribe(client, CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, test_data->qos); - ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", CONFIG_EXAMPLE_SUBSCRIBE_TOPIC, msg_id); + msg_id = esp_mqtt_client_subscribe(client, test_data->subscribe_to, test_data->qos); + ESP_LOGI(TAG, "sent subscribe successful %s , msg_id=%d", test_data->subscribe_to, msg_id); break; case MQTT_EVENT_DISCONNECTED: @@ -203,9 +203,9 @@ void publish_test(command_context_t * ctx, int expect_to_publish, int qos, bool for (int i = 0; i < data->nr_of_msg_expected; i++) { int msg_id; if (enqueue) { - msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0, true); + msg_id = esp_mqtt_client_enqueue(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0, true); } else { - msg_id = esp_mqtt_client_publish(ctx->mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, data->expected, data->expected_size, qos, 0); + msg_id = esp_mqtt_client_publish(ctx->mqtt_client, data->publish_to, data->expected, data->expected_size, qos, 0); if(msg_id < 0) { ESP_LOGE(TAG, "Failed to publish"); break; diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py b/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py index 366c56f255..b2df800002 100644 --- a/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py +++ b/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py @@ -1,6 +1,5 @@ # SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 - import contextlib import difflib import logging @@ -30,18 +29,17 @@ DEFAULT_MSG_SIZE = 16 # Publisher class creating a python client to send/receive published data from esp-mqtt client class MqttPublisher(mqtt.Client): - def __init__(self, repeat, published, publish_cfg, log_details=False): # type: (MqttPublisher, int, int, dict, bool) -> None - self.sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)) + def __init__(self, config, log_details=False): # type: (MqttPublisher, dict, bool) -> None self.log_details = log_details - self.repeat = repeat - self.publish_cfg = publish_cfg - self.expected_data = f'{self.sample_string * self.repeat}' - self.published = published + self.config = config + self.expected_data = f'{config["pattern"] * config["scenario"]["msg_len"]}' self.received = 0 + self.subscribe_mid = 0 self.lock = Lock() self.event_client_connected = Event() + self.event_client_subscribed = Event() self.event_client_got_all = Event() - transport = 'websockets' if self.publish_cfg['transport'] in ['ws', 'wss'] else 'tcp' + transport = 'websockets' if self.config['transport'] in ['ws', 'wss'] else 'tcp' client_id = 'MqttTestRunner' + ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(5)) super().__init__(client_id, userdata=0, transport=transport) @@ -49,6 +47,12 @@ class MqttPublisher(mqtt.Client): if self.log_details: logging.info(text) + def on_subscribe(self, client: Any, userdata: Any, mid: Any, granted_qos: Any) -> None: + """Verify successful subscription.""" + if mid == self.subscribe_mid: + logging.info(f'Subscribed to {self.config["subscribe_topic"]} successfully with QoS: {granted_qos}') + self.event_client_subscribed.set() + def on_connect(self, mqttc: Any, obj: Any, flags: Any, rc:int) -> None: self.event_client_connected.set() @@ -58,31 +62,29 @@ class MqttPublisher(mqtt.Client): def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None: payload = msg.payload.decode('utf-8') if payload == self.expected_data: - userdata += 1 - self.user_data_set(userdata) - self.received = userdata - if userdata == self.published: + self.received += 1 + if self.received == self.config['scenario']['nr_of_msgs']: self.event_client_got_all.set() else: differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data)))) logging.error(f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:' f'{len(self.expected_data)}') - logging.info(f'Repetitions: {payload.count(self.sample_string)}') - logging.info(f'Pattern: {self.sample_string}') - logging.info(f'First : {payload[:DEFAULT_MSG_SIZE]}') - logging.info(f'Last : {payload[-DEFAULT_MSG_SIZE:]}') + logging.info(f'Repetitions: {payload.count(self.config["pattern"])}') + logging.info(f'Pattern: {self.config["pattern"]}') + logging.info(f'First: {payload[:DEFAULT_MSG_SIZE]}') + logging.info(f'Last: {payload[-DEFAULT_MSG_SIZE:]}') matcher = difflib.SequenceMatcher(a=payload, b=self.expected_data) for match in matcher.get_matching_blocks(): logging.info(f'Match: {match}') def __enter__(self) -> Any: - qos = self.publish_cfg['qos'] - broker_host = self.publish_cfg['broker_host_' + self.publish_cfg['transport']] - broker_port = self.publish_cfg['broker_port_' + self.publish_cfg['transport']] + qos = self.config['qos'] + broker_host = self.config['broker_host_' + self.config['transport']] + broker_port = self.config['broker_port_' + self.config['transport']] try: self.print_details('Connecting...') - if self.publish_cfg['transport'] in ['ssl', 'wss']: + if self.config['transport'] in ['ssl', 'wss']: self.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) self.tls_insecure_set(True) self.event_client_connected.clear() @@ -96,7 +98,8 @@ class MqttPublisher(mqtt.Client): if not self.event_client_connected.wait(timeout=30): raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}') self.event_client_got_all.clear() - self.subscribe(self.publish_cfg['subscribe_topic'], qos) + result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos) + assert result == 0 return self def __exit__(self, exc_type, exc_value, traceback): # type: (MqttPublisher, str, str, dict) -> None @@ -104,38 +107,47 @@ class MqttPublisher(mqtt.Client): self.loop_stop() -def get_configurations(dut: Dut) -> Dict[str,Any]: +def get_configurations(dut: Dut, test_case: Any) -> Dict[str,Any]: publish_cfg = {} try: @no_type_check - def get_broker_from_dut(dut, config_option): + def get_config_from_dut(dut, config_option): # logging.info('Option:', config_option, dut.app.sdkconfig.get(config_option)) value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get(config_option)) if value is None: return None, None return value.group(1), int(value.group(2)) # Get publish test configuration - publish_cfg['publish_topic'] = dut.app.sdkconfig.get('EXAMPLE_SUBSCRIBE_TOPIC').replace('"','') - publish_cfg['subscribe_topic'] = dut.app.sdkconfig.get('EXAMPLE_PUBLISH_TOPIC').replace('"','') - publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI') - publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI') - publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WS_URI') - publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_broker_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI') + publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_SSL_URI') + publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_TCP_URI') + publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WS_URI') + publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI') except Exception: logging.info('ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig') raise + transport, qos, enqueue, scenario = test_case + if publish_cfg['broker_host_' + transport] is None: + pytest.skip(f'Skipping transport: {transport}...') + publish_cfg['scenario'] = scenario + publish_cfg['qos'] = qos + publish_cfg['enqueue'] = enqueue + publish_cfg['transport'] = transport + publish_cfg['pattern'] = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)) + publish_cfg['test_timeout'] = get_timeout(test_case) + unique_topic = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE)) + publish_cfg['subscribe_topic'] = 'test/subscribe_to/' + unique_topic + publish_cfg['publish_topic'] = 'test/subscribe_to/' + unique_topic logging.info(f'configuration: {publish_cfg}') return publish_cfg @contextlib.contextmanager -def connected_and_subscribed(dut:Dut, transport:str, pattern:str, pattern_repetitions:int) -> Any: - dut.write(f'publish_setup {transport} {pattern} {pattern_repetitions}') - dut.write(f'start') +def connected_and_subscribed(dut:Dut) -> Any: + dut.write('start') dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60) yield - dut.write(f'stop') + dut.write('stop') def get_scenarios() -> List[Dict[str, int]]: @@ -149,16 +161,16 @@ def get_scenarios() -> List[Dict[str, int]]: continue break if not scenarios: # No message sizes present in the env - set defaults - scenarios = [{'len':0, 'repeat':5}, # zero-sized messages - {'len':2, 'repeat':5}, # short messages - {'len':200, 'repeat':3}, # long messages + scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages + {'msg_len':2, 'nr_of_msgs':5}, # short messages + {'msg_len':200, 'nr_of_msgs':3}, # long messages ] return scenarios def get_timeout(test_case: Any) -> int: transport, qos, enqueue, scenario = test_case - timeout = int(scenario['repeat'] * 10) + timeout = int(scenario['nr_of_msgs'] * 20) if qos == 1: timeout += 30 if qos == 2: @@ -168,37 +180,33 @@ def get_timeout(test_case: Any) -> int: return timeout -def run_publish_test_case(dut: Dut, test_case: Any, publish_cfg: Any) -> None: - transport, qos, enqueue, scenario = test_case - if publish_cfg['broker_host_' + transport] is None: - pytest.skip(f'Skipping transport: {transport}...') - repeat = scenario['len'] - published = scenario['repeat'] - publish_cfg['qos'] = qos - publish_cfg['queue'] = enqueue - publish_cfg['transport'] = transport - test_timeout = get_timeout(test_case) - logging.info(f'Starting Publish test: transport:{transport}, qos:{qos}, nr_of_msgs:{published},' - f' msg_size:{repeat * DEFAULT_MSG_SIZE}, enqueue:{enqueue}') - with MqttPublisher(repeat, published, publish_cfg) as publisher, connected_and_subscribed(dut, transport, publisher.sample_string, scenario['len']): +def run_publish_test_case(dut: Dut, config: Any) -> None: + logging.info(f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},' + f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},' + f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}') + dut.write(f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}') + with MqttPublisher(config) as publisher, connected_and_subscribed(dut): + assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe' msgs_published: List[mqtt.MQTTMessageInfo] = [] - dut.write(f'publish {publisher.published} {qos} {enqueue}') - assert publisher.event_client_got_all.wait(timeout=test_timeout), (f'Not all data received from ESP32: {transport} ' - f'qos={qos} received: {publisher.received} ' - f'expected: {publisher.published}') + dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}') + assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (f'Not all data received from ESP32: {config["transport"]} ' + f'qos={config["qos"]} received: {publisher.received} ' + f'expected: {config["scenario"]["nr_of_msgs"]}') logging.info(' - all data received from ESP32') - payload = publisher.sample_string * publisher.repeat - for _ in range(publisher.published): + payload = config['pattern'] * config['scenario']['msg_len'] + for _ in range(config['scenario']['nr_of_msgs']): with publisher.lock: - msg = publisher.publish(topic=publisher.publish_cfg['publish_topic'], payload=payload, qos=qos) - if qos > 0: + msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos']) + if config['qos'] > 0: msgs_published.append(msg) logging.info(f'Published: {len(msgs_published)}') while msgs_published: - msgs_published = [msg for msg in msgs_published if msg.is_published()] + msgs_published = [msg for msg in msgs_published if not msg.is_published()] + + logging.info('All messages from runner published') try: - dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=test_timeout) + dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout']) except pexpect.exceptions.ExceptionPexpect: dut.write('publish_report') dut.expect(re.compile(rb'Test Report'), timeout=30) @@ -206,35 +214,34 @@ def run_publish_test_case(dut: Dut, test_case: Any, publish_cfg: Any) -> None: logging.info('ESP32 received all data from runner') -stress_scenarios = [{'len':20, 'repeat':30}] # many medium sized +stress_scenarios = [{'msg_len':20, 'nr_of_msgs':30}] # many medium sized transport_cases = ['tcp', 'ws', 'wss', 'ssl'] qos_cases = [0, 1, 2] enqueue_cases = [0, 1] local_broker_supported_transports = ['tcp'] -local_broker_scenarios = [{'len':0, 'repeat':5}, # zero-sized messages - {'len':5, 'repeat':20}, # short messages - {'len':500, 'repeat':10}, # long messages - {'len':20, 'repeat':20}] # many medium sized +local_broker_scenarios = [{'msg_len':0, 'nr_of_msgs':5}, # zero-sized messages + {'msg_len':5, 'nr_of_msgs':20}, # short messages + {'msg_len':500, 'nr_of_msgs':10}, # long messages + {'msg_len':20, 'nr_of_msgs':20}] # many medium sized -def make_cases(scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]: - return [test_case for test_case in product(transport_cases, qos_cases, enqueue_cases, scenarios)] +def make_cases(transport: Any, scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]: + return [test_case for test_case in product(transport, qos_cases, enqueue_cases, scenarios)] -test_cases = make_cases(get_scenarios()) -stress_test_cases = make_cases(stress_scenarios) +test_cases = make_cases(transport_cases, get_scenarios()) +stress_test_cases = make_cases(transport_cases, stress_scenarios) @pytest.mark.esp32 @pytest.mark.ethernet -@pytest.mark.nightly_run @pytest.mark.parametrize('test_case', test_cases) @pytest.mark.parametrize('config', ['default'], indirect=True) def test_mqtt_publish(dut: Dut, test_case: Any) -> None: - publish_cfg = get_configurations(dut) + publish_cfg = get_configurations(dut, test_case) dut.expect(re.compile(rb'mqtt>'), timeout=30) dut.confirm_write('init', expect_pattern='init', timeout=30) - run_publish_test_case(dut, test_case, publish_cfg) + run_publish_test_case(dut, publish_cfg) @pytest.mark.esp32 @@ -243,23 +250,23 @@ def test_mqtt_publish(dut: Dut, test_case: Any) -> None: @pytest.mark.parametrize('test_case', stress_test_cases) @pytest.mark.parametrize('config', ['default'], indirect=True) def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None: - publish_cfg = get_configurations(dut) + publish_cfg = get_configurations(dut, test_case) dut.expect(re.compile(rb'mqtt>'), timeout=30) dut.write('init') - run_publish_test_case(dut, test_case, publish_cfg) + run_publish_test_case(dut, publish_cfg) @pytest.mark.esp32 @pytest.mark.ethernet -@pytest.mark.parametrize('test_case', make_cases(local_broker_scenarios)) +@pytest.mark.parametrize('test_case', make_cases(local_broker_supported_transports, local_broker_scenarios)) @pytest.mark.parametrize('config', ['local_broker'], indirect=True) -def test_mqtt_publish_lcoal(dut: Dut, test_case: Any) -> None: +def test_mqtt_publish_local(dut: Dut, test_case: Any) -> None: if test_case[0] not in local_broker_supported_transports: pytest.skip(f'Skipping transport: {test_case[0]}...') dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1) - publish_cfg = get_configurations(dut) + publish_cfg = get_configurations(dut, test_case) publish_cfg['broker_host_tcp'] = dut_ip publish_cfg['broker_port_tcp'] = 1234 dut.expect(re.compile(rb'mqtt>'), timeout=30) dut.confirm_write('init', expect_pattern='init', timeout=30) - run_publish_test_case(dut, test_case, publish_cfg) + run_publish_test_case(dut, publish_cfg)