Merge branch 'feature/resend_qos12_rebased' into 'idf'

Updated MQTT library

See merge request idf/esp-mqtt!17
This commit is contained in:
David Čermák
2019-02-16 03:58:15 +08:00
5 changed files with 261 additions and 142 deletions

View File

@@ -10,6 +10,7 @@
#define MQTT_PROTOCOL_311 CONFIG_MQTT_PROTOCOL_311
#define MQTT_RECONNECT_TIMEOUT_MS (10*1000)
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
#if CONFIG_MQTT_BUFFER_SIZE
#define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE
@@ -61,6 +62,10 @@
#define MQTT_CORE_SELECTION_ENABLED CONFIG_MQTT_TASK_CORE_SELECTION_ENABLED
#ifdef CONFIG_MQTT_DISABLE_API_LOCKS
#define MQTT_DISABLE_API_LOCKS CONFIG_MQTT_DISABLE_API_LOCKS
#endif
#ifdef CONFIG_MQTT_USE_CORE_0
#define MQTT_TASK_CORE 0
#else

View File

@@ -106,6 +106,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >>
static inline int mqtt_get_connect_session_present(uint8_t* buffer) { return buffer[2] & 0x01; }
static inline int mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; }
static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline void mqtt_set_dup(uint8_t* buffer) { buffer[0] |= 0x08; }
static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }

View File

@@ -21,21 +21,29 @@ typedef struct outbox_message {
uint8_t *data;
int len;
int msg_id;
int msg_qos;
int msg_type;
uint8_t *remaining_data;
int remaining_len;
} outbox_message_t;
typedef enum pending_state {
QUEUED,
TRANSMITTED,
CONFIRMED
} pending_state_t;
outbox_handle_t outbox_init();
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick);
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
esp_err_t outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
int outbox_get_size(outbox_handle_t outbox);
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size);
void outbox_destroy(outbox_handle_t outbox);

View File

