support for qos1 and qos2 message retrasmission on reconnect

This commit is contained in:
David Cermak
2018-12-20 21:46:55 +01:00
parent 815623dfe5
commit 6a0d1e7bff
4 changed files with 96 additions and 21 deletions

View File

@ -106,6 +106,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >>
static inline int mqtt_get_connect_session_present(uint8_t* buffer) { return buffer[2] & 0x01; }
static inline int mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; }
static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline void mqtt_set_dup(uint8_t* buffer) { buffer[0] |= 0x08; }
static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }

View File

@ -21,21 +21,29 @@ typedef struct outbox_message {
uint8_t *data;
int len;
int msg_id;
int msg_qos;
int msg_type;
uint8_t *remaining_data;
int remaining_len;
} outbox_message_t;
typedef enum pending_state {
QUEUED,
TRANSMITTED,
CONFIRMED
} pending_state_t;
outbox_handle_t outbox_init();
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending);
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
esp_err_t outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
int outbox_get_size(outbox_handle_t outbox);
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size);
void outbox_destroy(outbox_handle_t outbox);

View File

@ -14,9 +14,10 @@ typedef struct outbox_item {
int len;
int msg_id;
int msg_type;
int msg_qos;
int tick;
int retry_count;
bool pending;
pending_state_t pending;
STAILQ_ENTRY(outbox_item) next;
} outbox_item_t;
@ -37,8 +38,10 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
ESP_MEM_CHECK(TAG, item, return NULL);
item->msg_id = message->msg_id;
item->msg_type = message->msg_type;
item->msg_qos = message->msg_qos;
item->tick = tick;
item->len = message->len;
item->pending = QUEUED;
item->buffer = malloc(message->len + message->remaining_len);
ESP_MEM_CHECK(TAG, item->buffer, {
free(item);
@ -64,21 +67,34 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
return NULL;
}
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox)
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
if (!item->pending) {
if (item->pending == pending) {
return item;
}
}
return NULL;
}
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
{
if (item) {
*len = item->len;
*msg_id = item->msg_id;
*msg_type = item->msg_type;
*qos = item->msg_qos;
return (uint8_t*)item->buffer;
}
return NULL;
}
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_id == msg_id && item->msg_type == msg_type) {
if (item->msg_id == msg_id && (0xFF&(item->msg_type)) == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
@ -102,11 +118,11 @@ esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
}
return ESP_OK;
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id)
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
if (item) {
item->pending = true;
item->pending = pending;
return ESP_OK;
}
return ESP_FAIL;
@ -153,7 +169,7 @@ int outbox_get_size(outbox_handle_t outbox)
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size)
{
while(outbox_get_size(outbox) > max_size) {
outbox_item_handle_t item = outbox_dequeue(outbox);
outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED);
if (item == NULL) {
return ESP_FAIL;
}

View File

@ -200,7 +200,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
if (config->disable_auto_reconnect == cfg->auto_reconnect) {
cfg->auto_reconnect = !config->disable_auto_reconnect;
}
return ESP_OK;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
@ -305,7 +304,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
ESP_MEM_CHECK(TAG, client, return NULL);
esp_mqtt_set_config(client, config);
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
@ -605,6 +603,7 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
@ -624,6 +623,7 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client)
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
//Copy to queue buffer
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
@ -700,21 +700,21 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
@ -736,9 +736,29 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
return ESP_OK;
}
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
// decode queued data
client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
// set duplicate flag for QoS-2 message
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) {
mqtt_set_dup(client->mqtt_state.outbound_message->data);
}
// try to resend the data
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
return ESP_OK;
}
static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
uint32_t last_retransmit = 0;
client->run = true;
//get transport by scheme
@ -796,6 +816,21 @@ static void esp_mqtt_task(void *pv)
break;
}
// resend all non-transmitted messages first
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED);
if (item) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
}
// resend other "transmitted" messages after 1s
} else if (platform_tick_get_ms() - last_retransmit > 1000) {
last_retransmit = platform_tick_get_ms();
item = outbox_dequeue(client->outbox, TRANSMITTED);
if (item) {
mqtt_resend_queued(client, item);
}
}
if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) {
//No ping resp from last ping => Disconnected
if(client->wait_for_ping_resp){
@ -914,13 +949,13 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
mqtt_enqueue(client); //move pending msg to outbox (if have)
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
&client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
mqtt_enqueue(client); //move pending msg to outbox (if have)
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
@ -958,10 +993,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
{
uint16_t pending_msg_id = 0;
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
if (len <= 0) {
len = strlen(data);
}
@ -973,15 +1005,23 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
/* We have to set as pending all the qos>0 messages) */
if (qos > 0) {
mqtt_enqueue(client);
client->mqtt_state.outbound_message = publish_msg;
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
client->mqtt_state.pending_msg_count ++;
// by default store as QUEUED (not transmitted yet)
mqtt_enqueue(client);
} else {
client->mqtt_state.outbound_message = publish_msg;
}
/* Skip sending if not connected (rely on resending) */
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGD(TAG, "Publish: client is not connected");
goto cannot_publish;
}
/* Provide support for sending fragmented message if it doesn't fit buffer */
int remaining_len = len;
const char *current_data = data;
@ -990,8 +1030,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
while (sending) {
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos);
return -1;
esp_mqtt_abort_connection(client);
goto cannot_publish;
}
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
@ -1028,7 +1068,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
sending = false;
}
}
if (qos > 0) {
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
}
return pending_msg_id;
cannot_publish:
if (qos == 0) {
ESP_LOGW(TAG, "Publishing qos0 data while client not connecting");
}
return 0;
}