feat: Include message topic in all data events for big messages.

When message is larger than the buffer, and must produce several events include topic where it came from in each of those events
This commit is contained in:
Bogdan Kolendovskyy
2024-08-05 11:48:41 +02:00
parent b5b80336b7
commit 82017e9bcc

View File

@ -1062,7 +1062,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
}
#endif
} else {
// get topic
// get and save topic
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
@ -1077,6 +1077,10 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
}
char *saved_msg_topic = strndup(msg_topic, msg_topic_len);
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
size_t saved_msg_topic_len = msg_topic_len;
// post data event
client->event.retain = mqtt_get_retain(msg_buf);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
@ -1089,7 +1093,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
@ -1101,24 +1105,39 @@ post_data_event:
client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client);
if (msg_read_len < msg_total_len) {
client->event.topic = saved_msg_topic;
client->event.topic_len = saved_msg_topic_len;
while(msg_read_len < msg_total_len) {
size_t buf_len = client->mqtt_state.in_buffer_length;
msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = NULL;
msg_topic_len = 0;
msg_data_offset += msg_data_len;
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
size_t read_len;
if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) {
read_len = buf_len - saved_msg_topic_len;
} else {
read_len = msg_total_len - msg_read_len;
}
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms);
if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
}
msg_data_len = ret;
msg_read_len += msg_data_len;
goto post_data_event;
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
esp_mqtt_dispatch_event(client);
}
free(saved_msg_topic);
return ESP_OK;
}