diff --git a/include/mqtt_client.h b/include/mqtt_client.h index f6f4fe0..308cce2 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -57,6 +57,13 @@ typedef enum { and current data offset updating. */ MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */ + MQTT_EVENT_DELETED, /*!< Notification on delete of one message from the internal outbox, + if the message couldn't have been sent and acknowledged before expiring + defined in OUTBOX_EXPIRED_TIMEOUT_MS. + (events are not posted upon deletion of successfully acknowledged messages) + - This event id is posted only if MQTT_REPORT_DELETED_MESSAGES==1 + - Additional context: msg_id (id of the deleted message). + */ } esp_mqtt_event_id_t; /** diff --git a/lib/include/mqtt_config.h b/lib/include/mqtt_config.h index da2a93a..ca96d0c 100644 --- a/lib/include/mqtt_config.h +++ b/lib/include/mqtt_config.h @@ -19,6 +19,8 @@ #define MQTT_SKIP_PUBLISH_IF_DISCONNECTED CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED +#define MQTT_REPORT_DELETED_MESSAGES CONFIG_MQTT_REPORT_DELETED_MESSAGES + #if CONFIG_MQTT_BUFFER_SIZE #define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE #else diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 21b1436..418f09a 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -44,6 +44,12 @@ esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); +/** + * @brief Deletes single expired message returning it's message id + * + * @return msg id of the deleted message, -1 if no expired message in the outbox + */ +int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 7409965..b0bd784 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -153,6 +153,22 @@ esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type) } return ESP_OK; } +int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) +{ + int msg_id = -1; + outbox_item_handle_t item, tmp; + STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { + if (current_tick - item->tick > timeout) { + STAILQ_REMOVE(outbox, item, outbox_item, next); + free(item->buffer); + msg_id = item->msg_id; + free(item); + return msg_id; + } + + } + return msg_id; +} int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) { diff --git a/mqtt_client.c b/mqtt_client.c index def0e28..2d6e315 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1282,6 +1282,31 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item return ESP_OK; } +static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client) +{ + // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds +#if MQTT_REPORT_DELETED_MESSAGES + // also report the deleted items as MQTT_EVENT_DELETED events if enabled + int deleted_items = 0; + int msg_id = 0; + while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) { + client->event.event_id = MQTT_EVENT_DELETED; + client->event.msg_id = msg_id; + if (esp_mqtt_dispatch_event(client) != ESP_OK) { + ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id); + } + deleted_items ++; + } +#else + int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); +#endif + client->mqtt_state.pending_msg_count -= deleted_items; + + if (client->mqtt_state.pending_msg_count < 0) { + client->mqtt_state.pending_msg_count = 0; + } +} + static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; @@ -1353,13 +1378,8 @@ static void esp_mqtt_task(void *pv) break; } - //Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds - int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); - client->mqtt_state.pending_msg_count -= deleted; - - if (client->mqtt_state.pending_msg_count < 0) { - client->mqtt_state.pending_msg_count = 0; - } + // delete long pending messages + mqtt_delete_expired_messages(client); // resend all non-transmitted messages first outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL); @@ -1674,13 +1694,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, ret = pending_msg_id; } - // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds - int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); - client->mqtt_state.pending_msg_count -= deleted; - - if (client->mqtt_state.pending_msg_count < 0) { - client->mqtt_state.pending_msg_count = 0; - } + // delete long pending messages + mqtt_delete_expired_messages(client); goto cannot_publish; }