forked from espressif/esp-mqtt
mqtt support for sending fragmented messages and full mqtt message length support
This commit is contained in:
@ -72,7 +72,8 @@ typedef struct mqtt_message
|
||||
{
|
||||
uint8_t* data;
|
||||
uint32_t length;
|
||||
|
||||
uint32_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */
|
||||
uint32_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
|
||||
} mqtt_message_t;
|
||||
|
||||
typedef struct mqtt_connection
|
||||
|
@ -15,9 +15,19 @@ struct outbox_item;
|
||||
|
||||
typedef struct outbox_list_t * outbox_handle_t;
|
||||
typedef struct outbox_item * outbox_item_handle_t;
|
||||
typedef struct outbox_message * outbox_message_handle_t;
|
||||
|
||||
typedef struct outbox_message {
|
||||
uint8_t *data;
|
||||
int len;
|
||||
int msg_id;
|
||||
int msg_type;
|
||||
uint8_t *remaining_data;
|
||||
int remaining_len;
|
||||
} outbox_message_t;
|
||||
|
||||
outbox_handle_t outbox_init();
|
||||
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick);
|
||||
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
|
||||
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox);
|
||||
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
|
||||
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
||||
|
@ -34,7 +34,7 @@
|
||||
#include "mqtt_config.h"
|
||||
#include "platform.h"
|
||||
|
||||
#define MQTT_MAX_FIXED_HEADER_SIZE 3
|
||||
#define MQTT_MAX_FIXED_HEADER_SIZE 5
|
||||
|
||||
enum mqtt_connect_flag
|
||||
{
|
||||
@ -105,22 +105,42 @@ static mqtt_message_t* fail_message(mqtt_connection_t* connection)
|
||||
|
||||
static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
|
||||
{
|
||||
int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
|
||||
if (remaining_length > 127)
|
||||
{
|
||||
connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
|
||||
connection->buffer[1] = 0x80 | (remaining_length % 128);
|
||||
connection->buffer[2] = remaining_length / 128;
|
||||
connection->message.length = remaining_length + 3;
|
||||
connection->message.data = connection->buffer;
|
||||
int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
int total_length = message_length;
|
||||
int encoded_length = 0;
|
||||
uint8_t encoded_lens[4] = {0};
|
||||
// Check if we have fragmented message and update total_len
|
||||
if (connection->message.fragmented_msg_total_length) {
|
||||
total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
}
|
||||
else
|
||||
{
|
||||
connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
|
||||
connection->buffer[2] = remaining_length;
|
||||
connection->message.length = remaining_length + 2;
|
||||
connection->message.data = connection->buffer + 1;
|
||||
|
||||
// Encode MQTT message length
|
||||
int len_bytes = 0; // size of encoded message length
|
||||
do {
|
||||
encoded_length = total_length % 128;
|
||||
total_length /= 128;
|
||||
if (total_length > 0) {
|
||||
encoded_length |= 0x80;
|
||||
}
|
||||
encoded_lens[len_bytes] = encoded_length;
|
||||
len_bytes++;
|
||||
} while (total_length > 0);
|
||||
|
||||
// Sanity check for MQTT header
|
||||
if (len_bytes + 1 > MQTT_MAX_FIXED_HEADER_SIZE) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
// Save the header bytes
|
||||
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
|
||||
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
|
||||
connection->message.data = connection->buffer + offs;
|
||||
connection->message.fragmented_msg_data_offset -= offs;
|
||||
// type byte
|
||||
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
|
||||
// length bytes
|
||||
for (int j = 0; j<len_bytes; j++) {
|
||||
connection->buffer[offs++] = encoded_lens[j];
|
||||
}
|
||||
|
||||
return &connection->message;
|
||||
@ -377,11 +397,17 @@ mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topi
|
||||
else
|
||||
*message_id = 0;
|
||||
|
||||
if (connection->message.length + data_length > connection->buffer_length)
|
||||
return fail_message(connection);
|
||||
memcpy(connection->buffer + connection->message.length, data, data_length);
|
||||
connection->message.length += data_length;
|
||||
|
||||
if (connection->message.length + data_length > connection->buffer_length) {
|
||||
// Not enough size in buffer -> fragment this message
|
||||
connection->message.fragmented_msg_data_offset = connection->message.length;
|
||||
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
|
||||
connection->message.length = connection->buffer_length;
|
||||
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
|
||||
} else {
|
||||
memcpy(connection->buffer + connection->message.length, data, data_length);
|
||||
connection->message.length += data_length;
|
||||
connection->message.fragmented_msg_total_length = 0;
|
||||
}
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
|
||||
}
|
||||
|
||||
|
@ -31,22 +31,25 @@ outbox_handle_t outbox_init()
|
||||
return outbox;
|
||||
}
|
||||
|
||||
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick)
|
||||
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick)
|
||||
{
|
||||
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
|
||||
ESP_MEM_CHECK(TAG, item, return NULL);
|
||||
item->msg_id = msg_id;
|
||||
item->msg_type = msg_type;
|
||||
item->msg_id = message->msg_id;
|
||||
item->msg_type = message->msg_type;
|
||||
item->tick = tick;
|
||||
item->len = len;
|
||||
item->buffer = malloc(len);
|
||||
item->len = message->len;
|
||||
item->buffer = malloc(message->len + message->remaining_len);
|
||||
ESP_MEM_CHECK(TAG, item->buffer, {
|
||||
free(item);
|
||||
return NULL;
|
||||
});
|
||||
memcpy(item->buffer, data, len);
|
||||
memcpy(item->buffer, message->data, message->len);
|
||||
if (message->remaining_data) {
|
||||
memcpy(item->buffer+message->len, message->remaining_data, message->remaining_len);
|
||||
}
|
||||
STAILQ_INSERT_TAIL(outbox, item, next);
|
||||
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", msg_id, msg_type, len, outbox_get_size(outbox));
|
||||
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
|
||||
return item;
|
||||
}
|
||||
|
||||
|
@ -596,19 +596,40 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int
|
||||
return false;
|
||||
}
|
||||
|
||||
static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
|
||||
{
|
||||
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
|
||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
//lock mutex
|
||||
outbox_message_t msg = { 0 };
|
||||
if (client->mqtt_state.pending_msg_count > 0) {
|
||||
client->mqtt_state.pending_msg_count --;
|
||||
}
|
||||
msg.data = client->mqtt_state.outbound_message->data;
|
||||
msg.len = client->mqtt_state.outbound_message->length;
|
||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||
msg.remaining_data = remaining_data;
|
||||
msg.remaining_len = remaining_len;
|
||||
//Copy to queue buffer
|
||||
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||
|
||||
//unlock
|
||||
}
|
||||
|
||||
static void mqtt_enqueue(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
//lock mutex
|
||||
if (client->mqtt_state.pending_msg_count > 0) {
|
||||
outbox_message_t msg = { 0 };
|
||||
msg.data = client->mqtt_state.outbound_message->data;
|
||||
msg.len = client->mqtt_state.outbound_message->length;
|
||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||
//Copy to queue buffer
|
||||
outbox_enqueue(client->outbox,
|
||||
client->mqtt_state.outbound_message->data,
|
||||
client->mqtt_state.outbound_message->length,
|
||||
client->mqtt_state.pending_msg_id,
|
||||
client->mqtt_state.pending_msg_type,
|
||||
platform_tick_get_ms());
|
||||
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||
}
|
||||
//unlock
|
||||
}
|
||||
@ -951,9 +972,51 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
client->mqtt_state.outbound_message = publish_msg;
|
||||
}
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos);
|
||||
return -1;
|
||||
/* Provide support for sending fragmented message if it doesn't fit buffer */
|
||||
int remaining_len = len;
|
||||
const char *current_data = data;
|
||||
bool sending = true;
|
||||
|
||||
while (sending) {
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||
remaining_len -= data_sent;
|
||||
current_data += data_sent;
|
||||
|
||||
if (remaining_len > 0) {
|
||||
mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection;
|
||||
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
|
||||
if (connection->message.fragmented_msg_data_offset) {
|
||||
// asked to enqueue oversized message (first time only)
|
||||
connection->message.fragmented_msg_data_offset = 0;
|
||||
connection->message.fragmented_msg_total_length = 0;
|
||||
if (qos > 0) {
|
||||
// internally enqueue all big messages, as they dont fit 'pending msg' structure
|
||||
mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len);
|
||||
}
|
||||
}
|
||||
|
||||
if (remaining_len > connection->buffer_length) {
|
||||
// Continue with sending
|
||||
memcpy(connection->buffer, current_data, connection->buffer_length);
|
||||
connection->message.length = connection->buffer_length;
|
||||
sending = true;
|
||||
} else {
|
||||
memcpy(connection->buffer, current_data, remaining_len);
|
||||
connection->message.length = remaining_len;
|
||||
sending = true;
|
||||
}
|
||||
connection->message.data = connection->buffer;
|
||||
client->mqtt_state.outbound_message = &connection->message;
|
||||
} else {
|
||||
// Message was sent correctly
|
||||
sending = false;
|
||||
}
|
||||
}
|
||||
return pending_msg_id;
|
||||
}
|
||||
|
Reference in New Issue
Block a user