mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-29 18:28:24 +02:00
Publish: Add new API to enqueue qos>0 messages
Closes https://github.com/espressif/esp-mqtt/issues/155
This commit is contained in:
@ -318,6 +318,25 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
||||
*/
|
||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
|
||||
|
||||
/**
|
||||
* @brief Enqueue a message (with qos set to 1 or 2) to the outbox, to be sent later.
|
||||
*
|
||||
* This API generates and stores the publish message into the internal outbox and the actual sending
|
||||
* to the network is performed in the mqtt-task context (in contrast to the esp_mqtt_client_publish()
|
||||
* which sends the publish message immediately in the user task's context).
|
||||
* Thus, it could be used as a non blocking version of esp_mqtt_client_publish().
|
||||
*
|
||||
* @param client mqtt client handle
|
||||
* @param topic topic string
|
||||
* @param data payload string (set to NULL, sending empty payload message)
|
||||
* @param len data length, if set to 0, length is calculated from payload string
|
||||
* @param qos qos of publish message, this API is applicable only to qos=1 or 2
|
||||
* @param retain retain flag
|
||||
*
|
||||
* @return message_id if queued successfully, -1 otherwise
|
||||
*/
|
||||
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
|
||||
|
||||
/**
|
||||
* @brief Destroys the client handle
|
||||
*
|
||||
|
@ -1609,10 +1609,9 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
||||
return client->mqtt_state.pending_msg_id;
|
||||
}
|
||||
|
||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||
static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||
{
|
||||
uint16_t pending_msg_id = 0;
|
||||
int ret = 0;
|
||||
|
||||
/* Acceptable publish messages:
|
||||
data == NULL, len == 0: publish null message
|
||||
@ -1623,15 +1622,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
len = strlen(data);
|
||||
}
|
||||
|
||||
MQTT_API_LOCK(client);
|
||||
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||
topic, data, len,
|
||||
qos, retain,
|
||||
&pending_msg_id);
|
||||
topic, data, len,
|
||||
qos, retain,
|
||||
&pending_msg_id);
|
||||
|
||||
if (publish_msg->length == 0) {
|
||||
ESP_LOGE(TAG, "Publish message cannot be created");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
/* We have to set as pending all the qos>0 messages */
|
||||
@ -1647,8 +1644,21 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
} else {
|
||||
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||
mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment);
|
||||
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||
}
|
||||
}
|
||||
return pending_msg_id;
|
||||
}
|
||||
|
||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||
{
|
||||
MQTT_API_LOCK(client);
|
||||
int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain);
|
||||
if (pending_msg_id < 0) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
int ret = 0;
|
||||
|
||||
/* Skip sending if not connected (rely on resending) */
|
||||
if (client->state != MQTT_STATE_CONNECTED) {
|
||||
@ -1657,7 +1667,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
ret = pending_msg_id;
|
||||
}
|
||||
|
||||
//Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds
|
||||
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
|
||||
int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||
client->mqtt_state.pending_msg_count -= deleted;
|
||||
|
||||
@ -1727,6 +1737,17 @@ cannot_publish:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
|
||||
{
|
||||
MQTT_API_LOCK(client);
|
||||
int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain);
|
||||
MQTT_API_UNLOCK(client);
|
||||
if (ret == 0) {
|
||||
// messages with qos=0 are not enqueued -> indicate as error
|
||||
return -1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void* event_handler_arg)
|
||||
{
|
||||
|
Reference in New Issue
Block a user