feat: Include message topic in all data events for big messages.

When message is larger than the buffer, and must produce several events include topic where it came from in each of those events
This commit is contained in:
Bogdan Kolendovskyy
2024-08-05 11:48:41 +02:00
parent b5b80336b7
commit 82017e9bcc

View File

@ -521,7 +521,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
} else {
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
}
client->config->transport = config->network.transport;
if (config->network.if_name) {
@ -1062,7 +1062,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
}
#endif
} else {
// get topic
// get and save topic
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
@ -1077,6 +1077,10 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
}
char *saved_msg_topic = strndup(msg_topic, msg_topic_len);
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
size_t saved_msg_topic_len = msg_topic_len;
// post data event
client->event.retain = mqtt_get_retain(msg_buf);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
@ -1089,7 +1093,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
@ -1101,24 +1105,39 @@ post_data_event:
client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client);
if (msg_read_len < msg_total_len) {
client->event.topic = saved_msg_topic;
client->event.topic_len = saved_msg_topic_len;
while(msg_read_len < msg_total_len) {
size_t buf_len = client->mqtt_state.in_buffer_length;
msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = NULL;
msg_topic_len = 0;
msg_data_offset += msg_data_len;
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
size_t read_len;
if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) {
read_len = buf_len - saved_msg_topic_len;
} else {
read_len = msg_total_len - msg_read_len;
}
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms);
if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
}
msg_data_len = ret;
msg_read_len += msg_data_len;
goto post_data_event;
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
esp_mqtt_dispatch_event(client);
}
free(saved_msg_topic);
return ESP_OK;
}