diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 1a07cae..d57a5c5 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -111,6 +111,7 @@ static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1 static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); +bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length); uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length); const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length); const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index 15eb93e..8ca68a1 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -29,6 +29,7 @@ * */ #include +#include #include #include "mqtt_msg.h" #include "mqtt_config.h" @@ -172,6 +173,38 @@ uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length) return totlen; } +bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length) +{ + uint16_t i; + uint16_t topiclen; + + for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i) + { + if(i >= buffer_length) + return false; + if ((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + // i is now the length of the fixed header + + if (i + 2 >= buffer_length) + return false; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + i += topiclen; + + if (mqtt_get_qos(buffer) > 0) + { + i += 2; + } + // i is now the length of the fixed + variable header + return buffer_length >= i; +} + const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length) { int i; diff --git a/mqtt_client.c b/mqtt_client.c index 87c9541..270e618 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -67,6 +67,12 @@ typedef enum { MQTT_STATE_WAIT_TIMEOUT, } mqtt_client_state_t; +/* State values for reading MQTT message header */ +typedef enum { + MQTT_HEADER_STATE_INCOMPLETE = -1, + MQTT_HEADER_STATE_COMPLETE = 0, +} mqtt_header_state_t; + struct esp_mqtt_client { esp_transport_list_handle_t transport_list; esp_transport_handle_t transport; @@ -530,9 +536,10 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { + mqtt_header_state_t mqtt_header_state = MQTT_HEADER_STATE_INCOMPLETE; const char *mqtt_topic = NULL, *mqtt_data = NULL; uint32_t mqtt_topic_length, mqtt_data_length; - uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0; + uint32_t mqtt_len = 0, mqtt_offset = 0; int len_read= length; int max_to_read = client->mqtt_state.in_buffer_length; int buffer_offset = 0; @@ -540,12 +547,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i do { - if (total_mqtt_len == 0) { - /* any further reading only the underlying payload */ + if (mqtt_header_state == MQTT_HEADER_STATE_INCOMPLETE) { + // any further reading only the underlying payload transport = esp_transport_get_payload_transport_handle(transport); - mqtt_data_length = mqtt_topic_length = length; - if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) || - NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) { + if (!mqtt_header_complete(message, length)) { // mqtt header is not complete, continue reading memmove(client->mqtt_state.in_buffer, message, length); buffer_offset = length; @@ -553,10 +558,27 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i max_to_read = client->mqtt_state.in_buffer_length - length; mqtt_len = 0; } else { - total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; - mqtt_len = mqtt_data_length; - mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message); - /* read msg id only once */ + // mqtt header was fully read + mqtt_header_state = MQTT_HEADER_STATE_COMPLETE; + mqtt_data_length = mqtt_topic_length = length; + // Read the topic + if(NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length))) { + ESP_LOGE(TAG, "Unable to read topic from header"); + break; + } + // Read the payload + mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); + if(mqtt_data_length > 0 && mqtt_data == NULL) { + ESP_LOGE(TAG, "Unable to read data from message"); + break; + } else if(mqtt_data_length > 0) { + mqtt_len = mqtt_data_length; + mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message); + } else { + mqtt_len = 0; + } + + // read msg id only once client->event.msg_id = mqtt_get_id(message, length); } } else { @@ -566,10 +588,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i mqtt_topic_length = 0; } - if (total_mqtt_len != 0) { + if (mqtt_header_state == MQTT_HEADER_STATE_COMPLETE) { ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); client->event.event_id = MQTT_EVENT_DATA; - client->event.data = (char *)mqtt_data; + client->event.data = mqtt_len > 0 ? (char *)mqtt_data : NULL; client->event.data_len = mqtt_len; client->event.total_data_len = mqtt_data_length; client->event.current_data_offset = mqtt_offset;