diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 308cce2..5d019fa 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -328,7 +328,8 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); /** - * @brief Enqueue a message (with qos set to 1 or 2) to the outbox, to be sent later. + * @brief Enqueue a message to the outbox, to be sent later. Typically used for messages with qos>0, but could + * be also used for qos=0 messages if store=true. * * This API generates and stores the publish message into the internal outbox and the actual sending * to the network is performed in the mqtt-task context (in contrast to the esp_mqtt_client_publish() @@ -339,12 +340,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, * @param topic topic string * @param data payload string (set to NULL, sending empty payload message) * @param len data length, if set to 0, length is calculated from payload string - * @param qos qos of publish message, this API is applicable only to qos=1 or 2 + * @param qos qos of publish message * @param retain retain flag + * @param store if true, all messages are enqueued; otherwise only qos1 and qos 2 are enqueued * * @return message_id if queued successfully, -1 otherwise */ -int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store); /** * @brief Destroys the client handle diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 418f09a..cd51c47 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -43,6 +43,7 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); +esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item); int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); /** * @brief Deletes single expired message returning it's message id diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index b0bd784..531f83f 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -80,6 +80,20 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend return NULL; } +esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete) +{ + outbox_item_handle_t item; + STAILQ_FOREACH(item, outbox, next) { + if (item == item_to_delete) { + STAILQ_REMOVE(outbox, item, outbox_item, next); + free(item->buffer); + free(item); + return ESP_OK; + } + } + return ESP_FAIL; +} + uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) { if (item) { diff --git a/mqtt_client.c b/mqtt_client.c index 2d6e315..ef2201d 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1279,6 +1279,14 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item esp_mqtt_abort_connection(client); return ESP_FAIL; } + + // check if it was QoS-0 publish message + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) { + // delete all qos0 publish messages once we process them + if (outbox_delete_item(client->outbox, item) != ESP_OK) { + ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox"); + } + } return ESP_OK; } @@ -1629,7 +1637,8 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top return client->mqtt_state.pending_msg_id; } -static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, + int len, int qos, int retain, bool store) { uint16_t pending_msg_id = 0; @@ -1653,7 +1662,7 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons } /* We have to set as pending all the qos>0 messages */ client->mqtt_state.outbound_message = publish_msg; - if (qos > 0) { + if (qos > 0 || store) { client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; @@ -1675,12 +1684,12 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, MQTT_API_LOCK(client); #if MQTT_SKIP_PUBLISH_IF_DISCONNECTED if (client->state != MQTT_STATE_CONNECTED) { - ESP_LOGE(TAG, "Publish failed: client is not connected"); + ESP_LOGI(TAG, "Publishing skipped: client is not connected"); MQTT_API_UNLOCK(client); return -1; } #endif - int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false); if (pending_msg_id < 0) { MQTT_API_UNLOCK(client); return -1; @@ -1759,13 +1768,13 @@ cannot_publish: return ret; } -int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store) { MQTT_API_LOCK(client); - int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store); MQTT_API_UNLOCK(client); - if (ret == 0) { - // messages with qos=0 are not enqueued -> indicate as error + if (ret == 0 && store == false) { + // messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error return -1; } return ret;