diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index b4f1e4f..c9422e9 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -35,7 +35,7 @@ typedef enum pending_state { outbox_handle_t outbox_init(); outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending); +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 403fd9c..933d55a 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -40,7 +40,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl item->msg_type = message->msg_type; item->msg_qos = message->msg_qos; item->tick = tick; - item->len = message->len; + item->len = message->len + message->remaining_len; item->pending = QUEUED; item->buffer = malloc(message->len + message->remaining_len); ESP_MEM_CHECK(TAG, item->buffer, { @@ -67,11 +67,14 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) return NULL; } -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending) +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick) { outbox_item_handle_t item; STAILQ_FOREACH(item, outbox, next) { if (item->pending == pending) { + if (tick) { + *tick = item->tick; + } return item; } } @@ -169,7 +172,7 @@ int outbox_get_size(outbox_handle_t outbox) esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size) { while(outbox_get_size(outbox) > max_size) { - outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED); + outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED, NULL); if (item == NULL) { return ESP_FAIL; } diff --git a/mqtt_client.c b/mqtt_client.c index 8fe31cc..c409222 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -501,7 +501,7 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) client->config->network_timeout_ms); // client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); if (write_len <= 0) { - ESP_LOGE(TAG, "Error write data or timeout, written len = %d", write_len); + ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno); return ESP_FAIL; } /* we've just sent a mqtt control packet, update keepalive counter @@ -624,9 +624,6 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); //lock mutex outbox_message_t msg = { 0 }; - if (client->mqtt_state.pending_msg_count > 0) { - client->mqtt_state.pending_msg_count --; - } msg.data = client->mqtt_state.outbound_message->data; msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; @@ -669,7 +666,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0); if (read_len < 0) { - ESP_LOGE(TAG, "Read error or end of stream"); + ESP_LOGE(TAG, "Read error or end of stream, errno:%d", errno); return ESP_FAIL; } @@ -787,6 +784,7 @@ static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; uint32_t last_retransmit = 0; + int32_t msg_tick = 0; client->run = true; //get transport by scheme @@ -845,7 +843,7 @@ static void esp_mqtt_task(void *pv) } // resend all non-transmitted messages first - outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED); + outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL); if (item) { if (mqtt_resend_queued(client, item) == ESP_OK) { outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); @@ -853,8 +851,8 @@ static void esp_mqtt_task(void *pv) // resend other "transmitted" messages after 1s } else if (platform_tick_get_ms() - last_retransmit > 1000) { last_retransmit = platform_tick_get_ms(); - item = outbox_dequeue(client->outbox, TRANSMITTED); - if (item) { + item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick); + if (item && (last_retransmit - msg_tick > 1000)) { mqtt_resend_queued(client, item); } } @@ -1048,15 +1046,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, qos, retain, &pending_msg_id); - /* We have to set as pending all the qos>0 messages) */ + /* We have to set as pending all the qos>0 messages */ if (qos > 0) { client->mqtt_state.outbound_message = publish_msg; client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; client->mqtt_state.pending_msg_count ++; - // by default store as QUEUED (not transmitted yet) - mqtt_enqueue(client); + // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer + if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { + mqtt_enqueue(client); + } } else { client->mqtt_state.outbound_message = publish_msg; }