change(mqtt): Adds retry on publish connect test case

This test case fails mostly for network related issues, with retrys we
might avoid the failure of the whole job.
This commit is contained in:
Euripedes Rocha
2025-05-06 12:35:15 +02:00
committed by Rocha Euripedes
parent eb9614d37f
commit 412e7c48ee
4 changed files with 96 additions and 43 deletions

View File

@@ -3,6 +3,7 @@
cmake_minimum_required(VERSION 3.16)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
idf_build_set_property(MINIMAL_BUILD ON)
project(mqtt_publish_connect_test)

View File

@@ -1,3 +1,4 @@
idf_component_register(SRCS "publish_test.c" "connect_test.c" "publish_connect_test.c"
INCLUDE_DIRS ".")
INCLUDE_DIRS "."
REQUIRES mqtt nvs_flash console esp_netif)
target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")

View File

@@ -67,31 +67,44 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
ESP_LOGI(TAG, "ID=%d, total_len=%d, data_len=%d, current_data_offset=%d", event->msg_id, event->total_data_len, event->data_len, event->current_data_offset);
if (event->topic) {
actual_len = event->data_len;
msg_id = event->msg_id;
} else {
actual_len += event->data_len;
// check consistency with msg_id across multiple data events for single msg
if (msg_id != event->msg_id) {
ESP_LOGI(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
abort();
}
}
memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
if (actual_len == event->total_data_len) {
if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
memset(test_data->received_data, 0, test_data->expected_size);
test_data->nr_of_msg_received ++;
if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
ESP_LOGI(TAG, "Correct pattern received exactly x times");
ESP_LOGI(TAG, "Test finished correctly!");
}
} else {
ESP_LOGE(TAG, "FAILED!");
abort();
}
}
if (event->current_data_offset == 0) {
actual_len = event->data_len;
msg_id = event->msg_id;
if (event->total_data_len != test_data->expected_size) {
ESP_LOGE(TAG, "Incorrect message size: %d != %d", event->total_data_len, test_data->expected_size);
abort();
}
} else {
actual_len += event->data_len;
// check consistency with msg_id across multiple data events for single msg
if (msg_id != event->msg_id) {
ESP_LOGE(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
abort();
}
}
if (event->current_data_offset + event->data_len > test_data->expected_size) {
ESP_LOGE(TAG, "Buffer overflow detected: offset %d + data_len %d > buffer size %d", event->current_data_offset, event->data_len, test_data->expected_size);
abort();
}
if (memcmp(test_data->expected + event->current_data_offset, event->data, event->data_len) != 0) {
ESP_LOGE(TAG, "Data mismatch at offset %d: \n expected %.*s, \n got %.*s", event->current_data_offset, event->data_len, test_data->expected + event->current_data_offset, event->data_len, event->data);
abort();
}
memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
if (actual_len == event->total_data_len) {
if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
memset(test_data->received_data, 0, test_data->expected_size);
test_data->nr_of_msg_received++;
if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
ESP_LOGI(TAG, "Correct pattern received exactly x times");
ESP_LOGI(TAG, "Test finished correctly!");
}
} else {
ESP_LOGE(TAG, "FAILED!");
abort();
}
}
break;
case MQTT_EVENT_ERROR:
ESP_LOGE(TAG, "MQTT_EVENT_ERROR");

View File

