mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-30 10:48:06 +02:00
Merge branch 'feature/queue_if_disconnect' into 'master'
esp-mqtt: More configs to queue when disconnected, events on delete, incremental msg-id See merge request espressif/esp-mqtt!85
This commit is contained in:
@ -12,11 +12,7 @@
|
|||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include "esp_err.h"
|
#include "esp_err.h"
|
||||||
|
|
||||||
#include "mqtt_config.h"
|
|
||||||
#include "esp_event.h"
|
#include "esp_event.h"
|
||||||
#if CONFIG_ESP_TLS_USE_DS_PERIPHERAL
|
|
||||||
#include "rsa_sign_alt.h"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
@ -61,6 +57,13 @@ typedef enum {
|
|||||||
and current data offset updating.
|
and current data offset updating.
|
||||||
*/
|
*/
|
||||||
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
|
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
|
||||||
|
MQTT_EVENT_DELETED, /*!< Notification on delete of one message from the internal outbox,
|
||||||
|
if the message couldn't have been sent and acknowledged before expiring
|
||||||
|
defined in OUTBOX_EXPIRED_TIMEOUT_MS.
|
||||||
|
(events are not posted upon deletion of successfully acknowledged messages)
|
||||||
|
- This event id is posted only if MQTT_REPORT_DELETED_MESSAGES==1
|
||||||
|
- Additional context: msg_id (id of the deleted message).
|
||||||
|
*/
|
||||||
} esp_mqtt_event_id_t;
|
} esp_mqtt_event_id_t;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -306,8 +309,10 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
* - This API might block for several seconds, either due to network timeout (10s)
|
* - This API might block for several seconds, either due to network timeout (10s)
|
||||||
* or if publishing payloads longer than internal buffer (due to message
|
* or if publishing payloads longer than internal buffer (due to message
|
||||||
* fragmentation)
|
* fragmentation)
|
||||||
* - Client doesn't have to be connected to send publish message
|
* - Client doesn't have to be connected for this API to work, enqueueing the messages
|
||||||
* (although it would drop all qos=0 messages, qos>1 messages would be enqueued)
|
* with qos>1 (returning -1 for all the qos=0 messages if disconnected).
|
||||||
|
* If MQTT_SKIP_PUBLISH_IF_DISCONNECTED is enabled, this API will not attempt to publish
|
||||||
|
* when the client is not connected and will always return -1.
|
||||||
* - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details
|
* - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details
|
||||||
*
|
*
|
||||||
* @param client mqtt client handle
|
* @param client mqtt client handle
|
||||||
@ -322,6 +327,27 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
*/
|
*/
|
||||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
|
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Enqueue a message to the outbox, to be sent later. Typically used for messages with qos>0, but could
|
||||||
|
* be also used for qos=0 messages if store=true.
|
||||||
|
*
|
||||||
|
* This API generates and stores the publish message into the internal outbox and the actual sending
|
||||||
|
* to the network is performed in the mqtt-task context (in contrast to the esp_mqtt_client_publish()
|
||||||
|
* which sends the publish message immediately in the user task's context).
|
||||||
|
* Thus, it could be used as a non blocking version of esp_mqtt_client_publish().
|
||||||
|
*
|
||||||
|
* @param client mqtt client handle
|
||||||
|
* @param topic topic string
|
||||||
|
* @param data payload string (set to NULL, sending empty payload message)
|
||||||
|
* @param len data length, if set to 0, length is calculated from payload string
|
||||||
|
* @param qos qos of publish message
|
||||||
|
* @param retain retain flag
|
||||||
|
* @param store if true, all messages are enqueued; otherwise only qos1 and qos 2 are enqueued
|
||||||
|
*
|
||||||
|
* @return message_id if queued successfully, -1 otherwise
|
||||||
|
*/
|
||||||
|
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Destroys the client handle
|
* @brief Destroys the client handle
|
||||||
*
|
*
|
||||||
|
@ -15,6 +15,12 @@
|
|||||||
#define MQTT_RECON_DEFAULT_MS (10*1000)
|
#define MQTT_RECON_DEFAULT_MS (10*1000)
|
||||||
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
|
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
|
||||||
|
|
||||||
|
#define MQTT_MSG_ID_INCREMENTAL CONFIG_MQTT_MSG_ID_INCREMENTAL
|
||||||
|
|
||||||
|
#define MQTT_SKIP_PUBLISH_IF_DISCONNECTED CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED
|
||||||
|
|
||||||
|
#define MQTT_REPORT_DELETED_MESSAGES CONFIG_MQTT_REPORT_DELETED_MESSAGES
|
||||||
|
|
||||||
#if CONFIG_MQTT_BUFFER_SIZE
|
#if CONFIG_MQTT_BUFFER_SIZE
|
||||||
#define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE
|
#define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE
|
||||||
#else
|
#else
|
@ -70,8 +70,9 @@ typedef struct mqtt_message {
|
|||||||
|
|
||||||
typedef struct mqtt_connection {
|
typedef struct mqtt_connection {
|
||||||
mqtt_message_t message;
|
mqtt_message_t message;
|
||||||
|
#if MQTT_MSG_ID_INCREMENTAL
|
||||||
uint16_t message_id;
|
uint16_t last_message_id; /*!< last used id if incremental message id configured */
|
||||||
|
#endif
|
||||||
uint8_t *buffer;
|
uint8_t *buffer;
|
||||||
size_t buffer_length;
|
size_t buffer_length;
|
||||||
|
|
||||||
@ -83,7 +84,7 @@ typedef struct mqtt_connect_info {
|
|||||||
char *password;
|
char *password;
|
||||||
char *will_topic;
|
char *will_topic;
|
||||||
char *will_message;
|
char *will_message;
|
||||||
int keepalive; // keepalive=0 -> keepalive is disabled
|
int keepalive; /*!< keepalive=0 -> keepalive is disabled */
|
||||||
int will_length;
|
int will_length;
|
||||||
int will_qos;
|
int will_qos;
|
||||||
int will_retain;
|
int will_retain;
|
||||||
|
@ -43,7 +43,14 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t
|
|||||||
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
||||||
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
|
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
|
||||||
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
|
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
|
||||||
|
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
|
||||||
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
||||||
|
/**
|
||||||
|
* @brief Deletes single expired message returning it's message id
|
||||||
|
*
|
||||||
|
* @return msg id of the deleted message, -1 if no expired message in the outbox
|
||||||
|
*/
|
||||||
|
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
||||||
|
|
||||||
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
|
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
|
||||||
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
|
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
|
||||||
|
@ -64,7 +64,11 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
|
|||||||
// If message_id is zero then we should assign one, otherwise
|
// If message_id is zero then we should assign one, otherwise
|
||||||
// we'll use the one supplied by the caller
|
// we'll use the one supplied by the caller
|
||||||
while (message_id == 0) {
|
while (message_id == 0) {
|
||||||
|
#if MQTT_MSG_ID_INCREMENTAL
|
||||||
|
message_id = ++connection->last_message_id;
|
||||||
|
#else
|
||||||
message_id = platform_random(65535);
|
message_id = platform_random(65535);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connection->message.length + 2 > connection->buffer_length) {
|
if (connection->message.length + 2 > connection->buffer_length) {
|
||||||
|
@ -80,6 +80,20 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
|
||||||
|
{
|
||||||
|
outbox_item_handle_t item;
|
||||||
|
STAILQ_FOREACH(item, outbox, next) {
|
||||||
|
if (item == item_to_delete) {
|
||||||
|
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||||
|
free(item->buffer);
|
||||||
|
free(item);
|
||||||
|
return ESP_OK;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ESP_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
|
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
|
||||||
{
|
{
|
||||||
if (item) {
|
if (item) {
|
||||||
@ -153,6 +167,22 @@ esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
|
|||||||
}
|
}
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
|
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
|
||||||
|
{
|
||||||
|
int msg_id = -1;
|
||||||
|
outbox_item_handle_t item, tmp;
|
||||||
|
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
|
||||||
|
if (current_tick - item->tick > timeout) {
|
||||||
|
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||||
|
free(item->buffer);
|
||||||
|
msg_id = item->msg_id;
|
||||||
|
free(item);
|
||||||
|
return msg_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return msg_id;
|
||||||
|
}
|
||||||
|
|
||||||
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
|
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
|
||||||
{
|
{
|
||||||
|
@ -1279,9 +1279,42 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
|
|||||||
esp_mqtt_abort_connection(client);
|
esp_mqtt_abort_connection(client);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if it was QoS-0 publish message
|
||||||
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
|
||||||
|
// delete all qos0 publish messages once we process them
|
||||||
|
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
|
||||||
|
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
|
||||||
|
}
|
||||||
|
}
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
|
||||||
|
{
|
||||||
|
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
|
||||||
|
#if MQTT_REPORT_DELETED_MESSAGES
|
||||||
|
// also report the deleted items as MQTT_EVENT_DELETED events if enabled
|
||||||
|
int deleted_items = 0;
|
||||||
|
int msg_id = 0;
|
||||||
|
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) {
|
||||||
|
client->event.event_id = MQTT_EVENT_DELETED;
|
||||||
|
client->event.msg_id = msg_id;
|
||||||
|
if (esp_mqtt_dispatch_event(client) != ESP_OK) {
|
||||||
|
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
|
||||||
|
}
|
||||||
|
deleted_items ++;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||||
|
#endif
|
||||||
|
client->mqtt_state.pending_msg_count -= deleted_items;
|
||||||
|
|
||||||
|
if (client->mqtt_state.pending_msg_count < 0) {
|
||||||
|
client->mqtt_state.pending_msg_count = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void esp_mqtt_task(void *pv)
|
static void esp_mqtt_task(void *pv)
|
||||||
{
|
{
|
||||||
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
|
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
|
||||||
@ -1353,13 +1386,8 @@ static void esp_mqtt_task(void *pv)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
//Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds
|
// delete long pending messages
|
||||||
int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
mqtt_delete_expired_messages(client);
|
||||||
client->mqtt_state.pending_msg_count -= deleted;
|
|
||||||
|
|
||||||
if (client->mqtt_state.pending_msg_count < 0) {
|
|
||||||
client->mqtt_state.pending_msg_count = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// resend all non-transmitted messages first
|
// resend all non-transmitted messages first
|
||||||
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
|
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
|
||||||
@ -1609,10 +1637,10 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
return client->mqtt_state.pending_msg_id;
|
return client->mqtt_state.pending_msg_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data,
|
||||||
|
int len, int qos, int retain, bool store)
|
||||||
{
|
{
|
||||||
uint16_t pending_msg_id = 0;
|
uint16_t pending_msg_id = 0;
|
||||||
int ret = 0;
|
|
||||||
|
|
||||||
/* Acceptable publish messages:
|
/* Acceptable publish messages:
|
||||||
data == NULL, len == 0: publish null message
|
data == NULL, len == 0: publish null message
|
||||||
@ -1623,20 +1651,18 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
len = strlen(data);
|
len = strlen(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTT_API_LOCK(client);
|
|
||||||
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||||
topic, data, len,
|
topic, data, len,
|
||||||
qos, retain,
|
qos, retain,
|
||||||
&pending_msg_id);
|
&pending_msg_id);
|
||||||
|
|
||||||
if (publish_msg->length == 0) {
|
if (publish_msg->length == 0) {
|
||||||
ESP_LOGE(TAG, "Publish message cannot be created");
|
ESP_LOGE(TAG, "Publish message cannot be created");
|
||||||
MQTT_API_UNLOCK(client);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
/* We have to set as pending all the qos>0 messages */
|
/* We have to set as pending all the qos>0 messages */
|
||||||
client->mqtt_state.outbound_message = publish_msg;
|
client->mqtt_state.outbound_message = publish_msg;
|
||||||
if (qos > 0) {
|
if (qos > 0 || store) {
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_id = pending_msg_id;
|
client->mqtt_state.pending_msg_id = pending_msg_id;
|
||||||
client->mqtt_state.pending_publish_qos = qos;
|
client->mqtt_state.pending_publish_qos = qos;
|
||||||
@ -1647,8 +1673,28 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
} else {
|
} else {
|
||||||
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||||
mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment);
|
mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment);
|
||||||
|
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return pending_msg_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||||
|
{
|
||||||
|
MQTT_API_LOCK(client);
|
||||||
|
#if MQTT_SKIP_PUBLISH_IF_DISCONNECTED
|
||||||
|
if (client->state != MQTT_STATE_CONNECTED) {
|
||||||
|
ESP_LOGI(TAG, "Publishing skipped: client is not connected");
|
||||||
|
MQTT_API_UNLOCK(client);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false);
|
||||||
|
if (pending_msg_id < 0) {
|
||||||
|
MQTT_API_UNLOCK(client);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
/* Skip sending if not connected (rely on resending) */
|
/* Skip sending if not connected (rely on resending) */
|
||||||
if (client->state != MQTT_STATE_CONNECTED) {
|
if (client->state != MQTT_STATE_CONNECTED) {
|
||||||
@ -1657,13 +1703,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
ret = pending_msg_id;
|
ret = pending_msg_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
//Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds
|
// delete long pending messages
|
||||||
int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
mqtt_delete_expired_messages(client);
|
||||||
client->mqtt_state.pending_msg_count -= deleted;
|
|
||||||
|
|
||||||
if (client->mqtt_state.pending_msg_count < 0) {
|
|
||||||
client->mqtt_state.pending_msg_count = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
goto cannot_publish;
|
goto cannot_publish;
|
||||||
}
|
}
|
||||||
@ -1727,6 +1768,17 @@ cannot_publish:
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store)
|
||||||
|
{
|
||||||
|
MQTT_API_LOCK(client);
|
||||||
|
int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store);
|
||||||
|
MQTT_API_UNLOCK(client);
|
||||||
|
if (ret == 0 && store == false) {
|
||||||
|
// messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void* event_handler_arg)
|
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void* event_handler_arg)
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user