diff --git a/include/mqtt_client.h b/include/mqtt_client.h index d8c69d2..07dd978 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -318,6 +318,25 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top */ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); +/** + * @brief Enqueue a message (with qos set to 1 or 2) to the outbox, to be sent later. + * + * This API generates and stores the publish message into the internal outbox and the actual sending + * to the network is performed in the mqtt-task context (in contrast to the esp_mqtt_client_publish() + * which sends the publish message immediately in the user task's context). + * Thus, it could be used as a non blocking version of esp_mqtt_client_publish(). + * + * @param client mqtt client handle + * @param topic topic string + * @param data payload string (set to NULL, sending empty payload message) + * @param len data length, if set to 0, length is calculated from payload string + * @param qos qos of publish message, this API is applicable only to qos=1 or 2 + * @param retain retain flag + * + * @return message_id if queued successfully, -1 otherwise + */ +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); + /** * @brief Destroys the client handle * diff --git a/mqtt_client.c b/mqtt_client.c index f2d6557..651515c 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1609,10 +1609,9 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top return client->mqtt_state.pending_msg_id; } -int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) { uint16_t pending_msg_id = 0; - int ret = 0; /* Acceptable publish messages: data == NULL, len == 0: publish null message @@ -1623,15 +1622,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } - MQTT_API_LOCK(client); mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, - topic, data, len, - qos, retain, - &pending_msg_id); + topic, data, len, + qos, retain, + &pending_msg_id); if (publish_msg->length == 0) { ESP_LOGE(TAG, "Publish message cannot be created"); - MQTT_API_UNLOCK(client); return -1; } /* We have to set as pending all the qos>0 messages */ @@ -1647,8 +1644,21 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, } else { int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment); + client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; } } + return pending_msg_id; +} + +int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +{ + MQTT_API_LOCK(client); + int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + if (pending_msg_id < 0) { + MQTT_API_UNLOCK(client); + return -1; + } + int ret = 0; /* Skip sending if not connected (rely on resending) */ if (client->state != MQTT_STATE_CONNECTED) { @@ -1657,7 +1667,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, ret = pending_msg_id; } - //Delete message after OUTBOX_EXPIRED_TIMEOUT_MS miliseconds + // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds int deleted = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); client->mqtt_state.pending_msg_count -= deleted; @@ -1727,6 +1737,17 @@ cannot_publish: return ret; } +int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) +{ + MQTT_API_LOCK(client); + int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain); + MQTT_API_UNLOCK(client); + if (ret == 0) { + // messages with qos=0 are not enqueued -> indicate as error + return -1; + } + return ret; +} esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void* event_handler_arg) {