From 412e7c48ee01e7119ead882b6ee12177c4e03e36 Mon Sep 17 00:00:00 2001
From: Euripedes Rocha
Date: Tue, 6 May 2025 12:35:15 +0200
Subject: [PATCH] change(mqtt): Add retry to the publish connect test case

This test case fails mostly due to network-related issues; with
retries we might avoid failing the whole job.
---
 .../mqtt/publish_connect_test/CMakeLists.txt  |  1 +
 .../publish_connect_test/main/CMakeLists.txt  |  3 +-
 .../publish_connect_test/main/publish_test.c  | 63 +++++++++-------
 .../pytest_mqtt_publish_app.py                | 72 ++++++++++++++-----
 4 files changed, 96 insertions(+), 43 deletions(-)

diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/CMakeLists.txt b/tools/test_apps/protocols/mqtt/publish_connect_test/CMakeLists.txt
index 4776e7424d..543259e6a2 100644
--- a/tools/test_apps/protocols/mqtt/publish_connect_test/CMakeLists.txt
+++ b/tools/test_apps/protocols/mqtt/publish_connect_test/CMakeLists.txt
@@ -3,6 +3,7 @@
 cmake_minimum_required(VERSION 3.16)
 
 include($ENV{IDF_PATH}/tools/cmake/project.cmake)
+idf_build_set_property(MINIMAL_BUILD ON)
 project(mqtt_publish_connect_test)
diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/CMakeLists.txt b/tools/test_apps/protocols/mqtt/publish_connect_test/main/CMakeLists.txt
index 657a574868..326ecd640b 100644
--- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/CMakeLists.txt
+++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/CMakeLists.txt
@@ -1,3 +1,4 @@
 idf_component_register(SRCS "publish_test.c" "connect_test.c" "publish_connect_test.c"
-                       INCLUDE_DIRS ".")
+                       INCLUDE_DIRS "."
+                       REQUIRES mqtt nvs_flash console esp_netif)
 target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")
diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c
index 7d11ae31e3..fe373b6957 100644
--- a/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c
+++ b/tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c
@@ -67,31 +67,44 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
         ESP_LOGI(TAG, "MQTT_EVENT_DATA");
         ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
         ESP_LOGI(TAG, "ID=%d, total_len=%d, data_len=%d, current_data_offset=%d", event->msg_id, event->total_data_len, event->data_len, event->current_data_offset);
-        if (event->topic) {
-            actual_len = event->data_len;
-            msg_id = event->msg_id;
-        } else {
-            actual_len += event->data_len;
-            // check consistency with msg_id across multiple data events for single msg
-            if (msg_id != event->msg_id) {
-                ESP_LOGI(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
-                abort();
-            }
-        }
-        memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
-        if (actual_len == event->total_data_len) {
-            if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
-                memset(test_data->received_data, 0, test_data->expected_size);
-                test_data->nr_of_msg_received ++;
-                if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
-                    ESP_LOGI(TAG, "Correct pattern received exactly x times");
-                    ESP_LOGI(TAG, "Test finished correctly!");
-                }
-            } else {
-                ESP_LOGE(TAG, "FAILED!");
-                abort();
-            }
-        }
+        if (event->current_data_offset == 0) {
+            actual_len = event->data_len;
+            msg_id = event->msg_id;
+            if (event->total_data_len != test_data->expected_size) {
+                ESP_LOGE(TAG, "Incorrect message size: %d != %d", event->total_data_len, test_data->expected_size);
+                abort();
+            }
+        } else {
+            actual_len += event->data_len;
+            // check consistency with msg_id across multiple data events for single msg
+            if (msg_id != event->msg_id) {
+                ESP_LOGE(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
+                abort();
+            }
+        }
+        if (event->current_data_offset + event->data_len > test_data->expected_size) {
+            ESP_LOGE(TAG, "Buffer overflow detected: offset %d + data_len %d > buffer size %d", event->current_data_offset, event->data_len, test_data->expected_size);
+            abort();
+        }
+        if (memcmp(test_data->expected + event->current_data_offset, event->data, event->data_len) != 0) {
+            ESP_LOGE(TAG, "Data mismatch at offset %d: \n expected %.*s, \n got %.*s", event->current_data_offset, event->data_len, test_data->expected + event->current_data_offset, event->data_len, event->data);
+            abort();
+        }
+
+        memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
+        if (actual_len == event->total_data_len) {
+            if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
+                memset(test_data->received_data, 0, test_data->expected_size);
+                test_data->nr_of_msg_received++;
+                if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
+                    ESP_LOGI(TAG, "Correct pattern received exactly x times");
+                    ESP_LOGI(TAG, "Test finished correctly!");
+                }
+            } else {
+                ESP_LOGE(TAG, "FAILED!");
+                abort();
+            }
+        }
         break;
     case MQTT_EVENT_ERROR:
         ESP_LOGE(TAG, "MQTT_EVENT_ERROR");
diff --git a/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py b/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py
index 4eb25d1f1e..2c39034de5 100644
--- a/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py
+++ b/tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py
@@ -8,6 +8,7 @@ import random
 import re
 import ssl
 import string
+import time
 from itertools import count
 from itertools import product
 from threading import Event
@@ -15,8 +16,8 @@ from threading import Lock
 from typing import Any
 from typing import Dict
 from typing import List
-from typing import no_type_check
 from typing import Tuple
+from typing import no_type_check
 
 import paho.mqtt.client as mqtt
 import pexpect
@@ -61,7 +62,7 @@ class MqttPublisher(mqtt.Client):
     def on_connect_fail(self, mqttc: Any, obj: Any) -> None:
         logging.error('Connect failed')
 
-    def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
+    def on_message(self, mqttc: mqtt.Client, obj: Any, msg: mqtt.MQTTMessage) -> None:
         payload = msg.payload.decode('utf-8')
         if payload == self.expected_data:
             self.received += 1
@@ -70,8 +71,9 @@ class MqttPublisher(mqtt.Client):
         else:
             differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data))))
             logging.error(
-                f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:'
-                f'{len(self.expected_data)}'
+                f'Payload on topic "{msg.topic}" (QoS {msg.qos}) differs in {differences} positions '
+                'from expected data. '
+                f'Received size: {len(payload)}, expected size: {len(self.expected_data)}.'
             )
             logging.info(f'Repetitions: {payload.count(self.config["pattern"])}')
             logging.info(f'Pattern: {self.config["pattern"]}')
@@ -85,6 +87,7 @@ class MqttPublisher(mqtt.Client):
         qos = self.config['qos']
         broker_host = self.config['broker_host_' + self.config['transport']]
         broker_port = self.config['broker_port_' + self.config['transport']]
+        connect_timeout_seconds = self.config.get('client_connect_timeout', 30)
 
         try:
             self.print_details('Connecting...')
@@ -93,14 +96,17 @@ class MqttPublisher(mqtt.Client):
                 self.tls_insecure_set(True)
             self.event_client_connected.clear()
             self.loop_start()
-            self.connect(broker_host, broker_port, 60)
+            self.connect(broker_host, broker_port, 60)  # paho's keepalive
         except Exception:
            self.print_details(f'ENV_TEST_FAILURE: Unexpected error while connecting to broker {broker_host}')
            raise
 
         self.print_details(f'Connecting py-client to broker {broker_host}:{broker_port}...')
-        if not self.event_client_connected.wait(timeout=30):
-            raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}')
+        if not self.event_client_connected.wait(timeout=connect_timeout_seconds):
+            raise ValueError(
+                f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host} '
+                f'within {connect_timeout_seconds}s'
+            )
         self.event_client_got_all.clear()
         result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos)
         assert result == 0
@@ -148,7 +154,11 @@ def get_configurations(dut: Dut, test_case: Any) -> Dict[str, Any]:
     publish_cfg['pattern'] = ''.join(
         random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)
     )
+    publish_cfg['client_connect_timeout'] = 30
+    publish_cfg['dut_subscribe_timeout'] = 60
+    publish_cfg['publish_ack_timeout'] = 60
     publish_cfg['test_timeout'] = get_timeout(test_case)
+
     unique_topic = ''.join(
         random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE)
     )
@@ -159,9 +169,10 @@ def get_configurations(dut: Dut, test_case: Any) -> Dict[str, Any]:
 
 
 @contextlib.contextmanager
-def connected_and_subscribed(dut: Dut) -> Any:
+def connected_and_subscribed(dut: Dut, config: Dict[str, Any]) -> Any:
     dut.write('start')
-    dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60)
+    dut_subscribe_timeout = config.get('dut_subscribe_timeout', 60)
+    dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=dut_subscribe_timeout)
     yield
     dut.write('stop')
 
@@ -177,6 +188,7 @@ def get_scenarios() -> List[Dict[str, int]]:
                 continue
             break
     if not scenarios:  # No message sizes present in the env - set defaults
+        logging.info('Using predefined cases')
         scenarios = [
             {'msg_len': 0, 'nr_of_msgs': 5},  # zero-sized messages
             {'msg_len': 2, 'nr_of_msgs': 5},  # short messages
@@ -201,13 +213,15 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
     logging.info(
         f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},'
         f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},'
-        f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}'
+        f' msg_size:{config["scenario"]["msg_len"]}, enqueue:{config["enqueue"]}'
     )
     dut.write(
-        f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
+        f'publish_setup {config["transport"]} {config["publish_topic"]}'
+        f' {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
     )
-    with MqttPublisher(config) as publisher, connected_and_subscribed(dut):
-        assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe'
+    with MqttPublisher(config) as publisher, connected_and_subscribed(dut, config):
+        py_client_subscribe_timeout = config.get('py_client_subscribe_timeout', config['test_timeout'])
+        assert publisher.event_client_subscribed.wait(timeout=py_client_subscribe_timeout), 'Runner failed to subscribe'
         msgs_published: List[mqtt.MQTTMessageInfo] = []
         dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}')
         assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (
@@ -222,11 +236,33 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
             msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos'])
             if config['qos'] > 0:
                 msgs_published.append(msg)
-        logging.info(f'Published: {len(msgs_published)}')
-        while msgs_published:
-            msgs_published = [msg for msg in msgs_published if not msg.is_published()]
+        logging.info(f'Published: {len(msgs_published)} messages from script with QoS > 0 needing ACK.')
 
-        logging.info('All messages from runner published')
+        if msgs_published:
+            publish_ack_timeout_seconds = config.get('publish_ack_timeout', 60)  # Default 60 s, configurable via test config
+            ack_wait_start_time = time.time()
+            initial_unacked_count = len(msgs_published)
+            logging.info(f'Waiting for {initial_unacked_count} publish ACK(s) with timeout {publish_ack_timeout_seconds}s...')
+
+            while msgs_published:
+                if time.time() - ack_wait_start_time > publish_ack_timeout_seconds:
+                    unacked_mids = [msg.mid for msg in msgs_published if msg.mid is not None and not msg.is_published()]
+                    logging.error(
+                        f'Timeout waiting for publish acknowledgements. '
+                        f'{len(unacked_mids)} of {initial_unacked_count} messages remain unacknowledged. '
+                        f'Unacked MIDs: {unacked_mids}'
+                    )
+                    # This will likely cause the test to fail at a later assertion,
+                    # or an explicit error could be raised here instead,
+                    # e.g. raise Exception('Timeout waiting for publish acknowledgements')
+                    break
+                msgs_published = [msg for msg in msgs_published if not msg.is_published()]
+                if msgs_published:  # Avoid busy-looping if list is not empty
+                    time.sleep(0.1)  # Brief pause
+            if not msgs_published:
+                logging.info('All script-published QoS > 0 messages acknowledged by broker.')
+
+        logging.info('All messages from runner published (or timed out waiting for ACK).')
 
         try:
             dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout'])
@@ -262,6 +298,7 @@ stress_test_cases = make_cases(transport_cases, stress_scenarios)
 @pytest.mark.parametrize('test_case', test_cases)
 @pytest.mark.parametrize('config', ['default'], indirect=True)
 @idf_parametrize('target', ['esp32'], indirect=['target'])
+@pytest.mark.flaky(reruns=1, reruns_delay=1)
 def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
     publish_cfg = get_configurations(dut, test_case)
     dut.expect(re.compile(rb'mqtt>'), timeout=30)
@@ -273,6 +310,7 @@ def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
 @pytest.mark.nightly_run
 @pytest.mark.parametrize('test_case', stress_test_cases)
 @pytest.mark.parametrize('config', ['default'], indirect=True)
+@pytest.mark.flaky(reruns=1, reruns_delay=1)
 @idf_parametrize('target', ['esp32'], indirect=['target'])
 def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None:
     publish_cfg = get_configurations(dut, test_case)
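
Reviewer note (outside the patch): the acknowledgement-wait loop added to run_publish_test_case can also be expressed as a small deadline helper built only on MQTTMessageInfo.is_published(), which paho-mqtt exposes on every object returned by publish(). A minimal sketch under that assumption; the names wait_for_all_published, infos and timeout_s are illustrative and not taken from the patch:

    import logging
    import time
    from typing import List

    import paho.mqtt.client as mqtt


    def wait_for_all_published(infos: List[mqtt.MQTTMessageInfo], timeout_s: float = 60.0) -> bool:
        """Poll until every QoS > 0 publish is acknowledged or the deadline expires."""
        deadline = time.monotonic() + timeout_s
        pending = list(infos)
        while pending and time.monotonic() < deadline:
            # is_published() turns True once the broker acknowledges the message
            # (PUBACK for QoS 1, PUBCOMP for QoS 2).
            pending = [info for info in pending if not info.is_published()]
            if pending:
                time.sleep(0.1)  # brief pause instead of busy-looping
        if pending:
            logging.error('Timed out: %d publish(es) unacknowledged, MIDs %s', len(pending), [info.mid for info in pending])
            return False
        return True

Recent paho-mqtt releases also provide MQTTMessageInfo.wait_for_publish() with an optional timeout argument, which could replace the loop entirely; whether that argument is available depends on the paho-mqtt version pinned in the CI image, so it should be verified before relying on it.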
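
The @pytest.mark.flaky(reruns=1, reruns_delay=1) markers rely on the pytest-rerunfailures plugin, which provides the flaky marker and reruns a failing test in the same process after the given delay. A minimal, hypothetical sketch of the semantics; the test name and the ATTEMPTS counter are made up for illustration, and module-level state survives the rerun because no new process is started:

    import pytest

    ATTEMPTS = {'count': 0}


    @pytest.mark.flaky(reruns=1, reruns_delay=1)  # one rerun, 1 s pause between attempts
    def test_recovers_on_rerun() -> None:
        ATTEMPTS['count'] += 1
        # The first attempt fails, simulating a transient network error;
        # the rerun passes, so the test (and the CI job) is reported as passing.
        assert ATTEMPTS['count'] > 1, 'simulated transient network error'

With reruns=1 a genuinely broken test still fails after its second attempt, so the marker only absorbs transient, network-style failures, which matches the intent of this change.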