Merge branch 'feature/mqtt_support_arbitrary_datalen' into 'idf'

MQTT rework for packed and fragmented messages -- available for testing

See merge request idf/esp-mqtt!12
This commit is contained in:
David Čermák
2019-01-03 18:57:30 +08:00
5 changed files with 214 additions and 84 deletions

View File

@ -72,7 +72,8 @@ typedef struct mqtt_message
{
uint8_t* data;
uint32_t length;
uint32_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */
uint32_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
} mqtt_message_t;
typedef struct mqtt_connection

View File

@ -15,9 +15,19 @@ struct outbox_item;
typedef struct outbox_list_t * outbox_handle_t;
typedef struct outbox_item * outbox_item_handle_t;
typedef struct outbox_message * outbox_message_handle_t;
typedef struct outbox_message {
uint8_t *data;
int len;
int msg_id;
int msg_type;
uint8_t *remaining_data;
int remaining_len;
} outbox_message_t;
outbox_handle_t outbox_init();
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick);
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox);
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);

View File

@ -34,7 +34,7 @@
#include "mqtt_config.h"
#include "platform.h"
#define MQTT_MAX_FIXED_HEADER_SIZE 3
#define MQTT_MAX_FIXED_HEADER_SIZE 5
enum mqtt_connect_flag
{
@ -105,22 +105,42 @@ static mqtt_message_t* fail_message(mqtt_connection_t* connection)
static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
{
int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
if (remaining_length > 127)
{
connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[1] = 0x80 | (remaining_length % 128);
connection->buffer[2] = remaining_length / 128;
connection->message.length = remaining_length + 3;
connection->message.data = connection->buffer;
int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
int encoded_length = 0;
uint8_t encoded_lens[4] = {0};
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
}
else
{
connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[2] = remaining_length;
connection->message.length = remaining_length + 2;
connection->message.data = connection->buffer + 1;
// Encode MQTT message length
int len_bytes = 0; // size of encoded message length
do {
encoded_length = total_length % 128;
total_length /= 128;
if (total_length > 0) {
encoded_length |= 0x80;
}
encoded_lens[len_bytes] = encoded_length;
len_bytes++;
} while (total_length > 0);
// Sanity check for MQTT header
if (len_bytes + 1 > MQTT_MAX_FIXED_HEADER_SIZE) {
return fail_message(connection);
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
for (int j = 0; j<len_bytes; j++) {
connection->buffer[offs++] = encoded_lens[j];
}
return &connection->message;
@ -377,11 +397,17 @@ mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topi
else
*message_id = 0;
if (connection->message.length + data_length > connection->buffer_length)
return fail_message(connection);
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
if (connection->message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
} else {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
connection->message.fragmented_msg_total_length = 0;
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}

View File

@ -31,22 +31,25 @@ outbox_handle_t outbox_init()
return outbox;
}
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick)
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick)
{
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
ESP_MEM_CHECK(TAG, item, return NULL);
item->msg_id = msg_id;
item->msg_type = msg_type;
item->msg_id = message->msg_id;
item->msg_type = message->msg_type;
item->tick = tick;
item->len = len;
item->buffer = malloc(len);
item->len = message->len;
item->buffer = malloc(message->len + message->remaining_len);
ESP_MEM_CHECK(TAG, item->buffer, {
free(item);
return NULL;
});
memcpy(item->buffer, data, len);
memcpy(item->buffer, message->data, message->len);
if (message->remaining_data) {
memcpy(item->buffer+message->len, message->remaining_data, message->remaining_len);
}
STAILQ_INSERT_TAIL(outbox, item, next);
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", msg_id, msg_type, len, outbox_get_size(outbox));
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
return item;
}

View File

@ -77,6 +77,7 @@ struct esp_mqtt_client {
const static int STOPPED_BIT = BIT0;
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms);
static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
@ -293,7 +294,7 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
client->event.event_id = MQTT_EVENT_DISCONNECTED;
client->wait_for_ping_resp = false;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_OK;
}
@ -490,9 +491,14 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
return ESP_OK;
}
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
return esp_mqtt_dispatch_event(client);
}
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
client->event.user_context = client->config->user_context;
client->event.client = client;
@ -502,32 +508,40 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
typedef struct {
char *path;
char *buffer;
esp_transport_handle_t parent;
} transport_ws_t;
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
{
const char *mqtt_topic, *mqtt_data;
const char *mqtt_topic = NULL, *mqtt_data = NULL;
uint32_t mqtt_topic_length, mqtt_data_length;
uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0;
int len_read;
uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0;
int len_read= length;
int max_to_read = client->mqtt_state.in_buffer_length;
int buffer_offset = 0;
esp_transport_handle_t transport = client->transport;
do
{
if (total_mqtt_len == 0) {
mqtt_topic_length = length;
mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length);
mqtt_data_length = length;
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
/* any further reading only the underlying payload */
transport = esp_transport_get_payload_transport_handle(transport);
mqtt_data_length = mqtt_topic_length = length;
if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) ||
NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) {
// mqtt header is not complete, continue reading
memmove(client->mqtt_state.in_buffer, message, length);
buffer_offset = length;
message = client->mqtt_state.in_buffer;
max_to_read = client->mqtt_state.in_buffer_length - length;
mqtt_len = 0;
} else {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
if (client->mqtt_state.message_length_read < client->mqtt_state.message_length) {
/* if message is framented -> correct the size for the first DATA event */
mqtt_data_length = client->mqtt_state.message_length_read - ((uint8_t*)mqtt_data- message);
}
/* read msg id only once */
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
}
} else {
mqtt_len = len_read;
mqtt_data = (const char*)client->mqtt_state.in_buffer;
@ -535,15 +549,17 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
mqtt_topic_length = 0;
}
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data;
client->event.data_len = mqtt_len;
client->event.total_data_len = total_mqtt_len;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
if (total_mqtt_len != 0) {
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data;
client->event.data_len = mqtt_len;
client->event.total_data_len = total_mqtt_len;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
}
mqtt_offset += mqtt_len;
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
@ -551,18 +567,20 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
}
len_read = esp_transport_read(transport,
(char *)client->mqtt_state.in_buffer,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ?
client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
(char *)client->mqtt_state.in_buffer + buffer_offset,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ?
max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
client->config->network_timeout_ms);
length = len_read + buffer_offset;
buffer_offset = 0;
max_to_read = client->mqtt_state.in_buffer_length;
if (len_read <= 0) {
ESP_LOGE(TAG, "Read error or timeout: %d", errno);
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", len_read, errno);
break;
}
client->mqtt_state.message_length_read += len_read;
} while (1);
}
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
@ -583,19 +601,40 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int
return false;
}
static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
outbox_message_t msg = { 0 };
if (client->mqtt_state.pending_msg_count > 0) {
client->mqtt_state.pending_msg_count --;
}
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
//unlock
}
static void mqtt_enqueue(esp_mqtt_client_handle_t client)
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
if (client->mqtt_state.pending_msg_count > 0) {
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
//Copy to queue buffer
outbox_enqueue(client->outbox,
client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length,
client->mqtt_state.pending_msg_id,
client->mqtt_state.pending_msg_type,
platform_tick_get_ms());
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
//unlock
}
@ -606,6 +645,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
uint32_t transport_message_offset = 0 ;
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 1000);
@ -618,9 +658,13 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
return ESP_OK;
}
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
// In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here
while ( transport_message_offset < read_len ){
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
switch (msg_type)
@ -629,14 +673,14 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
@ -656,17 +700,18 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
// return ESP_FAIL;
}
}
client->mqtt_state.message_length_read = read_len;
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
// Deliver the publish message
client->mqtt_state.message_length_read = read_len - transport_message_offset;
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
@ -686,7 +731,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PINGRESP:
@ -695,6 +740,9 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
break;
}
transport_message_offset += mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset) ;
}
return ESP_OK;
}
@ -722,7 +770,7 @@ static void esp_mqtt_task(void *pv)
switch ((int)client->state) {
case MQTT_STATE_INIT:
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
if (client->transport == NULL) {
ESP_LOGE(TAG, "There are no transport");
@ -746,7 +794,7 @@ static void esp_mqtt_task(void *pv)
client->event.event_id = MQTT_EVENT_CONNECTED;
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
client->state = MQTT_STATE_CONNECTED;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
client->refresh_connection_tick = platform_tick_get_ms();
break;
@ -929,9 +977,51 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
client->mqtt_state.outbound_message = publish_msg;
}
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos);
return -1;
/* Provide support for sending fragmented message if it doesn't fit buffer */
int remaining_len = len;
const char *current_data = data;
bool sending = true;
while (sending) {
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos);
return -1;
}
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
remaining_len -= data_sent;
current_data += data_sent;
if (remaining_len > 0) {
mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection;
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
if (connection->message.fragmented_msg_data_offset) {
// asked to enqueue oversized message (first time only)
connection->message.fragmented_msg_data_offset = 0;
connection->message.fragmented_msg_total_length = 0;
if (qos > 0) {
// internally enqueue all big messages, as they dont fit 'pending msg' structure
mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len);
}
}
if (remaining_len > connection->buffer_length) {
// Continue with sending
memcpy(connection->buffer, current_data, connection->buffer_length);
connection->message.length = connection->buffer_length;
sending = true;
} else {
memcpy(connection->buffer, current_data, remaining_len);
connection->message.length = remaining_len;
sending = true;
}
connection->message.data = connection->buffer;
client->mqtt_state.outbound_message = &connection->message;
} else {
// Message was sent correctly
sending = false;
}
}
return pending_msg_id;
}