@@ -8,6 +8,7 @@ import random
import re
import ssl
import string
import time
from itertools import count
from itertools import product
from threading import Event
@@ -15,8 +16,8 @@ from threading import Lock
from typing import Any
from typing import Dict
from typing import List
from typing import no_type_check
from typing import Tuple
from typing import no_type_check
import paho.mqtt.client as mqtt
import pexpect
@@ -61,7 +62,7 @@ class MqttPublisher(mqtt.Client):
def on_connect_fail(self, mqttc: Any, obj: Any) -> None:
logging.error('Connect failed')
def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
def on_message(self, mqttc: mqtt.Client, obj: Any, msg: mqtt.MQTTMessage) -> None:
payload = msg.payload.decode('utf-8')
if payload == self.expected_data:
self.received += 1
@@ -70,8 +71,9 @@ class MqttPublisher(mqtt.Client):
else:
differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data))))
logging.error(
f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:'
f'{len(self.expected_data)}'
f'Payload on topic "{msg.topic}" (QoS {msg.qos}) differs in {differences} positions '
'from expected data. '
f'Received size: {len(payload)}, expected size: {len(self.expected_data)}.'
)
logging.info(f'Repetitions: {payload.count(self.config["pattern"])}')
logging.info(f'Pattern: {self.config["pattern"]}')
@@ -85,6 +87,7 @@ class MqttPublisher(mqtt.Client):
qos = self.config['qos']
broker_host = self.config['broker_host_' + self.config['transport']]
broker_port = self.config['broker_port_' + self.config['transport']]
connect_timeout_seconds = self.config.get('client_connect_timeout', 30)
try:
self.print_details('Connecting...')
@@ -93,14 +96,17 @@ class MqttPublisher(mqtt.Client):
self.tls_insecure_set(True)
self.event_client_connected.clear()
self.loop_start()
self.connect(broker_host, broker_port, 60)
self.connect(broker_host, broker_port, 60) # paho's keepalive
except Exception:
self.print_details(f'ENV_TEST_FAILURE: Unexpected error while connecting to broker {broker_host}')
raise
self.print_details(f'Connecting py-client to broker {broker_host}:{broker_port}...')
if not self.event_client_connected.wait(timeout=30):
raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}')
if not self.event_client_connected.wait(timeout=connect_timeout_seconds):
raise ValueError(
f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host} '
f'within {connect_timeout_seconds}s'
)
self.event_client_got_all.clear()
result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos)
assert result == 0
@@ -148,7 +154,11 @@ def get_configurations(dut: Dut, test_case: Any) -> Dict[str, Any]:
publish_cfg['pattern'] = ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)
)
publish_cfg['client_connect_timeout'] = 30
publish_cfg['dut_subscribe_timeout'] = 60
publish_cfg['publish_ack_timeout'] = 60
publish_cfg['test_timeout'] = get_timeout(test_case)
unique_topic = ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE)
)
@@ -159,9 +169,10 @@ def get_configurations(dut: Dut, test_case: Any) -> Dict[str, Any]:
@contextlib.contextmanager
def connected_and_subscribed(dut: Dut) -> Any:
def connected_and_subscribed(dut: Dut, config: Dict[str, Any]) -> Any:
dut.write('start')
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60)
dut_subscribe_timeout = config.get('dut_subscribe_timeout', 60)
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=dut_subscribe_timeout)
yield
dut.write('stop')
@@ -177,6 +188,7 @@ def get_scenarios() -> List[Dict[str, int]]:
continue
break
if not scenarios: # No message sizes present in the env - set defaults
logging.info('Using predefined cases')
scenarios = [
{'msg_len': 0, 'nr_of_msgs': 5}, # zero-sized messages
{'msg_len': 2, 'nr_of_msgs': 5}, # short messages
@@ -201,13 +213,15 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
logging.info(
f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},'
f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},'
f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}'
f' msg_size:{config["scenario"]["msg_len"]}, enqueue:{config["enqueue"]}'
)
dut.write(
f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
f'publish_setup {config["transport"]} {config["publish_topic"]}'
f' {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
)
with MqttPublisher(config) as publisher, connected_and_subscribed(dut):
assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe'
with MqttPublisher(config) as publisher, connected_and_subscribed(dut, config):
py_client_subscribe_timeout = config.get('py_client_subscribe_timeout', config['test_timeout'])
assert publisher.event_client_subscribed.wait(timeout=py_client_subscribe_timeout), 'Runner failed to subscribe'
msgs_published: List[mqtt.MQTTMessageInfo] = []
dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}')
assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (
@@ -222,11 +236,33 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos'])
if config['qos'] > 0:
msgs_published.append(msg)
logging.info(f'Published: {len(msgs_published)}')
while msgs_published:
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
logging.info(f'Published: {len(msgs_published)} messages from script with QoS > 0 needing ACK.')
logging.info('All messages from runner published')
if msgs_published:
publish_ack_timeout_seconds = config.get('publish_ack_timeout', 60) # Default 60s, make configurable
ack_wait_start_time = time.time()
initial_unacked_count = len(msgs_published)
logging.info(f'Waiting {initial_unacked_count} publish ack with timeout {publish_ack_timeout_seconds}s...')
while msgs_published:
if time.time() - ack_wait_start_time > publish_ack_timeout_seconds:
unacked_mids = [msg.mid for msg in msgs_published if msg.mid is not None and not msg.is_published()]
logging.error(
f'Timeout waiting for publish acknowledgements. '
f'{len(unacked_mids)} of {initial_unacked_count} messages remain unacknowledged. '
f'Unacked MIDs: {unacked_mids}'
)
# This will likely cause the test to fail at a later assertion,
# or you could raise an explicit error here.
# e.g. raise Exception('Timeout waiting for publish acknowledgements')
break
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
if msgs_published: # Avoid busy-looping if list is not empty
time.sleep(0.1) # Brief pause
if not msgs_published:
logging.info('All script-published QoS > 0 messages acknowledged by broker.')
logging.info('All messages from runner published (or timed out waiting for ACK).')
try:
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout'])
@@ -262,6 +298,7 @@ stress_test_cases = make_cases(transport_cases, stress_scenarios)
@pytest.mark.parametrize('test_case', test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
@@ -273,6 +310,7 @@ def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
@pytest.mark.nightly_run
@pytest.mark.parametrize('test_case', stress_test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut, test_case)