MQTT Client: Added support for receiving empty payload

This commit is contained in:
Gregory Eslinger
2019-04-01 21:22:21 +02:00
parent 55e72e4ded
commit 0450bd0093
3 changed files with 68 additions and 12 deletions

View File

@ -111,6 +111,7 @@ static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length);
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length); uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length); const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length); const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);

View File

@ -29,6 +29,7 @@
* *
*/ */
#include <stdint.h> #include <stdint.h>
#include <stdbool.h>
#include <string.h> #include <string.h>
#include "mqtt_msg.h" #include "mqtt_msg.h"
#include "mqtt_config.h" #include "mqtt_config.h"
@ -172,6 +173,38 @@ uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
return totlen; return totlen;
} }
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length)
{
uint16_t i;
uint16_t topiclen;
for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i)
{
if(i >= buffer_length)
return false;
if ((buffer[i] & 0x80) == 0)
{
++i;
break;
}
}
// i is now the length of the fixed header
if (i + 2 >= buffer_length)
return false;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
i += topiclen;
if (mqtt_get_qos(buffer) > 0)
{
i += 2;
}
// i is now the length of the fixed + variable header
return buffer_length >= i;
}
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length) const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length)
{ {
int i; int i;

View File

@ -67,6 +67,12 @@ typedef enum {
MQTT_STATE_WAIT_TIMEOUT, MQTT_STATE_WAIT_TIMEOUT,
} mqtt_client_state_t; } mqtt_client_state_t;
/* State values for reading MQTT message header */
typedef enum {
MQTT_HEADER_STATE_INCOMPLETE = -1,
MQTT_HEADER_STATE_COMPLETE = 0,
} mqtt_header_state_t;
struct esp_mqtt_client { struct esp_mqtt_client {
esp_transport_list_handle_t transport_list; esp_transport_list_handle_t transport_list;
esp_transport_handle_t transport; esp_transport_handle_t transport;
@ -530,9 +536,10 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
{ {
mqtt_header_state_t mqtt_header_state = MQTT_HEADER_STATE_INCOMPLETE;
const char *mqtt_topic = NULL, *mqtt_data = NULL; const char *mqtt_topic = NULL, *mqtt_data = NULL;
uint32_t mqtt_topic_length, mqtt_data_length; uint32_t mqtt_topic_length, mqtt_data_length;
uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0; uint32_t mqtt_len = 0, mqtt_offset = 0;
int len_read= length; int len_read= length;
int max_to_read = client->mqtt_state.in_buffer_length; int max_to_read = client->mqtt_state.in_buffer_length;
int buffer_offset = 0; int buffer_offset = 0;
@ -540,12 +547,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
do do
{ {
if (total_mqtt_len == 0) { if (mqtt_header_state == MQTT_HEADER_STATE_INCOMPLETE) {
/* any further reading only the underlying payload */ // any further reading only the underlying payload
transport = esp_transport_get_payload_transport_handle(transport); transport = esp_transport_get_payload_transport_handle(transport);
mqtt_data_length = mqtt_topic_length = length; if (!mqtt_header_complete(message, length)) {
if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) ||
NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) {
// mqtt header is not complete, continue reading // mqtt header is not complete, continue reading
memmove(client->mqtt_state.in_buffer, message, length); memmove(client->mqtt_state.in_buffer, message, length);
buffer_offset = length; buffer_offset = length;
@ -553,10 +558,27 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
max_to_read = client->mqtt_state.in_buffer_length - length; max_to_read = client->mqtt_state.in_buffer_length - length;
mqtt_len = 0; mqtt_len = 0;
} else { } else {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; // mqtt header was fully read
mqtt_len = mqtt_data_length; mqtt_header_state = MQTT_HEADER_STATE_COMPLETE;
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message); mqtt_data_length = mqtt_topic_length = length;
/* read msg id only once */ // Read the topic
if(NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length))) {
ESP_LOGE(TAG, "Unable to read topic from header");
break;
}
// Read the payload
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
if(mqtt_data_length > 0 && mqtt_data == NULL) {
ESP_LOGE(TAG, "Unable to read data from message");
break;
} else if(mqtt_data_length > 0) {
mqtt_len = mqtt_data_length;
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
} else {
mqtt_len = 0;
}
// read msg id only once
client->event.msg_id = mqtt_get_id(message, length); client->event.msg_id = mqtt_get_id(message, length);
} }
} else { } else {
@ -566,10 +588,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
mqtt_topic_length = 0; mqtt_topic_length = 0;
} }
if (total_mqtt_len != 0) { if (mqtt_header_state == MQTT_HEADER_STATE_COMPLETE) {
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA; client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data; client->event.data = mqtt_len > 0 ? (char *)mqtt_data : NULL;
client->event.data_len = mqtt_len; client->event.data_len = mqtt_len;
client->event.total_data_len = mqtt_data_length; client->event.total_data_len = mqtt_data_length;
client->event.current_data_offset = mqtt_offset; client->event.current_data_offset = mqtt_offset;