Merge branch 'update_mqtt_pre_refactor' into 'master'

Update esp-mqtt submodule to 6af4446a

Closes IDFGH-11179, IDFGH-14022, IDFGH-14489, and IDFGH-14616

See merge request espressif/esp-idf!38893
This commit is contained in:
Rocha Euripedes
2025-05-22 16:59:13 +08:00
5 changed files with 97 additions and 44 deletions

View File

@@ -3,6 +3,7 @@
cmake_minimum_required(VERSION 3.16)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
idf_build_set_property(MINIMAL_BUILD ON)
project(mqtt_publish_connect_test)

View File

@@ -1,3 +1,4 @@
idf_component_register(SRCS "publish_test.c" "connect_test.c" "publish_connect_test.c"
INCLUDE_DIRS ".")
INCLUDE_DIRS "."
REQUIRES mqtt nvs_flash console esp_netif)
target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")

View File

@@ -67,31 +67,44 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
ESP_LOGI(TAG, "ID=%d, total_len=%d, data_len=%d, current_data_offset=%d", event->msg_id, event->total_data_len, event->data_len, event->current_data_offset);
if (event->topic) {
actual_len = event->data_len;
msg_id = event->msg_id;
} else {
actual_len += event->data_len;
// check consistency with msg_id across multiple data events for single msg
if (msg_id != event->msg_id) {
ESP_LOGI(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
abort();
}
}
memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
if (actual_len == event->total_data_len) {
if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
memset(test_data->received_data, 0, test_data->expected_size);
test_data->nr_of_msg_received ++;
if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
ESP_LOGI(TAG, "Correct pattern received exactly x times");
ESP_LOGI(TAG, "Test finished correctly!");
}
} else {
ESP_LOGE(TAG, "FAILED!");
abort();
}
}
if (event->current_data_offset == 0) {
actual_len = event->data_len;
msg_id = event->msg_id;
if (event->total_data_len != test_data->expected_size) {
ESP_LOGE(TAG, "Incorrect message size: %d != %d", event->total_data_len, test_data->expected_size);
abort();
}
} else {
actual_len += event->data_len;
// check consistency with msg_id across multiple data events for single msg
if (msg_id != event->msg_id) {
ESP_LOGE(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
abort();
}
}
if (event->current_data_offset + event->data_len > test_data->expected_size) {
ESP_LOGE(TAG, "Buffer overflow detected: offset %d + data_len %d > buffer size %d", event->current_data_offset, event->data_len, test_data->expected_size);
abort();
}
if (memcmp(test_data->expected + event->current_data_offset, event->data, event->data_len) != 0) {
ESP_LOGE(TAG, "Data mismatch at offset %d: \n expected %.*s, \n got %.*s", event->current_data_offset, event->data_len, test_data->expected + event->current_data_offset, event->data_len, event->data);
abort();
}
memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
if (actual_len == event->total_data_len) {
if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
memset(test_data->received_data, 0, test_data->expected_size);
test_data->nr_of_msg_received++;
if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
ESP_LOGI(TAG, "Correct pattern received exactly x times");
ESP_LOGI(TAG, "Test finished correctly!");
}
} else {
ESP_LOGE(TAG, "FAILED!");
abort();
}
}
break;
case MQTT_EVENT_ERROR:
ESP_LOGE(TAG, "MQTT_EVENT_ERROR");

View File

@@ -8,6 +8,7 @@ import random
import re
import ssl
import string
import time
from itertools import count
from itertools import product
from threading import Event
@@ -15,8 +16,8 @@ from threading import Lock
from typing import Any
from typing import Dict
from typing import List
from typing import no_type_check
from typing import Tuple
from typing import no_type_check
import paho.mqtt.client as mqtt
import pexpect
@@ -61,7 +62,7 @@ class MqttPublisher(mqtt.Client):
def on_connect_fail(self, mqttc: Any, obj: Any) -> None:
logging.error('Connect failed')
def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
def on_message(self, mqttc: mqtt.Client, obj: Any, msg: mqtt.MQTTMessage) -> None:
payload = msg.payload.decode('utf-8')
if payload == self.expected_data:
self.received += 1
@@ -70,8 +71,9 @@ class MqttPublisher(mqtt.Client):
else:
differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data))))
logging.error(
f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:'
f'{len(self.expected_data)}'
f'Payload on topic "{msg.topic}" (QoS {msg.qos}) differs in {differences} positions '
'from expected data. '
f'Received size: {len(payload)}, expected size: {len(self.expected_data)}.'
)
logging.info(f'Repetitions: {payload.count(self.config["pattern"])}')
logging.info(f'Pattern: {self.config["pattern"]}')
@@ -85,6 +87,7 @@ class MqttPublisher(mqtt.Client):
qos = self.config['qos']
broker_host = self.config['broker_host_' + self.config['transport']]
broker_port = self.config['broker_port_' + self.config['transport']]
connect_timeout_seconds = self.config.get('client_connect_timeout', 30)
try:
self.print_details('Connecting...')
@@ -93,14 +96,17 @@ class MqttPublisher(mqtt.Client):
self.tls_insecure_set(True)
self.event_client_connected.clear()
self.loop_start()
self.connect(broker_host, broker_port, 60)
self.connect(broker_host, broker_port, 60) # paho's keepalive
except Exception:
self.print_details(f'ENV_TEST_FAILURE: Unexpected error while connecting to broker {broker_host}')
raise
self.print_details(f'Connecting py-client to broker {broker_host}:{broker_port}...')
if not self.event_client_connected.wait(timeout=30):
raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}')
if not self.event_client_connected.wait(timeout=connect_timeout_seconds):
raise ValueError(
f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host} '
f'within {connect_timeout_seconds}s'
)
self.event_client_got_all.clear()
result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos)
assert result == 0
@@ -148,7 +154,11 @@ def get_configurations(dut: Dut, test_case: Any) -> Dict[str, Any]:
publish_cfg['pattern'] = ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)
)
publish_cfg['client_connect_timeout'] = 30
publish_cfg['dut_subscribe_timeout'] = 60
publish_cfg['publish_ack_timeout'] = 60
publish_cfg['test_timeout'] = get_timeout(test_case)
unique_topic = ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE)
)
@@ -159,9 +169,10 @@ def get_configurations(dut: Dut, test_case: Any) -> Dict[str, Any]:
@contextlib.contextmanager
def connected_and_subscribed(dut: Dut) -> Any:
def connected_and_subscribed(dut: Dut, config: Dict[str, Any]) -> Any:
dut.write('start')
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60)
dut_subscribe_timeout = config.get('dut_subscribe_timeout', 60)
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=dut_subscribe_timeout)
yield
dut.write('stop')
@@ -177,6 +188,7 @@ def get_scenarios() -> List[Dict[str, int]]:
continue
break
if not scenarios: # No message sizes present in the env - set defaults
logging.info('Using predefined cases')
scenarios = [
{'msg_len': 0, 'nr_of_msgs': 5}, # zero-sized messages
{'msg_len': 2, 'nr_of_msgs': 5}, # short messages
@@ -201,13 +213,15 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
logging.info(
f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},'
f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},'
f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}'
f' msg_size:{config["scenario"]["msg_len"]}, enqueue:{config["enqueue"]}'
)
dut.write(
f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
f'publish_setup {config["transport"]} {config["publish_topic"]}'
f' {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
)
with MqttPublisher(config) as publisher, connected_and_subscribed(dut):
assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe'
with MqttPublisher(config) as publisher, connected_and_subscribed(dut, config):
py_client_subscribe_timeout = config.get('py_client_subscribe_timeout', config['test_timeout'])
assert publisher.event_client_subscribed.wait(timeout=py_client_subscribe_timeout), 'Runner failed to subscribe'
msgs_published: List[mqtt.MQTTMessageInfo] = []
dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}')
assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (
@@ -222,11 +236,33 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos'])
if config['qos'] > 0:
msgs_published.append(msg)
logging.info(f'Published: {len(msgs_published)}')
while msgs_published:
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
logging.info(f'Published: {len(msgs_published)} messages from script with QoS > 0 needing ACK.')
logging.info('All messages from runner published')
if msgs_published:
publish_ack_timeout_seconds = config.get('publish_ack_timeout', 60) # Default 60s, make configurable
ack_wait_start_time = time.time()
initial_unacked_count = len(msgs_published)
logging.info(f'Waiting {initial_unacked_count} publish ack with timeout {publish_ack_timeout_seconds}s...')
while msgs_published:
if time.time() - ack_wait_start_time > publish_ack_timeout_seconds:
unacked_mids = [msg.mid for msg in msgs_published if msg.mid is not None and not msg.is_published()]
logging.error(
f'Timeout waiting for publish acknowledgements. '
f'{len(unacked_mids)} of {initial_unacked_count} messages remain unacknowledged. '
f'Unacked MIDs: {unacked_mids}'
)
# This will likely cause the test to fail at a later assertion,
# or you could raise an explicit error here.
# e.g. raise Exception('Timeout waiting for publish acknowledgements')
break
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
if msgs_published: # Avoid busy-looping if list is not empty
time.sleep(0.1) # Brief pause
if not msgs_published:
logging.info('All script-published QoS > 0 messages acknowledged by broker.')
logging.info('All messages from runner published (or timed out waiting for ACK).')
try:
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout'])
@@ -262,6 +298,7 @@ stress_test_cases = make_cases(transport_cases, stress_scenarios)
@pytest.mark.parametrize('test_case', test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
@@ -273,6 +310,7 @@ def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
@pytest.mark.nightly_run
@pytest.mark.parametrize('test_case', stress_test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut, test_case)