mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-30 02:38:19 +02:00
Publish: Allow for qos=0 messages to be stored using esp_mqtt_client_enqueue()
The API presents a boolean parameter to control storing the qos=0 messages into the internal outbox
This commit is contained in:
@ -328,7 +328,8 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
|
||||
|
||||
/**
|
||||
* @brief Enqueue a message (with qos set to 1 or 2) to the outbox, to be sent later.
|
||||
* @brief Enqueue a message to the outbox, to be sent later. Typically used for messages with qos>0, but could
|
||||
* be also used for qos=0 messages if store=true.
|
||||
*
|
||||
* This API generates and stores the publish message into the internal outbox and the actual sending
|
||||
* to the network is performed in the mqtt-task context (in contrast to the esp_mqtt_client_publish()
|
||||
@ -339,12 +340,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
* @param topic topic string
|
||||
* @param data payload string (set to NULL, sending empty payload message)
|
||||
* @param len data length, if set to 0, length is calculated from payload string
|
||||
* @param qos qos of publish message, this API is applicable only to qos=1 or 2
|
||||
* @param qos qos of publish message
|
||||
* @param retain retain flag
|
||||
* @param store if true, all messages are enqueued; otherwise only qos1 and qos 2 are enqueued
|
||||
*
|
||||
* @return message_id if queued successfully, -1 otherwise
|
||||
*/
|
||||
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
|
||||
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store);
|
||||
|
||||
/**
|
||||
* @brief Destroys the client handle
|
||||
|
@ -43,6 +43,7 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t
|
||||
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
||||
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
|
||||
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
|
||||
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
|
||||
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
||||
/**
|
||||
* @brief Deletes single expired message returning it's message id
|
||||
|
@ -80,6 +80,20 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
|
||||
return NULL;
|
||||
}
|
||||
|
||||
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
|
||||
{
|
||||
outbox_item_handle_t item;
|
||||
STAILQ_FOREACH(item, outbox, next) {
|
||||
if (item == item_to_delete) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
free(item->buffer);
|
||||
free(item);
|
||||
return ESP_OK;
|
||||
}
|
||||
}
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
|
||||
{
|
||||
if (item) {
|
||||
|
@ -1279,6 +1279,14 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
|
||||
esp_mqtt_abort_connection(client);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
// check if it was QoS-0 publish message
|
||||
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
|
||||
// delete all qos0 publish messages once we process them
|
||||
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
|
||||
}
|
||||
}
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
@ -1629,7 +1637,8 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
||||
return client->mqtt_state.pending_msg_id;
|
||||
}
|
||||
|
||||
static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||
static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data,
|
||||
int len, int qos, int retain, bool store)
|
||||
{
|
||||
uint16_t pending_msg_id = 0;
|
||||
|
||||
@ -1653,7 +1662,7 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
|
||||
}
|
||||
/* We have to set as pending all the qos>0 messages */
|
||||
client->mqtt_state.outbound_message = publish_msg;
|
||||
if (qos > 0) {
|
||||
if (qos > 0 || store) {
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_id = pending_msg_id;
|
||||
client->mqtt_state.pending_publish_qos = qos;
|
||||
@ -1675,12 +1684,12 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
MQTT_API_LOCK(client);
|
||||
#if MQTT_SKIP_PUBLISH_IF_DISCONNECTED
|
||||
if (client->state != MQTT_STATE_CONNECTED) {
|
||||
ESP_LOGE(TAG, "Publish failed: client is not connected");
|
||||
ESP_LOGI(TAG, "Publishing skipped: client is not connected");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain);
|
||||
int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false);
|
||||
if (pending_msg_id < 0) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
@ -1759,13 +1768,13 @@ cannot_publish:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store)
|
||||
{
|
||||
MQTT_API_LOCK(client);
|
||||
int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain);
|
||||
int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store);
|
||||
MQTT_API_UNLOCK(client);
|
||||
if (ret == 0) {
|
||||
// messages with qos=0 are not enqueued -> indicate as error
|
||||
if (ret == 0 && store == false) {
|
||||
// messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error
|
||||
return -1;
|
||||
}
|
||||
return ret;
|
||||
|
Reference in New Issue
Block a user