From f65d5d05db9b984065079e622e0adeac4c4c7db7 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Sun, 6 Dec 2020 11:25:48 +0100 Subject: [PATCH 1/6] Cleanup public include dirs --- include/mqtt_client.h | 4 ---- {include => lib/include}/mqtt_config.h | 0 {include => lib/include}/mqtt_supported_features.h | 0 3 files changed, 4 deletions(-) rename {include => lib/include}/mqtt_config.h (100%) rename {include => lib/include}/mqtt_supported_features.h (100%) diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 67c0828..d8c69d2 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -12,11 +12,7 @@ #include #include "esp_err.h" -#include "mqtt_config.h" #include "esp_event.h" -#if CONFIG_ESP_TLS_USE_DS_PERIPHERAL -#include "rsa_sign_alt.h" -#endif #ifdef __cplusplus extern "C" { diff --git a/include/mqtt_config.h b/lib/include/mqtt_config.h similarity index 100% rename from include/mqtt_config.h rename to lib/include/mqtt_config.h diff --git a/include/mqtt_supported_features.h b/lib/include/mqtt_supported_features.h similarity index 100% rename from include/mqtt_supported_features.h rename to lib/include/mqtt_supported_features.h From 8bb4a26f46b9bb0b00f66d61671833a184fc7afa Mon Sep 17 00:00:00 2001 From: David Cermak Date: Sun, 6 Dec 2020 11:46:36 +0100 Subject: [PATCH 2/6] Config: Add a new option to use incremental message id This option is off by default, so the client still uses random message id unless CONFIG_MQTT_MSG_ID_INCREMENTAL is set Closes https://github.com/espressif/esp-mqtt/issues/176 --- lib/include/mqtt_config.h | 2 ++ lib/include/mqtt_msg.h | 7 ++++--- lib/mqtt_msg.c | 4 ++++ 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/include/mqtt_config.h b/lib/include/mqtt_config.h index d9b64c7..4932297 100644 --- a/lib/include/mqtt_config.h +++ b/lib/include/mqtt_config.h @@ -15,6 +15,8 @@ #define MQTT_RECON_DEFAULT_MS (10*1000) #define MQTT_POLL_READ_TIMEOUT_MS (1000) +#define MQTT_MSG_ID_INCREMENTAL CONFIG_MQTT_MSG_ID_INCREMENTAL + #if CONFIG_MQTT_BUFFER_SIZE #define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE #else diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 78d6e40..a1dc612 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -70,8 +70,9 @@ typedef struct mqtt_message { typedef struct mqtt_connection { mqtt_message_t message; - - uint16_t message_id; +#if MQTT_MSG_ID_INCREMENTAL + uint16_t last_message_id; /*!< last used id if incremental message id configured */ +#endif uint8_t *buffer; size_t buffer_length; @@ -83,7 +84,7 @@ typedef struct mqtt_connect_info { char *password; char *will_topic; char *will_message; - int keepalive; // keepalive=0 -> keepalive is disabled + int keepalive; /*!< keepalive=0 -> keepalive is disabled */ int will_length; int will_qos; int will_retain; diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index fcfe538..c884577 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -64,7 +64,11 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag // If message_id is zero then we should assign one, otherwise // we'll use the one supplied by the caller while (message_id == 0) { +#if MQTT_MSG_ID_INCREMENTAL + message_id = ++connection->last_message_id; +#else message_id = platform_random(65535); +#endif } if (connection->message.length + 2 > connection->buffer_length) { From dc7fd5c0b1c132dc0e3c265db6bc2ac886f0f0b2 Mon Sep 17 00:00:00 2001 From: Umer Ilyas Date: Mon, 13 Jul 2020 18:52:34 +0500 Subject: [PATCH 3/6] Publish: Add new API to enqueue qos>0 messages Closes https://github.com/espressif/esp-mqtt/issues/155 --- include/mqtt_client.h | 19 +++++++++++++++++++ mqtt_client.c | 37 +++++++++++++++++++++++++++++-------- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/include/mqtt_client.h b/include/mqtt_client.h index d8c69d2..07dd978 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -318,6 +318,25 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top */ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); +/** + * @brief Enqueue a message (with qos set to 1 or 2) to the outbox, to be sent later. + * + * This API generates and stores the publish message into the internal outbox and the actual sending + * to the network is performed in the mqtt-task context (in contrast to the esp_mqtt_client_publish() + * which sends the publish message immediately in the user task's context). + * Thus, it could be used as a non blocking version of esp_mqtt_client_publish(). + * + * @param client mqtt client handle + * @param topic topic string + * @param data payload string (set to NULL, sending empty payload message) + * @param len data length, if set to 0, length is calculated from payload string + * @param qos qos of publish message, this API is applicable only to qos=1 or 2 + * @param retain retain flag + * + * @return message_id if queued successfully, -1 otherwise + */ +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); + /** * @brief Destroys the client handle * diff --git a/mqtt_client.c b/mqtt_client.c index f2d6557..651515c 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1609,10 +1609,9 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top return client->mqtt_state.pending_msg_id; } -int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) { uint16_t pending_msg_id = 0; - int ret = 0; /* Acceptable publish messages: data == NULL, len == 0: publish null message @@ -1623,15 +1622,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } - MQTT_API_LOCK(client); mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, - topic, data, len, - qos, retain, - &pending_msg_id); + topic, data, len, + qos, retain, + &pending_msg_id); if (publish_msg->length == 0) { ESP_LOGE(TAG, "Publish message cannot be created"); - MQTT_API_UNLOCK(client); return -1; } /* We have to set as pending all the qos>0 messages */ @@ -1647,8 +1644,21 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, } else { int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment); + client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; } } + return pending_msg_id; +} + +int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +{ + MQTT_API_LOCK(client); + int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + if (pending_msg_id < 0) { + MQTT_API_UNLOCK(client); + return -1; + } + int ret = 0; /* Skip sending if not connected (rely on resending) */ if (client->state != MQTT_STATE_CONNECTED) { @@ -1657,7 +1667,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, ret = pending_msg_id; } - //Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds + // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); client->mqtt_state.pending_msg_count -= deleted; @@ -1727,6 +1737,17 @@ cannot_publish: return ret; } +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +{ + MQTT_API_LOCK(client); + int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + MQTT_API_UNLOCK(client); + if (ret == 0) { + // messages with qos=0 are not enqueued -> indicate as error + return -1; + } + return ret; +} esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void* event_handler_arg) { From f44dcb1c26e47cc96eef379810a343246d8c265b Mon Sep 17 00:00:00 2001 From: David Cermak Date: Sun, 6 Dec 2020 14:08:45 +0100 Subject: [PATCH 4/6] Config: Add a new option to disable publishing when disconnected Related https://github.com/espressif/esp-mqtt/pull/177 --- include/mqtt_client.h | 6 ++++-- lib/include/mqtt_config.h | 2 ++ mqtt_client.c | 7 +++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 07dd978..f6f4fe0 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -302,8 +302,10 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top * - This API might block for several seconds, either due to network timeout (10s) * or if publishing payloads longer than internal buffer (due to message * fragmentation) - * - Client doesn't have to be connected to send publish message - * (although it would drop all qos=0 messages, qos>1 messages would be enqueued) + * - Client doesn't have to be connected for this API to work, enqueueing the messages + * with qos>1 (returning -1 for all the qos=0 messages if disconnected). + * If MQTT_SKIP_PUBLISH_IF_DISCONNECTED is enabled, this API will not attempt to publish + * when the client is not connected and will always return -1. * - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details * * @param client mqtt client handle diff --git a/lib/include/mqtt_config.h b/lib/include/mqtt_config.h index 4932297..da2a93a 100644 --- a/lib/include/mqtt_config.h +++ b/lib/include/mqtt_config.h @@ -17,6 +17,8 @@ #define MQTT_MSG_ID_INCREMENTAL CONFIG_MQTT_MSG_ID_INCREMENTAL +#define MQTT_SKIP_PUBLISH_IF_DISCONNECTED CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED + #if CONFIG_MQTT_BUFFER_SIZE #define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE #else diff --git a/mqtt_client.c b/mqtt_client.c index 651515c..def0e28 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1653,6 +1653,13 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) { MQTT_API_LOCK(client); +#if MQTT_SKIP_PUBLISH_IF_DISCONNECTED + if (client->state != MQTT_STATE_CONNECTED) { + ESP_LOGE(TAG, "Publish failed: client is not connected"); + MQTT_API_UNLOCK(client); + return -1; + } +#endif int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); if (pending_msg_id < 0) { MQTT_API_UNLOCK(client); From 2e35d4d4d53a9a4d74c475de228e8c78f69a51bd Mon Sep 17 00:00:00 2001 From: David Cermak Date: Sun, 6 Dec 2020 15:23:08 +0100 Subject: [PATCH 5/6] Events: Add new event to report deleted messages from outbox --- include/mqtt_client.h | 7 +++++++ lib/include/mqtt_config.h | 2 ++ lib/include/mqtt_outbox.h | 6 ++++++ lib/mqtt_outbox.c | 16 +++++++++++++++ mqtt_client.c | 43 ++++++++++++++++++++++++++------------- 5 files changed, 60 insertions(+), 14 deletions(-) diff --git a/include/mqtt_client.h b/include/mqtt_client.h index f6f4fe0..308cce2 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -57,6 +57,13 @@ typedef enum { and current data offset updating. */ MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */ + MQTT_EVENT_DELETED, /*!< Notification on delete of one message from the internal outbox, + if the message couldn't have been sent and acknowledged before expiring + defined in OUTBOX_EXPIRED_TIMEOUT_MS. + (events are not posted upon deletion of successfully acknowledged messages) + - This event id is posted only if MQTT_REPORT_DELETED_MESSAGES==1 + - Additional context: msg_id (id of the deleted message). + */ } esp_mqtt_event_id_t; /** diff --git a/lib/include/mqtt_config.h b/lib/include/mqtt_config.h index da2a93a..ca96d0c 100644 --- a/lib/include/mqtt_config.h +++ b/lib/include/mqtt_config.h @@ -19,6 +19,8 @@ #define MQTT_SKIP_PUBLISH_IF_DISCONNECTED CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED +#define MQTT_REPORT_DELETED_MESSAGES CONFIG_MQTT_REPORT_DELETED_MESSAGES + #if CONFIG_MQTT_BUFFER_SIZE #define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE #else diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 21b1436..418f09a 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -44,6 +44,12 @@ esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); +/** + * @brief Deletes single expired message returning it's message id + * + * @return msg id of the deleted message, -1 if no expired message in the outbox + */ +int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 7409965..b0bd784 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -153,6 +153,22 @@ esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type) } return ESP_OK; } +int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) +{ + int msg_id = -1; + outbox_item_handle_t item, tmp; + STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { + if (current_tick - item->tick > timeout) { + STAILQ_REMOVE(outbox, item, outbox_item, next); + free(item->buffer); + msg_id = item->msg_id; + free(item); + return msg_id; + } + + } + return msg_id; +} int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) { diff --git a/mqtt_client.c b/mqtt_client.c index def0e28..2d6e315 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1282,6 +1282,31 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item return ESP_OK; } +static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client) +{ + // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds +#if MQTT_REPORT_DELETED_MESSAGES + // also report the deleted items as MQTT_EVENT_DELETED events if enabled + int deleted_items = 0; + int msg_id = 0; + while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) { + client->event.event_id = MQTT_EVENT_DELETED; + client->event.msg_id = msg_id; + if (esp_mqtt_dispatch_event(client) != ESP_OK) { + ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id); + } + deleted_items ++; + } +#else + int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); +#endif + client->mqtt_state.pending_msg_count -= deleted_items; + + if (client->mqtt_state.pending_msg_count < 0) { + client->mqtt_state.pending_msg_count = 0; + } +} + static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; @@ -1353,13 +1378,8 @@ static void esp_mqtt_task(void *pv) break; } - //Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds - int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); - client->mqtt_state.pending_msg_count -= deleted; - - if (client->mqtt_state.pending_msg_count < 0) { - client->mqtt_state.pending_msg_count = 0; - } + // delete long pending messages + mqtt_delete_expired_messages(client); // resend all non-transmitted messages first outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL); @@ -1674,13 +1694,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, ret = pending_msg_id; } - // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds - int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); - client->mqtt_state.pending_msg_count -= deleted; - - if (client->mqtt_state.pending_msg_count < 0) { - client->mqtt_state.pending_msg_count = 0; - } + // delete long pending messages + mqtt_delete_expired_messages(client); goto cannot_publish; } From e2de0f3e3eb4d4afd4fdb5c2860a96a2ab8b1e21 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Tue, 8 Dec 2020 20:57:00 +0100 Subject: [PATCH 6/6] Publish: Allow for qos=0 messages to be stored using esp_mqtt_client_enqueue() The API presents a boolean parameter to control storing the qos=0 messages into the internal outbox --- include/mqtt_client.h | 8 +++++--- lib/include/mqtt_outbox.h | 1 + lib/mqtt_outbox.c | 14 ++++++++++++++ mqtt_client.c | 25 +++++++++++++++++-------- 4 files changed, 37 insertions(+), 11 deletions(-) diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 308cce2..5d019fa 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -328,7 +328,8 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); /** - * @brief Enqueue a message (with qos set to 1 or 2) to the outbox, to be sent later. + * @brief Enqueue a message to the outbox, to be sent later. Typically used for messages with qos>0, but could + * be also used for qos=0 messages if store=true. * * This API generates and stores the publish message into the internal outbox and the actual sending * to the network is performed in the mqtt-task context (in contrast to the esp_mqtt_client_publish() @@ -339,12 +340,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, * @param topic topic string * @param data payload string (set to NULL, sending empty payload message) * @param len data length, if set to 0, length is calculated from payload string - * @param qos qos of publish message, this API is applicable only to qos=1 or 2 + * @param qos qos of publish message * @param retain retain flag + * @param store if true, all messages are enqueued; otherwise only qos1 and qos 2 are enqueued * * @return message_id if queued successfully, -1 otherwise */ -int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store); /** * @brief Destroys the client handle diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 418f09a..cd51c47 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -43,6 +43,7 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); +esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item); int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); /** * @brief Deletes single expired message returning it's message id diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index b0bd784..531f83f 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -80,6 +80,20 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend return NULL; } +esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete) +{ + outbox_item_handle_t item; + STAILQ_FOREACH(item, outbox, next) { + if (item == item_to_delete) { + STAILQ_REMOVE(outbox, item, outbox_item, next); + free(item->buffer); + free(item); + return ESP_OK; + } + } + return ESP_FAIL; +} + uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) { if (item) { diff --git a/mqtt_client.c b/mqtt_client.c index 2d6e315..ef2201d 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1279,6 +1279,14 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item esp_mqtt_abort_connection(client); return ESP_FAIL; } + + // check if it was QoS-0 publish message + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) { + // delete all qos0 publish messages once we process them + if (outbox_delete_item(client->outbox, item) != ESP_OK) { + ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox"); + } + } return ESP_OK; } @@ -1629,7 +1637,8 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top return client->mqtt_state.pending_msg_id; } -static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, + int len, int qos, int retain, bool store) { uint16_t pending_msg_id = 0; @@ -1653,7 +1662,7 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons } /* We have to set as pending all the qos>0 messages */ client->mqtt_state.outbound_message = publish_msg; - if (qos > 0) { + if (qos > 0 || store) { client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; @@ -1675,12 +1684,12 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, MQTT_API_LOCK(client); #if MQTT_SKIP_PUBLISH_IF_DISCONNECTED if (client->state != MQTT_STATE_CONNECTED) { - ESP_LOGE(TAG, "Publish failed: client is not connected"); + ESP_LOGI(TAG, "Publishing skipped: client is not connected"); MQTT_API_UNLOCK(client); return -1; } #endif - int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false); if (pending_msg_id < 0) { MQTT_API_UNLOCK(client); return -1; @@ -1759,13 +1768,13 @@ cannot_publish: return ret; } -int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store) { MQTT_API_LOCK(client); - int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store); MQTT_API_UNLOCK(client); - if (ret == 0) { - // messages with qos=0 are not enqueued -> indicate as error + if (ret == 0 && store == false) { + // messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error return -1; } return ret;