diff --git a/Kconfig b/Kconfig index 9eb6c0b..d438858 100644 --- a/Kconfig +++ b/Kconfig @@ -177,4 +177,14 @@ menu "ESP-MQTT Configurations" help Messages which stays in the outbox longer than this value before being published will be discarded. + config MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS + bool "Enable publish topic in all data events" + default n + depends on MQTT_USE_CUSTOM_CONFIG + help + Set to true to have publish topic in all data events. This changes the behaviour + when the message is bigger than the receive buffer size. The first event of the sequence + always have the topic. + Note: This will allocate memory to store the topic only in case of messge bigger than the buffer size. + endmenu diff --git a/mqtt_client.c b/mqtt_client.c index d062e7a..b70f518 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1,5 +1,6 @@ #include #include +#include #include "esp_err.h" #include "esp_log.h" #include "esp_heap_caps.h" @@ -1071,9 +1072,13 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) uint8_t *msg_buf = client->mqtt_state.in_buffer; size_t msg_read_len = client->mqtt_state.in_buffer_read_len; size_t msg_total_len = client->mqtt_state.message_length; - size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len; + size_t msg_topic_len = msg_read_len; + size_t msg_data_len = msg_read_len; size_t msg_data_offset = 0; - char *msg_topic = NULL, *msg_data = NULL; + size_t saved_msg_topic_len = 0; + char *saved_msg_topic = NULL; + char *msg_topic = NULL; + char *msg_data = NULL; if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 @@ -1083,7 +1088,6 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) } #endif } else { - // get and save topic msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len); if (msg_topic == NULL) { ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__); @@ -1098,11 +1102,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) return ESP_FAIL; } } - char *saved_msg_topic = strndup(msg_topic, msg_topic_len); - ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM); - size_t saved_msg_topic_len = msg_topic_len; - // post data event client->event.retain = mqtt_get_retain(msg_buf); if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 @@ -1115,48 +1115,45 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) client->event.dup = mqtt_get_dup(msg_buf); client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len; - ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT, - NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len), - client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset)); - client->event.event_id = MQTT_EVENT_DATA; - client->event.data = msg_data_len > 0 ? msg_data : NULL; - client->event.data_len = msg_data_len; - client->event.current_data_offset = msg_data_offset; - client->event.topic = msg_topic; - client->event.topic_len = msg_topic_len; - esp_mqtt_dispatch_event(client); - - client->event.topic = saved_msg_topic; - client->event.topic_len = saved_msg_topic_len; - while(msg_read_len < msg_total_len) { - size_t buf_len = client->mqtt_state.in_buffer_length; - msg_data = (char *)client->mqtt_state.in_buffer; - msg_data_offset += msg_data_len; - - size_t read_len; - if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) { - read_len = buf_len - saved_msg_topic_len; - } else { - read_len = msg_total_len - msg_read_len; - } - - int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms); - if (ret <= 0) { - return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL; - } - - msg_data_len = ret; - msg_read_len += msg_data_len; - + bool send_event = true; + while (send_event) { ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT, NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len), client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset)); - + client->event.event_id = MQTT_EVENT_DATA; + client->event.data = msg_data_len > 0 ? msg_data : NULL; client->event.data_len = msg_data_len; client->event.current_data_offset = msg_data_offset; - + client->event.topic = msg_topic; + client->event.topic_len = msg_topic_len; esp_mqtt_dispatch_event(client); + send_event = false; + if (msg_read_len < msg_total_len) { + send_event = true; + #ifdef CONFIG_MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS + if (!saved_msg_topic) { + saved_msg_topic = strndup(msg_topic, msg_topic_len); + ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM); + saved_msg_topic_len = msg_topic_len; + } + #endif + size_t buf_len = client->mqtt_state.in_buffer_length; + + msg_data = (char *)client->mqtt_state.in_buffer; + msg_topic = saved_msg_topic; + msg_topic_len = saved_msg_topic_len; + msg_data_offset += msg_data_len; + int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, + msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, + client->config->network_timeout_ms); + if (ret <= 0) { + return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL; + } + + msg_data_len = ret; + msg_read_len += msg_data_len; + } } free(saved_msg_topic); return ESP_OK;