From 6a0d1e7bff1e3085669d41ae3412c649db20108e Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 20 Dec 2018 21:46:55 +0100 Subject: [PATCH] support for qos1 and qos2 message retrasmission on reconnect --- lib/include/mqtt_msg.h | 1 + lib/include/mqtt_outbox.h | 12 +++++-- lib/mqtt_outbox.c | 30 ++++++++++++---- mqtt_client.c | 74 ++++++++++++++++++++++++++++++++------- 4 files changed, 96 insertions(+), 21 deletions(-) diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 931cbf9..1a07cae 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -106,6 +106,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> static inline int mqtt_get_connect_session_present(uint8_t* buffer) { return buffer[2] & 0x01; } static inline int mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; } static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } +static inline void mqtt_set_dup(uint8_t* buffer) { buffer[0] |= 0x08; } static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 31dd5b9..b4f1e4f 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -21,21 +21,29 @@ typedef struct outbox_message { uint8_t *data; int len; int msg_id; + int msg_qos; int msg_type; uint8_t *remaining_data; int remaining_len; } outbox_message_t; +typedef enum pending_state { + QUEUED, + TRANSMITTED, + CONFIRMED +} pending_state_t; + outbox_handle_t outbox_init(); outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox); +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); +uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); esp_err_t outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout); -esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id); +esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); int outbox_get_size(outbox_handle_t outbox); esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size); void outbox_destroy(outbox_handle_t outbox); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 6d6efc5..403fd9c 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -14,9 +14,10 @@ typedef struct outbox_item { int len; int msg_id; int msg_type; + int msg_qos; int tick; int retry_count; - bool pending; + pending_state_t pending; STAILQ_ENTRY(outbox_item) next; } outbox_item_t; @@ -37,8 +38,10 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl ESP_MEM_CHECK(TAG, item, return NULL); item->msg_id = message->msg_id; item->msg_type = message->msg_type; + item->msg_qos = message->msg_qos; item->tick = tick; item->len = message->len; + item->pending = QUEUED; item->buffer = malloc(message->len + message->remaining_len); ESP_MEM_CHECK(TAG, item->buffer, { free(item); @@ -64,21 +67,34 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) return NULL; } -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox) +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending) { outbox_item_handle_t item; STAILQ_FOREACH(item, outbox, next) { - if (!item->pending) { + if (item->pending == pending) { return item; } } return NULL; } + +uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) +{ + if (item) { + *len = item->len; + *msg_id = item->msg_id; + *msg_type = item->msg_type; + *qos = item->msg_qos; + return (uint8_t*)item->buffer; + } + return NULL; +} + esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) { outbox_item_handle_t item, tmp; STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - if (item->msg_id == msg_id && item->msg_type == msg_type) { + if (item->msg_id == msg_id && (0xFF&(item->msg_type)) == msg_type) { STAILQ_REMOVE(outbox, item, outbox_item, next); free(item->buffer); free(item); @@ -102,11 +118,11 @@ esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id) } return ESP_OK; } -esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id) +esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending) { outbox_item_handle_t item = outbox_get(outbox, msg_id); if (item) { - item->pending = true; + item->pending = pending; return ESP_OK; } return ESP_FAIL; @@ -153,7 +169,7 @@ int outbox_get_size(outbox_handle_t outbox) esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size) { while(outbox_get_size(outbox) > max_size) { - outbox_item_handle_t item = outbox_dequeue(outbox); + outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED); if (item == NULL) { return ESP_FAIL; } diff --git a/mqtt_client.c b/mqtt_client.c index 93b2219..73d3624 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -200,7 +200,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl if (config->disable_auto_reconnect == cfg->auto_reconnect) { cfg->auto_reconnect = !config->disable_auto_reconnect; } - return ESP_OK; _mqtt_set_config_failed: esp_mqtt_destroy_config(client); @@ -305,7 +304,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co ESP_MEM_CHECK(TAG, client, return NULL); esp_mqtt_set_config(client, config); - client->transport_list = esp_transport_list_init(); ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed); @@ -605,6 +603,7 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; msg.remaining_data = remaining_data; msg.remaining_len = remaining_len; //Copy to queue buffer @@ -624,6 +623,7 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client) msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; //Copy to queue buffer outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } @@ -700,21 +700,21 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) case MQTT_MSG_TYPE_PUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); + outbox_set_pending(client->outbox, msg_id, CONFIRMED); client->event.event_id = MQTT_EVENT_PUBLISHED; esp_mqtt_dispatch_event_with_msgid(client); } - break; case MQTT_MSG_TYPE_PUBREC: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + outbox_set_pending(client->outbox, msg_id, CONFIRMED); mqtt_write_data(client); break; case MQTT_MSG_TYPE_PUBREL: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); mqtt_write_data(client); - break; case MQTT_MSG_TYPE_PUBCOMP: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); @@ -736,9 +736,29 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_OK; } +static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item) +{ + // decode queued data + client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id, + &client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos); + // set duplicate flag for QoS-2 message + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) { + mqtt_set_dup(client->mqtt_state.outbound_message->data); + } + + // try to resend the data + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error to public data "); + esp_mqtt_abort_connection(client); + return ESP_FAIL; + } + return ESP_OK; +} + static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; + uint32_t last_retransmit = 0; client->run = true; //get transport by scheme @@ -796,6 +816,21 @@ static void esp_mqtt_task(void *pv) break; } + // resend all non-transmitted messages first + outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED); + if (item) { + if (mqtt_resend_queued(client, item) == ESP_OK) { + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); + } + // resend other "transmitted" messages after 1s + } else if (platform_tick_get_ms() - last_retransmit > 1000) { + last_retransmit = platform_tick_get_ms(); + item = outbox_dequeue(client->outbox, TRANSMITTED); + if (item) { + mqtt_resend_queued(client, item); + } + } + if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) { //No ping resp from last ping => Disconnected if(client->wait_for_ping_resp){ @@ -914,13 +949,13 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic ESP_LOGE(TAG, "Client has not connected"); return -1; } - mqtt_enqueue(client); //move pending msg to outbox (if have) client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, topic, qos, &client->mqtt_state.pending_msg_id); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; + mqtt_enqueue(client); //move pending msg to outbox (if have) if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); @@ -958,10 +993,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) { uint16_t pending_msg_id = 0; - if (client->state != MQTT_STATE_CONNECTED) { - ESP_LOGE(TAG, "Client has not connected"); - return -1; - } + if (len <= 0) { len = strlen(data); } @@ -973,15 +1005,23 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, /* We have to set as pending all the qos>0 messages) */ if (qos > 0) { - mqtt_enqueue(client); client->mqtt_state.outbound_message = publish_msg; client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; + client->mqtt_state.pending_publish_qos = qos; client->mqtt_state.pending_msg_count ++; + // by default store as QUEUED (not transmitted yet) + mqtt_enqueue(client); } else { client->mqtt_state.outbound_message = publish_msg; } + /* Skip sending if not connected (rely on resending) */ + if (client->state != MQTT_STATE_CONNECTED) { + ESP_LOGD(TAG, "Publish: client is not connected"); + goto cannot_publish; + } + /* Provide support for sending fragmented message if it doesn't fit buffer */ int remaining_len = len; const char *current_data = data; @@ -990,8 +1030,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, while (sending) { if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos); - return -1; + esp_mqtt_abort_connection(client); + goto cannot_publish; } int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; @@ -1028,7 +1068,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, sending = false; } } + + if (qos > 0) { + outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED); + } return pending_msg_id; + +cannot_publish: + if (qos == 0) { + ESP_LOGW(TAG, "Publishing qos0 data while client not connecting"); + } + return 0; }