From 4bc37bb8db859626288a8afdca301fab683bb81f Mon Sep 17 00:00:00 2001 From: Christian Gawron Date: Thu, 3 May 2018 15:46:06 +0200 Subject: [PATCH] fix handling of large messages (#40) --- lib/include/mqtt_msg.h | 8 ++++---- lib/mqtt_msg.c | 12 ++++++------ mqtt_client.c | 22 +++++++++------------- 3 files changed, 19 insertions(+), 23 deletions(-) diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 4f789ba..fd04cd0 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -71,7 +71,7 @@ enum mqtt_connect_return_code typedef struct mqtt_message { uint8_t* data; - uint16_t length; + uint32_t length; } mqtt_message_t; @@ -108,9 +108,9 @@ static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1 static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); -int mqtt_get_total_length(uint8_t* buffer, uint16_t length); -const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length); -const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length); +uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length); +const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length); +const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length); uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length); mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index 589cb46..eb97847 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -133,10 +133,10 @@ void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buff connection->buffer_length = buffer_length; } -int mqtt_get_total_length(uint8_t* buffer, uint16_t length) +uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length) { int i; - int totlen = 0; + uint32_t totlen = 0; for (i = 1; i < length; ++i) { @@ -148,11 +148,11 @@ int mqtt_get_total_length(uint8_t* buffer, uint16_t length) } } totlen += i; - + return totlen; } -const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) +const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length) { int i; int totlen = 0; @@ -179,9 +179,9 @@ const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) *length = topiclen; return (const char*)(buffer + i); -} +} -const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) +const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length) { int i; int totlen = 0; diff --git a/mqtt_client.c b/mqtt_client.c index 0053207..f95217b 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -22,8 +22,8 @@ typedef struct mqtt_state uint8_t *out_buffer; int in_buffer_length; int out_buffer_length; - uint16_t message_length; - uint16_t message_length_read; + uint32_t message_length; + uint32_t message_length_read; mqtt_message_t *outbound_message; mqtt_connection_t mqtt_connection; uint16_t pending_msg_id; @@ -442,25 +442,20 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) return client->config->event_handle(&client->event); } return ESP_FAIL; -} +} static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { const char *mqtt_topic, *mqtt_data; - uint16_t mqtt_topic_length, mqtt_data_length, total_mqtt_len = 0; - uint16_t mqtt_len, mqtt_offset = 0; + uint32_t mqtt_topic_length, mqtt_data_length; + uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0; int len_read; do { - mqtt_topic_length = length; - mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length); - mqtt_data_length = length; - mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); - - if(total_mqtt_len == 0){ + if (total_mqtt_len == 0){ mqtt_topic_length = length; mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length); mqtt_data_length = length; @@ -472,7 +467,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i mqtt_data = (const char*)client->mqtt_state.in_buffer; } - ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_data_length, mqtt_topic_length); + ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); client->event.event_id = MQTT_EVENT_DATA; client->event.data = (char *)mqtt_data; client->event.data_len = mqtt_len; @@ -488,7 +483,8 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i len_read = transport_read(client->transport, (char *)client->mqtt_state.in_buffer, - client->mqtt_state.in_buffer_length, + client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ? + client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read, client->config->network_timeout_ms); if (len_read <= 0) { ESP_LOGE(TAG, "Read error or timeout: %d", errno);