forked from espressif/esp-mqtt
MQTT Client: Added support for receiving empty payload
This commit is contained in:
@ -111,6 +111,7 @@ static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1
|
|||||||
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
|
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
|
||||||
|
|
||||||
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
|
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
|
||||||
|
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length);
|
||||||
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length);
|
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length);
|
||||||
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
|
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
|
||||||
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);
|
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
#include <stdbool.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include "mqtt_msg.h"
|
#include "mqtt_msg.h"
|
||||||
#include "mqtt_config.h"
|
#include "mqtt_config.h"
|
||||||
@ -172,6 +173,38 @@ uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
|
|||||||
return totlen;
|
return totlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool mqtt_header_complete(uint8_t* buffer, uint16_t buffer_length)
|
||||||
|
{
|
||||||
|
uint16_t i;
|
||||||
|
uint16_t topiclen;
|
||||||
|
|
||||||
|
for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i)
|
||||||
|
{
|
||||||
|
if(i >= buffer_length)
|
||||||
|
return false;
|
||||||
|
if ((buffer[i] & 0x80) == 0)
|
||||||
|
{
|
||||||
|
++i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// i is now the length of the fixed header
|
||||||
|
|
||||||
|
if (i + 2 >= buffer_length)
|
||||||
|
return false;
|
||||||
|
topiclen = buffer[i++] << 8;
|
||||||
|
topiclen |= buffer[i++];
|
||||||
|
|
||||||
|
i += topiclen;
|
||||||
|
|
||||||
|
if (mqtt_get_qos(buffer) > 0)
|
||||||
|
{
|
||||||
|
i += 2;
|
||||||
|
}
|
||||||
|
// i is now the length of the fixed + variable header
|
||||||
|
return buffer_length >= i;
|
||||||
|
}
|
||||||
|
|
||||||
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length)
|
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
@ -67,6 +67,12 @@ typedef enum {
|
|||||||
MQTT_STATE_WAIT_TIMEOUT,
|
MQTT_STATE_WAIT_TIMEOUT,
|
||||||
} mqtt_client_state_t;
|
} mqtt_client_state_t;
|
||||||
|
|
||||||
|
/* State values for reading MQTT message header */
|
||||||
|
typedef enum {
|
||||||
|
MQTT_HEADER_STATE_INCOMPLETE = -1,
|
||||||
|
MQTT_HEADER_STATE_COMPLETE = 0,
|
||||||
|
} mqtt_header_state_t;
|
||||||
|
|
||||||
struct esp_mqtt_client {
|
struct esp_mqtt_client {
|
||||||
esp_transport_list_handle_t transport_list;
|
esp_transport_list_handle_t transport_list;
|
||||||
esp_transport_handle_t transport;
|
esp_transport_handle_t transport;
|
||||||
@ -530,9 +536,10 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
|
|||||||
|
|
||||||
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
|
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
|
||||||
{
|
{
|
||||||
|
mqtt_header_state_t mqtt_header_state = MQTT_HEADER_STATE_INCOMPLETE;
|
||||||
const char *mqtt_topic = NULL, *mqtt_data = NULL;
|
const char *mqtt_topic = NULL, *mqtt_data = NULL;
|
||||||
uint32_t mqtt_topic_length, mqtt_data_length;
|
uint32_t mqtt_topic_length, mqtt_data_length;
|
||||||
uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0;
|
uint32_t mqtt_len = 0, mqtt_offset = 0;
|
||||||
int len_read= length;
|
int len_read= length;
|
||||||
int max_to_read = client->mqtt_state.in_buffer_length;
|
int max_to_read = client->mqtt_state.in_buffer_length;
|
||||||
int buffer_offset = 0;
|
int buffer_offset = 0;
|
||||||
@ -540,12 +547,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
|||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
if (total_mqtt_len == 0) {
|
if (mqtt_header_state == MQTT_HEADER_STATE_INCOMPLETE) {
|
||||||
/* any further reading only the underlying payload */
|
// any further reading only the underlying payload
|
||||||
transport = esp_transport_get_payload_transport_handle(transport);
|
transport = esp_transport_get_payload_transport_handle(transport);
|
||||||
mqtt_data_length = mqtt_topic_length = length;
|
if (!mqtt_header_complete(message, length)) {
|
||||||
if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) ||
|
|
||||||
NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) {
|
|
||||||
// mqtt header is not complete, continue reading
|
// mqtt header is not complete, continue reading
|
||||||
memmove(client->mqtt_state.in_buffer, message, length);
|
memmove(client->mqtt_state.in_buffer, message, length);
|
||||||
buffer_offset = length;
|
buffer_offset = length;
|
||||||
@ -553,10 +558,27 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
|||||||
max_to_read = client->mqtt_state.in_buffer_length - length;
|
max_to_read = client->mqtt_state.in_buffer_length - length;
|
||||||
mqtt_len = 0;
|
mqtt_len = 0;
|
||||||
} else {
|
} else {
|
||||||
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
|
// mqtt header was fully read
|
||||||
mqtt_len = mqtt_data_length;
|
mqtt_header_state = MQTT_HEADER_STATE_COMPLETE;
|
||||||
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
|
mqtt_data_length = mqtt_topic_length = length;
|
||||||
/* read msg id only once */
|
// Read the topic
|
||||||
|
if(NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length))) {
|
||||||
|
ESP_LOGE(TAG, "Unable to read topic from header");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Read the payload
|
||||||
|
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
|
||||||
|
if(mqtt_data_length > 0 && mqtt_data == NULL) {
|
||||||
|
ESP_LOGE(TAG, "Unable to read data from message");
|
||||||
|
break;
|
||||||
|
} else if(mqtt_data_length > 0) {
|
||||||
|
mqtt_len = mqtt_data_length;
|
||||||
|
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
|
||||||
|
} else {
|
||||||
|
mqtt_len = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// read msg id only once
|
||||||
client->event.msg_id = mqtt_get_id(message, length);
|
client->event.msg_id = mqtt_get_id(message, length);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -566,10 +588,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
|||||||
mqtt_topic_length = 0;
|
mqtt_topic_length = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (total_mqtt_len != 0) {
|
if (mqtt_header_state == MQTT_HEADER_STATE_COMPLETE) {
|
||||||
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
|
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
|
||||||
client->event.event_id = MQTT_EVENT_DATA;
|
client->event.event_id = MQTT_EVENT_DATA;
|
||||||
client->event.data = (char *)mqtt_data;
|
client->event.data = mqtt_len > 0 ? (char *)mqtt_data : NULL;
|
||||||
client->event.data_len = mqtt_len;
|
client->event.data_len = mqtt_len;
|
||||||
client->event.total_data_len = mqtt_data_length;
|
client->event.total_data_len = mqtt_data_length;
|
||||||
client->event.current_data_offset = mqtt_offset;
|
client->event.current_data_offset = mqtt_offset;
|
||||||
|
Reference in New Issue
Block a user