@@ -14,9 +14,10 @@ typedef struct outbox_item {
int len;
int msg_id;
int msg_type;
int msg_qos;
int tick;
int retry_count;
bool pending;
pending_state_t pending;
STAILQ_ENTRY(outbox_item) next;
} outbox_item_t;
@@ -37,8 +38,10 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
ESP_MEM_CHECK(TAG, item, return NULL);
item->msg_id = message->msg_id;
item->msg_type = message->msg_type;
item->msg_qos = message->msg_qos;
item->tick = tick;
item->len = message->len;
item->len = message->len + message->remaining_len;
item->pending = QUEUED;
item->buffer = malloc(message->len + message->remaining_len);
ESP_MEM_CHECK(TAG, item->buffer, {
free(item);
@@ -64,21 +67,37 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
return NULL;
}
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox)
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
if (!item->pending) {
if (item->pending == pending) {
if (tick) {
*tick = item->tick;
}
return item;
}
}
return NULL;
}
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
{
if (item) {
*len = item->len;
*msg_id = item->msg_id;
*msg_type = item->msg_type;
*qos = item->msg_qos;
return (uint8_t*)item->buffer;
}
return NULL;
}
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_id == msg_id && item->msg_type == msg_type) {
if (item->msg_id == msg_id && (0xFF&(item->msg_type)) == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
@@ -102,11 +121,11 @@ esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
}
return ESP_OK;
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id)
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
if (item) {
item->pending = true;
item->pending = pending;
return ESP_OK;
}
return ESP_FAIL;
@@ -153,7 +172,7 @@ int outbox_get_size(outbox_handle_t outbox)
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size)
{
while(outbox_get_size(outbox) > max_size) {
outbox_item_handle_t item = outbox_dequeue(outbox);
outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED, NULL);
if (item == NULL) {
return ESP_FAIL;
}

View File

@@ -13,6 +13,18 @@
/* using uri parser */
#include "http_parser.h"
#ifdef MQTT_DISABLE_API_LOCKS
# define MQTT_API_LOCK(c)
# define MQTT_API_UNLOCK(c)
# define MQTT_API_LOCK_FROM_OTHER_TASK(c)
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c)
#else
# define MQTT_API_LOCK(c) xSemaphoreTake(c->api_lock, portMAX_DELAY)
# define MQTT_API_UNLOCK(c) xSemaphoreGive(c->api_lock)
# define MQTT_API_LOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreTake(c->api_lock, portMAX_DELAY); } }
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreGive(c->api_lock); } }
#endif /* MQTT_USE_API_LOCKS */
static const char *TAG = "MQTT_CLIENT";
typedef struct mqtt_state
@@ -72,6 +84,8 @@ struct esp_mqtt_client {
bool wait_for_ping_resp;
outbox_handle_t outbox;
EventGroupHandle_t status_bits;
SemaphoreHandle_t api_lock;
TaskHandle_t task_handle;
};
const static int STOPPED_BIT = BIT0;
@@ -87,6 +101,7 @@ static char *create_string(const char *ptr, int len);
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
MQTT_API_LOCK(client);
//Copy user configurations to client context
esp_err_t err = ESP_OK;
mqtt_config_storage_t *cfg;
@@ -94,7 +109,10 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
cfg = client->config;
} else {
cfg = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM);
ESP_MEM_CHECK(TAG, cfg, {
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
});
client->config = cfg;
}
if (config->task_prio) {
@@ -200,10 +218,11 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
if (config->disable_auto_reconnect == cfg->auto_reconnect) {
cfg->auto_reconnect = !config->disable_auto_reconnect;
}
MQTT_API_UNLOCK(client);
return ESP_OK;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
MQTT_API_UNLOCK(client);
return err;
}
@@ -303,9 +322,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
{
esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client));
ESP_MEM_CHECK(TAG, client, return NULL);
client->api_lock = xSemaphoreCreateMutex();
if (!client->api_lock) {
free(client);
return NULL;
}
esp_mqtt_set_config(client, config);
MQTT_API_LOCK(client);
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
@@ -365,11 +388,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
}
}
if (client->config->scheme == NULL) {
client->config->scheme = create_string("mqtt", 4);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
}
client->keepalive_tick = platform_tick_get_ms();
client->reconnect_tick = platform_tick_get_ms();
client->refresh_connection_tick = platform_tick_get_ms();
@@ -391,9 +409,11 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
MQTT_API_UNLOCK(client);
return client;
_mqtt_init_failed:
esp_mqtt_client_destroy(client);
MQTT_API_UNLOCK(client);
return NULL;
}
@@ -406,6 +426,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
vEventGroupDelete(client->status_bits);
free(client->mqtt_state.in_buffer);
free(client->mqtt_state.out_buffer);
vSemaphoreDelete(client->api_lock);
free(client);
return ESP_OK;
}
@@ -432,17 +453,15 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
return ESP_FAIL;
}
if (client->config->scheme == NULL) {
// set uri overrides actual scheme, host, path if configured previously
free(client->config->scheme);
free(client->config->host);
free(client->config->path);
client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len);
}
if (client->config->host == NULL) {
client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len);
}
if (client->config->path == NULL) {
client->config->path = create_string(uri + puri.field_data[UF_PATH].off, puri.field_data[UF_PATH].len);
}
if (client->config->path) {
esp_transport_handle_t trans = esp_transport_list_get_transport(client->transport_list, "ws");
if (trans) {
@@ -482,7 +501,7 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
client->config->network_timeout_ms);
// client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
if (write_len <= 0) {
ESP_LOGE(TAG, "Error write data or timeout, written len = %d", write_len);
ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno);
return ESP_FAIL;
}
/* we've just sent a mqtt control packet, update keepalive counter
@@ -536,12 +555,9 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
} else {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
if (client->mqtt_state.message_length_read < client->mqtt_state.message_length) {
/* if message is framented -> correct the size for the first DATA event */
mqtt_data_length = client->mqtt_state.message_length_read - ((uint8_t*)mqtt_data- message);
}
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
/* read msg id only once */
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
client->event.msg_id = mqtt_get_id(message, length);
}
} else {
mqtt_len = len_read;
@@ -555,7 +571,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data;
client->event.data_len = mqtt_len;
client->event.total_data_len = total_mqtt_len;
client->event.total_data_len = mqtt_data_length;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
@@ -608,13 +624,11 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
outbox_message_t msg = { 0 };
if (client->mqtt_state.pending_msg_count > 0) {
client->mqtt_state.pending_msg_count --;
}
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
@@ -634,6 +648,7 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client)
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
//Copy to queue buffer
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
@@ -648,10 +663,10 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
uint16_t msg_id;
uint32_t transport_message_offset = 0 ;
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 1000);
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0);
if (read_len < 0) {
ESP_LOGE(TAG, "Read error or end of stream");
ESP_LOGE(TAG, "Read error or end of stream, errno:%d", errno);
return ESP_FAIL;
}
@@ -665,9 +680,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
client->mqtt_state.message_length_read = read_len - transport_message_offset;
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
switch (msg_type)
{
case MQTT_MSG_TYPE_SUBACK:
@@ -701,31 +718,28 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
// return ESP_FAIL;
}
}
// Deliver the publish message
client->mqtt_state.message_length_read = read_len - transport_message_offset;
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length);
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
@@ -741,15 +755,36 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
break;
}
transport_message_offset += mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset) ;
transport_message_offset += client->mqtt_state.message_length;
}
return ESP_OK;
}
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
// decode queued data
client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
// set duplicate flag for QoS-2 message
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) {
mqtt_set_dup(client->mqtt_state.outbound_message->data);
}
// try to resend the data
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
return ESP_OK;
}
static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
uint32_t last_retransmit = 0;
int32_t msg_tick = 0;
client->run = true;
//get transport by scheme
@@ -767,7 +802,7 @@ static void esp_mqtt_task(void *pv)
client->state = MQTT_STATE_INIT;
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
while (client->run) {
MQTT_API_LOCK(client);
switch ((int)client->state) {
case MQTT_STATE_INIT:
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
@@ -807,6 +842,21 @@ static void esp_mqtt_task(void *pv)
break;
}
// resend all non-transmitted messages first
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
if (item) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
}
// resend other "transmitted" messages after 1s
} else if (platform_tick_get_ms() - last_retransmit > 1000) {
last_retransmit = platform_tick_get_ms();
item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick);
if (item && (last_retransmit - msg_tick > 1000)) {
mqtt_resend_queued(client, item);
}
}
if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) {
//No ping resp from last ping => Disconnected
if(client->wait_for_ping_resp){
@@ -849,10 +899,20 @@ static void esp_mqtt_task(void *pv)
ESP_LOGD(TAG, "Reconnecting...");
break;
}
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
client->wait_timeout_ms / 2 / portTICK_RATE_MS);
break;
// continue the while loop insted of break, as the mutex is unlocked
continue;
}
MQTT_API_UNLOCK(client);
if (MQTT_STATE_CONNECTED == client->state) {
if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) {
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
esp_mqtt_abort_connection(client);
}
}
}
esp_transport_close(client->transport);
xEventGroupSetBits(client->status_bits, STOPPED_BIT);
@@ -868,13 +928,13 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
}
#if MQTT_CORE_SELECTION_ENABLED
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL, MQTT_TASK_CORE) != pdTRUE) {
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
return ESP_FAIL;
}
#else
ESP_LOGD(TAG, "Core selection disabled");
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL) != pdTRUE) {
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
return ESP_FAIL;
}
@@ -925,20 +985,23 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
mqtt_enqueue(client); //move pending msg to outbox (if have)
MQTT_API_LOCK_FROM_OTHER_TASK(client);
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
&client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
mqtt_enqueue(client); //move pending msg to outbox (if have)
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return -1;
}
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return client->mqtt_state.pending_msg_id;
}
@@ -948,7 +1011,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
mqtt_enqueue(client);
MQTT_API_LOCK_FROM_OTHER_TASK(client);
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id);
@@ -956,43 +1019,54 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
mqtt_enqueue(client);
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return -1;
}
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return client->mqtt_state.pending_msg_id;
}
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
{
uint16_t pending_msg_id = 0;
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
if (len <= 0) {
len = strlen(data);
}
MQTT_API_LOCK_FROM_OTHER_TASK(client);
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len,
qos, retain,
&pending_msg_id);
/* We have to set as pending all the qos>0 messages) */
/* We have to set as pending all the qos>0 messages */
if (qos > 0) {
mqtt_enqueue(client);
client->mqtt_state.outbound_message = publish_msg;
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
client->mqtt_state.pending_msg_count ++;
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
mqtt_enqueue(client);
}
} else {
client->mqtt_state.outbound_message = publish_msg;
}
/* Skip sending if not connected (rely on resending) */
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGD(TAG, "Publish: client is not connected");
goto cannot_publish;
}
/* Provide support for sending fragmented message if it doesn't fit buffer */
int remaining_len = len;
const char *current_data = data;
@@ -1001,8 +1075,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
while (sending) {
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos);
return -1;
esp_mqtt_abort_connection(client);
goto cannot_publish;
}
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
@@ -1039,7 +1113,19 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
sending = false;
}
}
if (qos > 0) {
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return pending_msg_id;
cannot_publish:
if (qos == 0) {
ESP_LOGW(TAG, "Publish: Loosing qos0 data when client not connected");
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return 0;
}