mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-30 10:48:06 +02:00
Events: Add new event to report deleted messages from outbox
This commit is contained in:
@ -57,6 +57,13 @@ typedef enum {
|
|||||||
and current data offset updating.
|
and current data offset updating.
|
||||||
*/
|
*/
|
||||||
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
|
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
|
||||||
|
MQTT_EVENT_DELETED, /*!< Notification on delete of one message from the internal outbox,
|
||||||
|
if the message couldn't have been sent and acknowledged before expiring
|
||||||
|
defined in OUTBOX_EXPIRED_TIMEOUT_MS.
|
||||||
|
(events are not posted upon deletion of successfully acknowledged messages)
|
||||||
|
- This event id is posted only if MQTT_REPORT_DELETED_MESSAGES==1
|
||||||
|
- Additional context: msg_id (id of the deleted message).
|
||||||
|
*/
|
||||||
} esp_mqtt_event_id_t;
|
} esp_mqtt_event_id_t;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
|
|
||||||
#define MQTT_SKIP_PUBLISH_IF_DISCONNECTED CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED
|
#define MQTT_SKIP_PUBLISH_IF_DISCONNECTED CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED
|
||||||
|
|
||||||
|
#define MQTT_REPORT_DELETED_MESSAGES CONFIG_MQTT_REPORT_DELETED_MESSAGES
|
||||||
|
|
||||||
#if CONFIG_MQTT_BUFFER_SIZE
|
#if CONFIG_MQTT_BUFFER_SIZE
|
||||||
#define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE
|
#define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE
|
||||||
#else
|
#else
|
||||||
|
@ -44,6 +44,12 @@ esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
|||||||
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
|
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
|
||||||
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
|
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
|
||||||
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
||||||
|
/**
|
||||||
|
* @brief Deletes single expired message returning it's message id
|
||||||
|
*
|
||||||
|
* @return msg id of the deleted message, -1 if no expired message in the outbox
|
||||||
|
*/
|
||||||
|
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
||||||
|
|
||||||
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
|
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
|
||||||
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
|
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
|
||||||
|
@ -153,6 +153,22 @@ esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
|
|||||||
}
|
}
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
|
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
|
||||||
|
{
|
||||||
|
int msg_id = -1;
|
||||||
|
outbox_item_handle_t item, tmp;
|
||||||
|
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
|
||||||
|
if (current_tick - item->tick > timeout) {
|
||||||
|
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||||
|
free(item->buffer);
|
||||||
|
msg_id = item->msg_id;
|
||||||
|
free(item);
|
||||||
|
return msg_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return msg_id;
|
||||||
|
}
|
||||||
|
|
||||||
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
|
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
|
||||||
{
|
{
|
||||||
|
@ -1282,6 +1282,31 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
|
|||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
|
||||||
|
{
|
||||||
|
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
|
||||||
|
#if MQTT_REPORT_DELETED_MESSAGES
|
||||||
|
// also report the deleted items as MQTT_EVENT_DELETED events if enabled
|
||||||
|
int deleted_items = 0;
|
||||||
|
int msg_id = 0;
|
||||||
|
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) {
|
||||||
|
client->event.event_id = MQTT_EVENT_DELETED;
|
||||||
|
client->event.msg_id = msg_id;
|
||||||
|
if (esp_mqtt_dispatch_event(client) != ESP_OK) {
|
||||||
|
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
|
||||||
|
}
|
||||||
|
deleted_items ++;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||||
|
#endif
|
||||||
|
client->mqtt_state.pending_msg_count -= deleted_items;
|
||||||
|
|
||||||
|
if (client->mqtt_state.pending_msg_count < 0) {
|
||||||
|
client->mqtt_state.pending_msg_count = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void esp_mqtt_task(void *pv)
|
static void esp_mqtt_task(void *pv)
|
||||||
{
|
{
|
||||||
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
|
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
|
||||||
@ -1353,13 +1378,8 @@ static void esp_mqtt_task(void *pv)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
//Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds
|
// delete long pending messages
|
||||||
int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
mqtt_delete_expired_messages(client);
|
||||||
client->mqtt_state.pending_msg_count -= deleted;
|
|
||||||
|
|
||||||
if (client->mqtt_state.pending_msg_count < 0) {
|
|
||||||
client->mqtt_state.pending_msg_count = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// resend all non-transmitted messages first
|
// resend all non-transmitted messages first
|
||||||
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
|
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
|
||||||
@ -1674,13 +1694,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
ret = pending_msg_id;
|
ret = pending_msg_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
|
// delete long pending messages
|
||||||
int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
mqtt_delete_expired_messages(client);
|
||||||
client->mqtt_state.pending_msg_count -= deleted;
|
|
||||||
|
|
||||||
if (client->mqtt_state.pending_msg_count < 0) {
|
|
||||||
client->mqtt_state.pending_msg_count = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
goto cannot_publish;
|
goto cannot_publish;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user