From e0bbbebc08884118931f7adaedb0b8cba06db00c Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 25 Oct 2018 16:38:25 +0200 Subject: [PATCH] mqtt support for sending fragmented messages and full mqtt message length support --- lib/include/mqtt_msg.h | 3 +- lib/include/mqtt_outbox.h | 12 +++++- lib/mqtt_msg.c | 68 ++++++++++++++++++++++---------- lib/mqtt_outbox.c | 17 ++++---- mqtt_client.c | 81 ++++++++++++++++++++++++++++++++++----- 5 files changed, 142 insertions(+), 39 deletions(-) diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 049c738..931cbf9 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -72,7 +72,8 @@ typedef struct mqtt_message { uint8_t* data; uint32_t length; - + uint32_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */ + uint32_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */ } mqtt_message_t; typedef struct mqtt_connection diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index a87b466..31dd5b9 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -15,9 +15,19 @@ struct outbox_item; typedef struct outbox_list_t * outbox_handle_t; typedef struct outbox_item * outbox_item_handle_t; +typedef struct outbox_message * outbox_message_handle_t; + +typedef struct outbox_message { + uint8_t *data; + int len; + int msg_id; + int msg_type; + uint8_t *remaining_data; + int remaining_len; +} outbox_message_t; outbox_handle_t outbox_init(); -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick); +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index eb97847..15eb93e 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -34,7 +34,7 @@ #include "mqtt_config.h" #include "platform.h" -#define MQTT_MAX_FIXED_HEADER_SIZE 3 +#define MQTT_MAX_FIXED_HEADER_SIZE 5 enum mqtt_connect_flag { @@ -105,22 +105,42 @@ static mqtt_message_t* fail_message(mqtt_connection_t* connection) static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) { - int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; - - if (remaining_length > 127) - { - connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[1] = 0x80 | (remaining_length % 128); - connection->buffer[2] = remaining_length / 128; - connection->message.length = remaining_length + 3; - connection->message.data = connection->buffer; + int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + int total_length = message_length; + int encoded_length = 0; + uint8_t encoded_lens[4] = {0}; + // Check if we have fragmented message and update total_len + if (connection->message.fragmented_msg_total_length) { + total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE; } - else - { - connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[2] = remaining_length; - connection->message.length = remaining_length + 2; - connection->message.data = connection->buffer + 1; + + // Encode MQTT message length + int len_bytes = 0; // size of encoded message length + do { + encoded_length = total_length % 128; + total_length /= 128; + if (total_length > 0) { + encoded_length |= 0x80; + } + encoded_lens[len_bytes] = encoded_length; + len_bytes++; + } while (total_length > 0); + + // Sanity check for MQTT header + if (len_bytes + 1 > MQTT_MAX_FIXED_HEADER_SIZE) { + return fail_message(connection); + } + + // Save the header bytes + connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) + int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes; + connection->message.data = connection->buffer + offs; + connection->message.fragmented_msg_data_offset -= offs; + // type byte + connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + // length bytes + for (int j = 0; jbuffer[offs++] = encoded_lens[j]; } return &connection->message; @@ -377,11 +397,17 @@ mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topi else *message_id = 0; - if (connection->message.length + data_length > connection->buffer_length) - return fail_message(connection); - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; - + if (connection->message.length + data_length > connection->buffer_length) { + // Not enough size in buffer -> fragment this message + connection->message.fragmented_msg_data_offset = connection->message.length; + memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length); + connection->message.length = connection->buffer_length; + connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset; + } else { + memcpy(connection->buffer + connection->message.length, data, data_length); + connection->message.length += data_length; + connection->message.fragmented_msg_total_length = 0; + } return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 300e698..6d6efc5 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -31,22 +31,25 @@ outbox_handle_t outbox_init() return outbox; } -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick) +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick) { outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t)); ESP_MEM_CHECK(TAG, item, return NULL); - item->msg_id = msg_id; - item->msg_type = msg_type; + item->msg_id = message->msg_id; + item->msg_type = message->msg_type; item->tick = tick; - item->len = len; - item->buffer = malloc(len); + item->len = message->len; + item->buffer = malloc(message->len + message->remaining_len); ESP_MEM_CHECK(TAG, item->buffer, { free(item); return NULL; }); - memcpy(item->buffer, data, len); + memcpy(item->buffer, message->data, message->len); + if (message->remaining_data) { + memcpy(item->buffer+message->len, message->remaining_data, message->remaining_len); + } STAILQ_INSERT_TAIL(outbox, item, next); - ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", msg_id, msg_type, len, outbox_get_size(outbox)); + ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox)); return item; } diff --git a/mqtt_client.c b/mqtt_client.c index d50e3c0..6c5a795 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -596,19 +596,40 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int return false; } +static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) +{ + ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful", + client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); + //lock mutex + outbox_message_t msg = { 0 }; + if (client->mqtt_state.pending_msg_count > 0) { + client->mqtt_state.pending_msg_count --; + } + msg.data = client->mqtt_state.outbound_message->data; + msg.len = client->mqtt_state.outbound_message->length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; + msg.remaining_data = remaining_data; + msg.remaining_len = remaining_len; + //Copy to queue buffer + outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); + + //unlock +} + static void mqtt_enqueue(esp_mqtt_client_handle_t client) { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); //lock mutex if (client->mqtt_state.pending_msg_count > 0) { + outbox_message_t msg = { 0 }; + msg.data = client->mqtt_state.outbound_message->data; + msg.len = client->mqtt_state.outbound_message->length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; //Copy to queue buffer - outbox_enqueue(client->outbox, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->mqtt_state.pending_msg_id, - client->mqtt_state.pending_msg_type, - platform_tick_get_ms()); + outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } //unlock } @@ -951,9 +972,51 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, client->mqtt_state.outbound_message = publish_msg; } - if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos); - return -1; + /* Provide support for sending fragmented message if it doesn't fit buffer */ + int remaining_len = len; + const char *current_data = data; + bool sending = true; + + while (sending) { + + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos); + return -1; + } + + int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; + remaining_len -= data_sent; + current_data += data_sent; + + if (remaining_len > 0) { + mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection; + ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len); + if (connection->message.fragmented_msg_data_offset) { + // asked to enqueue oversized message (first time only) + connection->message.fragmented_msg_data_offset = 0; + connection->message.fragmented_msg_total_length = 0; + if (qos > 0) { + // internally enqueue all big messages, as they dont fit 'pending msg' structure + mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len); + } + } + + if (remaining_len > connection->buffer_length) { + // Continue with sending + memcpy(connection->buffer, current_data, connection->buffer_length); + connection->message.length = connection->buffer_length; + sending = true; + } else { + memcpy(connection->buffer, current_data, remaining_len); + connection->message.length = remaining_len; + sending = true; + } + connection->message.data = connection->buffer; + client->mqtt_state.outbound_message = &connection->message; + } else { + // Message was sent correctly + sending = false; + } } return pending_msg_id; }