fix: Multiple event data

Fix a regression introduced when adding the presence of topic in all
event data.
This commit is contained in:
Euripedes Rocha
2025-05-19 12:52:53 +02:00
parent 2d2060d041
commit 9c76b7054a
2 changed files with 49 additions and 42 deletions

10
Kconfig
View File

@ -177,4 +177,14 @@ menu "ESP-MQTT Configurations"
help
Messages which stays in the outbox longer than this value before being published will be discarded.
config MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS
bool "Enable publish topic in all data events"
default n
depends on MQTT_USE_CUSTOM_CONFIG
help
Set to true to have publish topic in all data events. This changes the behaviour
when the message is bigger than the receive buffer size. The first event of the sequence
always have the topic.
Note: This will allocate memory to store the topic only in case of messge bigger than the buffer size.
endmenu

View File

@ -1,5 +1,6 @@
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include "esp_err.h"
#include "esp_log.h"
#include "esp_heap_caps.h"
@ -1071,9 +1072,13 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
size_t msg_total_len = client->mqtt_state.message_length;
size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len;
size_t msg_topic_len = msg_read_len;
size_t msg_data_len = msg_read_len;
size_t msg_data_offset = 0;
char *msg_topic = NULL, *msg_data = NULL;
size_t saved_msg_topic_len = 0;
char *saved_msg_topic = NULL;
char *msg_topic = NULL;
char *msg_data = NULL;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
@ -1083,7 +1088,6 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
}
#endif
} else {
// get and save topic
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
@ -1098,11 +1102,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
}
char *saved_msg_topic = strndup(msg_topic, msg_topic_len);
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
size_t saved_msg_topic_len = msg_topic_len;
// post data event
client->event.retain = mqtt_get_retain(msg_buf);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
@ -1115,6 +1115,8 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
bool send_event = true;
while (send_event) {
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
@ -1125,38 +1127,33 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.topic = msg_topic;
client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client);
send_event = false;
client->event.topic = saved_msg_topic;
client->event.topic_len = saved_msg_topic_len;
while(msg_read_len < msg_total_len) {
size_t buf_len = client->mqtt_state.in_buffer_length;
msg_data = (char *)client->mqtt_state.in_buffer;
msg_data_offset += msg_data_len;
size_t read_len;
if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) {
read_len = buf_len - saved_msg_topic_len;
} else {
read_len = msg_total_len - msg_read_len;
if (msg_read_len < msg_total_len) {
send_event = true;
#ifdef CONFIG_MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS
if (!saved_msg_topic) {
saved_msg_topic = strndup(msg_topic, msg_topic_len);
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
saved_msg_topic_len = msg_topic_len;
}
#endif
size_t buf_len = client->mqtt_state.in_buffer_length;
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms);
msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = saved_msg_topic;
msg_topic_len = saved_msg_topic_len;
msg_data_offset += msg_data_len;
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
}
msg_data_len = ret;
msg_read_len += msg_data_len;
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
esp_mqtt_dispatch_event(client);
}
}
free(saved_msg_topic);
return ESP_OK;