MQTT Client: Added support for receiving empty payload

This commit is contained in:
Gregory Eslinger
2019-04-01 21:22:21 +02:00
parent 55e72e4ded
commit 0450bd0093
3 changed files with 68 additions and 12 deletions

View File

@ -111,6 +111,7 @@ static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length);
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);

View File

@ -29,6 +29,7 @@
*
*/
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include "mqtt_msg.h"
#include "mqtt_config.h"
@ -172,6 +173,38 @@ uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
return totlen;
}
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length)
{
uint16_t i;
uint16_t topiclen;
for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i)
{
if(i >= buffer_length)
return false;
if ((buffer[i] & 0x80) == 0)
{
++i;
break;
}
}
// i is now the length of the fixed header
if (i + 2 >= buffer_length)
return false;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
i += topiclen;
if (mqtt_get_qos(buffer) > 0)
{
i += 2;
}
// i is now the length of the fixed + variable header
return buffer_length >= i;
}
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length)
{
int i;

View File

@ -67,6 +67,12 @@ typedef enum {
MQTT_STATE_WAIT_TIMEOUT,
} mqtt_client_state_t;
/* State values for reading MQTT message header */
typedef enum {
MQTT_HEADER_STATE_INCOMPLETE = -1,
MQTT_HEADER_STATE_COMPLETE = 0,
} mqtt_header_state_t;
struct esp_mqtt_client {
esp_transport_list_handle_t transport_list;
esp_transport_handle_t transport;
@ -530,9 +536,10 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
{
mqtt_header_state_t mqtt_header_state = MQTT_HEADER_STATE_INCOMPLETE;
const char *mqtt_topic = NULL, *mqtt_data = NULL;
uint32_t mqtt_topic_length, mqtt_data_length;
uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0;
uint32_t mqtt_len = 0, mqtt_offset = 0;
int len_read= length;
int max_to_read = client->mqtt_state.in_buffer_length;
int buffer_offset = 0;
@ -540,12 +547,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
do
{
if (total_mqtt_len == 0) {
/* any further reading only the underlying payload */
if (mqtt_header_state == MQTT_HEADER_STATE_INCOMPLETE) {
// any further reading only the underlying payload
transport = esp_transport_get_payload_transport_handle(transport);
mqtt_data_length = mqtt_topic_length = length;
if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) ||
NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) {
if (!mqtt_header_complete(message, length)) {
// mqtt header is not complete, continue reading
memmove(client->mqtt_state.in_buffer, message, length);
buffer_offset = length;
@ -553,10 +558,27 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
max_to_read = client->mqtt_state.in_buffer_length - length;
mqtt_len = 0;
} else {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
/* read msg id only once */
// mqtt header was fully read
mqtt_header_state = MQTT_HEADER_STATE_COMPLETE;
mqtt_data_length = mqtt_topic_length = length;
// Read the topic
if(NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length))) {
ESP_LOGE(TAG, "Unable to read topic from header");
break;
}
// Read the payload
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
if(mqtt_data_length > 0 && mqtt_data == NULL) {
ESP_LOGE(TAG, "Unable to read data from message");
break;
} else if(mqtt_data_length > 0) {
mqtt_len = mqtt_data_length;
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
} else {
mqtt_len = 0;
}
// read msg id only once
client->event.msg_id = mqtt_get_id(message, length);
}
} else {
@ -566,10 +588,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
mqtt_topic_length = 0;
}
if (total_mqtt_len != 0) {
if (mqtt_header_state == MQTT_HEADER_STATE_COMPLETE) {
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data;
client->event.data = mqtt_len > 0 ? (char *)mqtt_data : NULL;
client->event.data_len = mqtt_len;
client->event.total_data_len = mqtt_data_length;
client->event.current_data_offset = mqtt_offset;