From 72833c7f8a6e98e1122a6d8780388aa1975706b9 Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Wed, 12 Apr 2023 15:33:30 +0200 Subject: [PATCH 1/3] Merge enqueue functions - enqueue and oversized enqueue did the same work with small differences this clean up the extra unnecessary function --- mqtt_client.c | 33 +++++++-------------------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 258d1da..4abc103 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1128,39 +1128,20 @@ static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_ty return false; } -static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) -{ - ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful", - client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - //lock mutex - outbox_message_t msg = { 0 }; - msg.data = client->mqtt_state.outbound_message->data; - msg.len = client->mqtt_state.outbound_message->length; - msg.msg_id = client->mqtt_state.pending_msg_id; - msg.msg_type = client->mqtt_state.pending_msg_type; - msg.msg_qos = client->mqtt_state.pending_publish_qos; - msg.remaining_data = remaining_data; - msg.remaining_len = remaining_len; - //Copy to queue buffer - return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); - //unlock -} - -static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client) +static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - if (client->mqtt_state.pending_msg_count > 0) { outbox_message_t msg = { 0 }; msg.data = client->mqtt_state.outbound_message->data; msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; msg.msg_type = client->mqtt_state.pending_msg_type; msg.msg_qos = client->mqtt_state.pending_publish_qos; + msg.remaining_data = remaining_data; + msg.remaining_len = remaining_len; //Copy to queue buffer return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); - } - return NULL; } @@ -1875,7 +1856,7 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; //move pending msg to outbox (if have) - if (!mqtt_enqueue(client)) { + if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); return -1; } @@ -1933,7 +1914,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; - if (!mqtt_enqueue(client)) { + if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); return -1; } @@ -1984,12 +1965,12 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons client->mqtt_state.pending_msg_count ++; // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { - if (!mqtt_enqueue(client)) { + if (!mqtt_enqueue(client, NULL, 0)) { return -1; } } else { int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; - if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { + if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { return -1; } client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; From 572904868349837b572b56ecdd728dfe28df0394 Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Thu, 13 Apr 2023 09:21:14 +0200 Subject: [PATCH 2/3] Bugfix: Dispatch transport error on all write operations - During connect the error wasn't dispatched. - Merged esp_mqtt_write with mqtt_write_data since the only difference was the error dispatched. --- mqtt_client.c | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 4abc103..57cc896 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -641,8 +641,11 @@ static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client) client->config->network_timeout_ms); if (wlen < 0) { ESP_LOGE(TAG, "Writing failed: errno=%d", errno); + esp_mqtt_client_dispatch_transport_error(client); return ESP_FAIL; - } else if (wlen == 0) { + } + + if (wlen == 0) { ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno); return ESP_ERR_TIMEOUT; } @@ -937,14 +940,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u return ESP_OK; } -static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) -{ - if (esp_mqtt_write(client) != ESP_OK) { - esp_mqtt_client_dispatch_transport_error(client); - return ESP_FAIL; - } - return ESP_OK; -} static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) { @@ -1344,7 +1339,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) if (msg_qos == 1 || msg_qos == 2) { ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); return ESP_FAIL; } @@ -1381,7 +1376,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) } outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED); - mqtt_write_data(client); + esp_mqtt_write(client); break; case MQTT_MSG_TYPE_PUBREL: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); @@ -1398,7 +1393,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_FAIL; } - mqtt_write_data(client); + esp_mqtt_write(client); break; case MQTT_MSG_TYPE_PUBCOMP: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); @@ -1444,7 +1439,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item } // try to resend the data - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error to resend data "); esp_mqtt_abort_connection(client); return ESP_FAIL; @@ -1751,7 +1746,7 @@ static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client) ESP_LOGE(TAG, "Disconnect message cannot be created"); return ESP_FAIL; } - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error sending disconnect message"); } return ESP_OK; @@ -1801,7 +1796,7 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client) return ESP_FAIL; } - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error sending ping"); return ESP_FAIL; } @@ -1862,7 +1857,7 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, } outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos); MQTT_API_UNLOCK(client); return -1; @@ -1920,7 +1915,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top } outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic); MQTT_API_UNLOCK(client); return -1; @@ -2040,7 +2035,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, while (sending) { - if (mqtt_write_data(client) != ESP_OK) { + if (esp_mqtt_write(client) != ESP_OK) { esp_mqtt_abort_connection(client); ret = -1; goto cannot_publish; From da6d38a17ef1f169b3633a9405ff8c33f706fe65 Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Thu, 13 Apr 2023 11:11:13 +0200 Subject: [PATCH 3/3] Removes pending message count The information was used only to log remaining messages on debug log. It was checked on writing but updated prior to every call making the verification meaningless. --- lib/include/mqtt_client_priv.h | 1 - mqtt_client.c | 19 +++---------------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/lib/include/mqtt_client_priv.h b/lib/include/mqtt_client_priv.h index e359420..bf09ceb 100644 --- a/lib/include/mqtt_client_priv.h +++ b/lib/include/mqtt_client_priv.h @@ -58,7 +58,6 @@ typedef struct mqtt_state { uint16_t pending_msg_id; int pending_msg_type; int pending_publish_qos; - int pending_msg_count; } mqtt_state_t; typedef struct { diff --git a/mqtt_client.c b/mqtt_client.c index 57cc896..c68bd53 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1111,15 +1111,12 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) // Return false when message is not found, making the received counterpart invalid. static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id) { - ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count); - if (client->mqtt_state.pending_msg_count == 0) { - return false; - } if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) { - client->mqtt_state.pending_msg_count --; + ESP_LOGD(TAG, "Removed pending_id=%d", client->mqtt_state.pending_msg_id); return true; } + ESP_LOGD(TAG, "Failed to remove pending_id=%d", client->mqtt_state.pending_msg_id); return false; } @@ -1468,7 +1465,6 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client) // Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds #if MQTT_REPORT_DELETED_MESSAGES // also report the deleted items as MQTT_EVENT_DELETED events if enabled - int deleted_items = 0; int msg_id = 0; while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) { client->event.event_id = MQTT_EVENT_DELETED; @@ -1476,16 +1472,10 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client) if (esp_mqtt_dispatch_event(client) != ESP_OK) { ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id); } - deleted_items ++; } #else - int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); + outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); #endif - client->mqtt_state.pending_msg_count -= deleted_items; - - if (client->mqtt_state.pending_msg_count < 0) { - client->mqtt_state.pending_msg_count = 0; - } } /** @@ -1849,7 +1839,6 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, } client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_count ++; //move pending msg to outbox (if have) if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); @@ -1908,7 +1897,6 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_count ++; if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); return -1; @@ -1957,7 +1945,6 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; - client->mqtt_state.pending_msg_count ++; // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { if (!mqtt_enqueue(client, NULL, 0)) {