From db64b791205c8dbcd0f3826a9544f0aa0301baaf Mon Sep 17 00:00:00 2001 From: Laurent Louf Date: Tue, 2 Oct 2018 17:23:14 +0200 Subject: [PATCH 1/5] Fix for case where multiple MQTT messages are fitted into a single TCP packet : iterate over the buffer received. Merges https://github.com/espressif/esp-mqtt/pull/82 Closes #64 Closes #80 --- mqtt_client.c | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 0f8656e..9f27538 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -606,6 +606,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) uint8_t msg_type; uint8_t msg_qos; uint16_t msg_id; + uint32_t transport_message_offset = 0 ; read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 1000); @@ -618,9 +619,13 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_OK; } - msg_type = mqtt_get_type(client->mqtt_state.in_buffer); - msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); - msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + // In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here + while ( transport_message_offset < read_len ){ + // If the message was valid, get the type, quality of service and id of the message + msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]); + msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]); + msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset); + ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); switch (msg_type) @@ -656,9 +661,12 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) // return ESP_FAIL; } } - client->mqtt_state.message_length_read = read_len; - client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + + // Deliver the publish message + client->mqtt_state.message_length_read = read_len - transport_message_offset; + client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); + deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); break; @@ -695,6 +703,9 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; } + transport_message_offset += mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset) ; + } + return ESP_OK; } From cf5b8eda89057f5eb49654d2eaf4d1a209af2d11 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Tue, 4 Dec 2018 15:29:14 +0100 Subject: [PATCH 2/5] added a fix for incomplete header message received (topic or data could not be resolved from msg) --- mqtt_client.c | 59 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 9f27538..d50e3c0 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -511,23 +511,32 @@ typedef struct { static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { - const char *mqtt_topic, *mqtt_data; + const char *mqtt_topic = NULL, *mqtt_data = NULL; uint32_t mqtt_topic_length, mqtt_data_length; - uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0; - int len_read; + uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0; + int len_read= length; + int max_to_read = client->mqtt_state.in_buffer_length; + int buffer_offset = 0; esp_transport_handle_t transport = client->transport; do { if (total_mqtt_len == 0) { - mqtt_topic_length = length; - mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length); - mqtt_data_length = length; - mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); - total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; - mqtt_len = mqtt_data_length; /* any further reading only the underlying payload */ transport = esp_transport_get_payload_transport_handle(transport); + mqtt_data_length = mqtt_topic_length = length; + if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) || + NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) { + // mqtt header is not complete, continue reading + memmove(client->mqtt_state.in_buffer, message, length); + buffer_offset = length; + message = client->mqtt_state.in_buffer; + max_to_read = client->mqtt_state.in_buffer_length - length; + mqtt_len = 0; + } else { + total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; + mqtt_len = mqtt_data_length; + } } else { mqtt_len = len_read; mqtt_data = (const char*)client->mqtt_state.in_buffer; @@ -535,15 +544,17 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i mqtt_topic_length = 0; } - ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); - client->event.event_id = MQTT_EVENT_DATA; - client->event.data = (char *)mqtt_data; - client->event.data_len = mqtt_len; - client->event.total_data_len = total_mqtt_len; - client->event.current_data_offset = mqtt_offset; - client->event.topic = (char *)mqtt_topic; - client->event.topic_len = mqtt_topic_length; - esp_mqtt_dispatch_event(client); + if (total_mqtt_len != 0) { + ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); + client->event.event_id = MQTT_EVENT_DATA; + client->event.data = (char *)mqtt_data; + client->event.data_len = mqtt_len; + client->event.total_data_len = total_mqtt_len; + client->event.current_data_offset = mqtt_offset; + client->event.topic = (char *)mqtt_topic; + client->event.topic_len = mqtt_topic_length; + esp_mqtt_dispatch_event(client); + } mqtt_offset += mqtt_len; if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) { @@ -551,10 +562,13 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i } len_read = esp_transport_read(transport, - (char *)client->mqtt_state.in_buffer, - client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ? - client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read, + (char *)client->mqtt_state.in_buffer + buffer_offset, + client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ? + max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read, client->config->network_timeout_ms); + length = len_read + buffer_offset; + buffer_offset = 0; + max_to_read = client->mqtt_state.in_buffer_length; if (len_read <= 0) { ESP_LOGE(TAG, "Read error or timeout: %d", errno); break; @@ -562,7 +576,6 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i client->mqtt_state.message_length_read += len_read; } while (1); - } static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) @@ -667,8 +680,6 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); - - deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); break; case MQTT_MSG_TYPE_PUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { From e0bbbebc08884118931f7adaedb0b8cba06db00c Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 25 Oct 2018 16:38:25 +0200 Subject: [PATCH 3/5] mqtt support for sending fragmented messages and full mqtt message length support --- lib/include/mqtt_msg.h | 3 +- lib/include/mqtt_outbox.h | 12 +++++- lib/mqtt_msg.c | 68 ++++++++++++++++++++++---------- lib/mqtt_outbox.c | 17 ++++---- mqtt_client.c | 81 ++++++++++++++++++++++++++++++++++----- 5 files changed, 142 insertions(+), 39 deletions(-) diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 049c738..931cbf9 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -72,7 +72,8 @@ typedef struct mqtt_message { uint8_t* data; uint32_t length; - + uint32_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */ + uint32_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */ } mqtt_message_t; typedef struct mqtt_connection diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index a87b466..31dd5b9 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -15,9 +15,19 @@ struct outbox_item; typedef struct outbox_list_t * outbox_handle_t; typedef struct outbox_item * outbox_item_handle_t; +typedef struct outbox_message * outbox_message_handle_t; + +typedef struct outbox_message { + uint8_t *data; + int len; + int msg_id; + int msg_type; + uint8_t *remaining_data; + int remaining_len; +} outbox_message_t; outbox_handle_t outbox_init(); -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick); +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index eb97847..15eb93e 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -34,7 +34,7 @@ #include "mqtt_config.h" #include "platform.h" -#define MQTT_MAX_FIXED_HEADER_SIZE 3 +#define MQTT_MAX_FIXED_HEADER_SIZE 5 enum mqtt_connect_flag { @@ -105,22 +105,42 @@ static mqtt_message_t* fail_message(mqtt_connection_t* connection) static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) { - int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; - - if (remaining_length > 127) - { - connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[1] = 0x80 | (remaining_length % 128); - connection->buffer[2] = remaining_length / 128; - connection->message.length = remaining_length + 3; - connection->message.data = connection->buffer; + int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + int total_length = message_length; + int encoded_length = 0; + uint8_t encoded_lens[4] = {0}; + // Check if we have fragmented message and update total_len + if (connection->message.fragmented_msg_total_length) { + total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE; } - else - { - connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[2] = remaining_length; - connection->message.length = remaining_length + 2; - connection->message.data = connection->buffer + 1; + + // Encode MQTT message length + int len_bytes = 0; // size of encoded message length + do { + encoded_length = total_length % 128; + total_length /= 128; + if (total_length > 0) { + encoded_length |= 0x80; + } + encoded_lens[len_bytes] = encoded_length; + len_bytes++; + } while (total_length > 0); + + // Sanity check for MQTT header + if (len_bytes + 1 > MQTT_MAX_FIXED_HEADER_SIZE) { + return fail_message(connection); + } + + // Save the header bytes + connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) + int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes; + connection->message.data = connection->buffer + offs; + connection->message.fragmented_msg_data_offset -= offs; + // type byte + connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + // length bytes + for (int j = 0; jbuffer[offs++] = encoded_lens[j]; } return &connection->message; @@ -377,11 +397,17 @@ mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topi else *message_id = 0; - if (connection->message.length + data_length > connection->buffer_length) - return fail_message(connection); - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; - + if (connection->message.length + data_length > connection->buffer_length) { + // Not enough size in buffer -> fragment this message + connection->message.fragmented_msg_data_offset = connection->message.length; + memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length); + connection->message.length = connection->buffer_length; + connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset; + } else { + memcpy(connection->buffer + connection->message.length, data, data_length); + connection->message.length += data_length; + connection->message.fragmented_msg_total_length = 0; + } return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 300e698..6d6efc5 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -31,22 +31,25 @@ outbox_handle_t outbox_init() return outbox; } -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick) +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick) { outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t)); ESP_MEM_CHECK(TAG, item, return NULL); - item->msg_id = msg_id; - item->msg_type = msg_type; + item->msg_id = message->msg_id; + item->msg_type = message->msg_type; item->tick = tick; - item->len = len; - item->buffer = malloc(len); + item->len = message->len; + item->buffer = malloc(message->len + message->remaining_len); ESP_MEM_CHECK(TAG, item->buffer, { free(item); return NULL; }); - memcpy(item->buffer, data, len); + memcpy(item->buffer, message->data, message->len); + if (message->remaining_data) { + memcpy(item->buffer+message->len, message->remaining_data, message->remaining_len); + } STAILQ_INSERT_TAIL(outbox, item, next); - ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", msg_id, msg_type, len, outbox_get_size(outbox)); + ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox)); return item; } diff --git a/mqtt_client.c b/mqtt_client.c index d50e3c0..6c5a795 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -596,19 +596,40 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int return false; } +static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) +{ + ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful", + client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); + //lock mutex + outbox_message_t msg = { 0 }; + if (client->mqtt_state.pending_msg_count > 0) { + client->mqtt_state.pending_msg_count --; + } + msg.data = client->mqtt_state.outbound_message->data; + msg.len = client->mqtt_state.outbound_message->length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; + msg.remaining_data = remaining_data; + msg.remaining_len = remaining_len; + //Copy to queue buffer + outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); + + //unlock +} + static void mqtt_enqueue(esp_mqtt_client_handle_t client) { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); //lock mutex if (client->mqtt_state.pending_msg_count > 0) { + outbox_message_t msg = { 0 }; + msg.data = client->mqtt_state.outbound_message->data; + msg.len = client->mqtt_state.outbound_message->length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; //Copy to queue buffer - outbox_enqueue(client->outbox, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->mqtt_state.pending_msg_id, - client->mqtt_state.pending_msg_type, - platform_tick_get_ms()); + outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } //unlock } @@ -951,9 +972,51 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, client->mqtt_state.outbound_message = publish_msg; } - if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos); - return -1; + /* Provide support for sending fragmented message if it doesn't fit buffer */ + int remaining_len = len; + const char *current_data = data; + bool sending = true; + + while (sending) { + + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos); + return -1; + } + + int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; + remaining_len -= data_sent; + current_data += data_sent; + + if (remaining_len > 0) { + mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection; + ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len); + if (connection->message.fragmented_msg_data_offset) { + // asked to enqueue oversized message (first time only) + connection->message.fragmented_msg_data_offset = 0; + connection->message.fragmented_msg_total_length = 0; + if (qos > 0) { + // internally enqueue all big messages, as they dont fit 'pending msg' structure + mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len); + } + } + + if (remaining_len > connection->buffer_length) { + // Continue with sending + memcpy(connection->buffer, current_data, connection->buffer_length); + connection->message.length = connection->buffer_length; + sending = true; + } else { + memcpy(connection->buffer, current_data, remaining_len); + connection->message.length = remaining_len; + sending = true; + } + connection->message.data = connection->buffer; + client->mqtt_state.outbound_message = &connection->message; + } else { + // Message was sent correctly + sending = false; + } } return pending_msg_id; } From 06fe5cca8ed78dc5922cff5c406214b244b35a26 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Fri, 2 Nov 2018 08:37:05 +0100 Subject: [PATCH 4/5] mqtt data events fixed to keep consistent msg_id when fragmented message received closes #70 --- mqtt_client.c | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 6c5a795..627222c 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -77,6 +77,7 @@ struct esp_mqtt_client { const static int STOPPED_BIT = BIT0; static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client); +static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms); static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client); @@ -293,7 +294,7 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client) ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms); client->event.event_id = MQTT_EVENT_DISCONNECTED; client->wait_for_ping_resp = false; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); return ESP_OK; } @@ -490,9 +491,14 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) return ESP_OK; } -static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) +static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) { client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + return esp_mqtt_dispatch_event(client); +} + +static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) +{ client->event.user_context = client->config->user_context; client->event.client = client; @@ -536,6 +542,12 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i } else { total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; mqtt_len = mqtt_data_length; + if (client->mqtt_state.message_length_read < client->mqtt_state.message_length) { + /* if message is framented -> correct the size for the first DATA event */ + mqtt_data_length = client->mqtt_state.message_length_read - ((uint8_t*)mqtt_data- message); + } + /* read msg id only once */ + client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); } } else { mqtt_len = len_read; @@ -668,14 +680,14 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { ESP_LOGD(TAG, "Subscribe successful"); client->event.event_id = MQTT_EVENT_SUBSCRIBED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; case MQTT_MSG_TYPE_UNSUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { ESP_LOGD(TAG, "UnSubscribe successful"); client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; case MQTT_MSG_TYPE_PUBLISH: @@ -706,7 +718,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; @@ -726,7 +738,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); } break; case MQTT_MSG_TYPE_PINGRESP: @@ -765,7 +777,7 @@ static void esp_mqtt_task(void *pv) switch ((int)client->state) { case MQTT_STATE_INIT: client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); if (client->transport == NULL) { ESP_LOGE(TAG, "There are no transport"); @@ -789,7 +801,7 @@ static void esp_mqtt_task(void *pv) client->event.event_id = MQTT_EVENT_CONNECTED; client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); client->state = MQTT_STATE_CONNECTED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); client->refresh_connection_tick = platform_tick_get_ms(); break; From f08f3b678717865234637164a29ed3a63e756ca7 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 3 Jan 2019 07:56:22 +0100 Subject: [PATCH 5/5] update log levels, logging complete error code, removed unnecessary struct --- mqtt_client.c | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 627222c..bf695f8 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -508,13 +508,6 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) return ESP_FAIL; } - -typedef struct { - char *path; - char *buffer; - esp_transport_handle_t parent; -} transport_ws_t; - static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { const char *mqtt_topic = NULL, *mqtt_data = NULL; @@ -582,7 +575,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i buffer_offset = 0; max_to_read = client->mqtt_state.in_buffer_length; if (len_read <= 0) { - ESP_LOGE(TAG, "Read error or timeout: %d", errno); + ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", len_read, errno); break; } client->mqtt_state.message_length_read += len_read; @@ -711,7 +704,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) // Deliver the publish message client->mqtt_state.message_length_read = read_len - transport_message_offset; client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); - ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); + ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); break; case MQTT_MSG_TYPE_PUBACK: