From 372ab7b37451e7f13bf8433cb92ebd81b3e3d45b Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Thu, 30 Mar 2023 15:16:14 +0200 Subject: [PATCH] feat: Introduces outbox limit A memory limit for the outbox can be configured. User will not be able to publish or enqueue if the new message goes beyond the configured limit. --- host_test/CMakeLists.txt | 3 +- host_test/main/CMakeLists.txt | 7 + host_test/main/test_mqtt_client.cpp | 191 ++++++++++++++++------------ include/mqtt_client.h | 11 +- lib/include/mqtt_client_priv.h | 2 + lib/include/mqtt_msg.h | 4 +- lib/include/mqtt_outbox.h | 4 +- lib/mqtt_msg.c | 4 +- lib/mqtt_outbox.c | 56 ++++---- mqtt_client.c | 69 ++++++---- 10 files changed, 210 insertions(+), 141 deletions(-) diff --git a/host_test/CMakeLists.txt b/host_test/CMakeLists.txt index 5b63550..7c3cdb1 100644 --- a/host_test/CMakeLists.txt +++ b/host_test/CMakeLists.txt @@ -11,7 +11,6 @@ list(APPEND EXTRA_COMPONENT_DIRS "$ENV{IDF_PATH}/tools/mocks/lwip/" "$ENV{IDF_PATH}/tools/mocks/esp-tls/" "$ENV{IDF_PATH}/tools/mocks/http_parser/" - "$ENV{IDF_PATH}/tools/mocks/tcp_transport/" - ) + "$ENV{IDF_PATH}/tools/mocks/tcp_transport/") project(host_mqtt_client_test) diff --git a/host_test/main/CMakeLists.txt b/host_test/main/CMakeLists.txt index e50cbb6..682bc22 100644 --- a/host_test/main/CMakeLists.txt +++ b/host_test/main/CMakeLists.txt @@ -2,6 +2,13 @@ idf_component_register(SRCS "test_mqtt_client.cpp" INCLUDE_DIRS "$ENV{IDF_PATH}/tools/catch" REQUIRES cmock mqtt esp_timer esp_hw_support http_parser log) +target_compile_options(${COMPONENT_LIB} PUBLIC -fsanitize=address -fconcepts) +target_link_options(${COMPONENT_LIB} PUBLIC -fsanitize=address) + +idf_component_get_property(mqtt mqtt COMPONENT_LIB) +target_compile_options(${mqtt} PUBLIC -fsanitize=address -fconcepts) +target_link_options(${mqtt} PUBLIC -fsanitize=address) + if(CONFIG_GCOV_ENABLED) target_compile_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage) target_link_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage) diff --git a/host_test/main/test_mqtt_client.cpp b/host_test/main/test_mqtt_client.cpp index 55d0f2b..47dbde5 100644 --- a/host_test/main/test_mqtt_client.cpp +++ b/host_test/main/test_mqtt_client.cpp @@ -3,10 +3,15 @@ * * SPDX-License-Identifier: Apache-2.0 */ +#include +#include +#include +#include #include "esp_transport.h" #define CATCH_CONFIG_MAIN // This tells the catch header to generate a main #include "catch.hpp" +#include "mqtt_client.h" extern "C" { #include "Mockesp_event.h" #include "Mockesp_mac.h" @@ -30,99 +35,121 @@ extern "C" { } } -#include "mqtt_client.h" - -struct ClientInitializedFixture { - esp_mqtt_client_handle_t client; - ClientInitializedFixture() - { - [[maybe_unused]] auto protect = TEST_PROTECT(); - int mtx; - int transport_list; - int transport; - int event_group; - uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55}; - esp_timer_get_time_IgnoreAndReturn(0); - xQueueTakeMutexRecursive_IgnoreAndReturn(true); - xQueueGiveMutexRecursive_IgnoreAndReturn(true); - xQueueCreateMutex_ExpectAnyArgsAndReturn( - reinterpret_cast(&mtx)); - xEventGroupCreate_IgnoreAndReturn(reinterpret_cast(&event_group)); - esp_transport_list_init_IgnoreAndReturn(reinterpret_cast(&transport_list)); - esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast(&transport)); - esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast(&transport)); - esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast(&transport)); - esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK); - esp_transport_list_add_IgnoreAndReturn(ESP_OK); - esp_transport_set_default_port_IgnoreAndReturn(ESP_OK); - http_parser_parse_url_IgnoreAndReturn(0); - http_parser_url_init_ExpectAnyArgs(); - esp_event_loop_create_IgnoreAndReturn(ESP_OK); - esp_read_mac_IgnoreAndReturn(ESP_OK); - esp_read_mac_ReturnThruPtr_mac(mac); - esp_transport_list_destroy_IgnoreAndReturn(ESP_OK); - esp_transport_destroy_IgnoreAndReturn(ESP_OK); - vEventGroupDelete_Ignore(); - vQueueDelete_Ignore(); - - esp_mqtt_client_config_t config{}; - client = esp_mqtt_client_init(&config); - } - ~ClientInitializedFixture() - { - esp_mqtt_client_destroy(client); - } -}; -TEST_CASE_METHOD(ClientInitializedFixture, "Client set uri") +auto random_string(std::size_t n) { - struct http_parser_url ret_uri = { - .field_set = 1, - .port = 0, - .field_data = { { 0, 1} } - }; - SECTION("User set a correct URI") { - http_parser_parse_url_StopIgnore(); - http_parser_parse_url_ExpectAnyArgsAndReturn(0); - http_parser_parse_url_ReturnThruPtr_u(&ret_uri); - auto res = esp_mqtt_client_set_uri(client, " "); - REQUIRE(res == ESP_OK); - } - SECTION("Incorrect URI from user") { - http_parser_parse_url_StopIgnore(); - http_parser_parse_url_ExpectAnyArgsAndReturn(1); - http_parser_parse_url_ReturnThruPtr_u(&ret_uri); - auto res = esp_mqtt_client_set_uri(client, " "); - REQUIRE(res == ESP_FAIL); - } + static constexpr std::string_view char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ123456790"; + std::string str; + std::sample(char_set.begin(), char_set.end(), std::back_inserter(str), n, + std::mt19937 {std::random_device{}()}); + return str; } -TEST_CASE_METHOD(ClientInitializedFixture, "Client Start") + +using unique_mqtt_client = std::unique_ptr < std::remove_pointer_t, decltype([](esp_mqtt_client_handle_t client) { - SECTION("Successful start") { + esp_mqtt_client_destroy(client); +}) >; + +SCENARIO("MQTT Client Operation") +{ + // [[maybe_unused]] auto protect = TEST_PROTECT(); + // Set expectations for the mocked calls. + int mtx = 0; + int transport_list = 0; + int transport = 0; + int event_group = 0; + uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55}; + esp_timer_get_time_IgnoreAndReturn(0); + xQueueTakeMutexRecursive_IgnoreAndReturn(true); + xQueueGiveMutexRecursive_IgnoreAndReturn(true); + xQueueCreateMutex_ExpectAnyArgsAndReturn( + reinterpret_cast(&mtx)); + xEventGroupCreate_IgnoreAndReturn(reinterpret_cast(&event_group)); + esp_transport_list_init_IgnoreAndReturn(reinterpret_cast(&transport_list)); + esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast(&transport)); + esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast(&transport)); + esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast(&transport)); + esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK); + esp_transport_list_add_IgnoreAndReturn(ESP_OK); + esp_transport_set_default_port_IgnoreAndReturn(ESP_OK); + http_parser_url_init_Ignore(); + esp_event_loop_create_IgnoreAndReturn(ESP_OK); + esp_read_mac_IgnoreAndReturn(ESP_OK); + esp_read_mac_ReturnThruPtr_mac(mac); + esp_transport_list_destroy_IgnoreAndReturn(ESP_OK); + vEventGroupDelete_Ignore(); + vQueueDelete_Ignore(); + GIVEN("An a minimal config") { esp_mqtt_client_config_t config{}; config.broker.address.uri = "mqtt://1.1.1.1"; struct http_parser_url ret_uri = { - .field_set = 1 | (1<<1), + .field_set = 1 | (1 << 1), .port = 0, .field_data = { { 0, 4 } /*mqtt*/, { 7, 1 } } // at least *scheme* and *host* }; - http_parser_parse_url_StopIgnore(); http_parser_parse_url_ExpectAnyArgsAndReturn(0); http_parser_parse_url_ReturnThruPtr_u(&ret_uri); xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdTRUE); - auto res = esp_mqtt_set_config(client, &config); - REQUIRE(res == ESP_OK); - res = esp_mqtt_client_start(client); - REQUIRE(res == ESP_OK); - } - SECTION("Failed on initialization") { - xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE); - auto res = esp_mqtt_client_start(nullptr); - REQUIRE(res == ESP_ERR_INVALID_ARG); - } - SECTION("Client already started") {} - SECTION("Failed to start task") { - xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE); - auto res = esp_mqtt_client_start(client); - REQUIRE(res == ESP_FAIL); + SECTION("Client with minimal config") { + auto client = unique_mqtt_client{esp_mqtt_client_init(&config)}; + REQUIRE(client != nullptr); + SECTION("User will set a new uri") { + struct http_parser_url ret_uri = { + .field_set = 1, + .port = 0, + .field_data = { { 0, 1} } + }; + SECTION("User set a correct URI") { + http_parser_parse_url_StopIgnore(); + http_parser_parse_url_ExpectAnyArgsAndReturn(0); + http_parser_parse_url_ReturnThruPtr_u(&ret_uri); + auto res = esp_mqtt_client_set_uri(client.get(), " "); + REQUIRE(res == ESP_OK); + } + SECTION("Incorrect URI from user") { + http_parser_parse_url_StopIgnore(); + http_parser_parse_url_ExpectAnyArgsAndReturn(1); + http_parser_parse_url_ReturnThruPtr_u(&ret_uri); + auto res = esp_mqtt_client_set_uri(client.get(), " "); + REQUIRE(res == ESP_FAIL); + } + } + SECTION("After Start Client Is Cleanly destroyed") { + REQUIRE(esp_mqtt_client_start(client.get()) == ESP_OK); + // Only need to start the client, destroy is called automatically at the end of + // scope + } + } + SECTION("Client with all allocating configuration set") { + auto host = random_string(20); + auto path = random_string(10); + auto username = random_string(10); + auto client_id = random_string(10); + auto password = random_string(10); + auto lw_topic = random_string(10); + auto lw_msg = random_string(10); + + config.broker = {.address = { + .hostname = host.data(), + .path = path.data() + } + }; + config.credentials = { + .username = username.data(), + .client_id = client_id.data(), + .authentication = { + .password = password.data() + } + }; + config.session = { + .last_will { + .topic = lw_topic.data(), + .msg = lw_msg.data() + } + }; + auto client = unique_mqtt_client{esp_mqtt_client_init(&config)}; + REQUIRE(client != nullptr); + + } } } + diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 520a8c9..8d85125 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -352,6 +352,9 @@ typedef struct esp_mqtt_client_config_t { int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by ``buffer_size`` */ } buffer; /*!< Buffer size configuration.*/ + struct outbox_config_t { + uint64_t limit; /*!< Size limit for the outbox in bytes.*/ + } outbox; } esp_mqtt_client_config_t; /** @@ -430,7 +433,6 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client); */ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); - #ifdef __cplusplus #define esp_mqtt_client_subscribe esp_mqtt_client_subscribe_single @@ -449,6 +451,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); * * @return message_id of the subscribe message on success * -1 on failure + * -2 in case of full outbox. */ #define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \ char *: esp_mqtt_client_subscribe_single, \ @@ -473,6 +476,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); * * @return message_id of the subscribe message on success * -1 on failure + * -2 in case of full outbox. */ int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos); @@ -493,6 +497,7 @@ int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, * * @return message_id of the subscribe message on success * -1 on failure + * -2 in case of full outbox. */ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, const esp_mqtt_topic_t *topic_list, int size); @@ -536,7 +541,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, * @param retain retain flag * * @return message_id of the publish message (for QoS 0 message_id will always - * be zero) on success. -1 on failure. + * be zero) on success. -1 on failure, -2 in case of full outbox. */ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); @@ -561,7 +566,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, * @param store if true, all messages are enqueued; otherwise only QoS 1 and * QoS 2 are enqueued * - * @return message_id if queued successfully, -1 otherwise + * @return message_id if queued successfully, -1 on failure, -2 in case of full outbox. */ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, diff --git a/lib/include/mqtt_client_priv.h b/lib/include/mqtt_client_priv.h index 2bf08ca..0b5ee48 100644 --- a/lib/include/mqtt_client_priv.h +++ b/lib/include/mqtt_client_priv.h @@ -7,6 +7,7 @@ #ifndef _MQTT_CLIENT_PRIV_H_ #define _MQTT_CLIENT_PRIV_H_ +#include #include #include #include @@ -88,6 +89,7 @@ typedef struct { bool use_secure_element; void *ds_data; int message_retransmit_timeout; + uint64_t outbox_limit; esp_transport_handle_t transport; } mqtt_config_storage_t; diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index f3ad000..f09590d 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -130,8 +130,8 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length); uint16_t mqtt_get_id(uint8_t *buffer, size_t length); int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length); -esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size); -void mqtt_connection_destroy(mqtt_connection_t *connection); +esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size); +void mqtt_msg_buffer_destroy(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info); mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id); diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index e180fca..241b335 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -14,7 +14,7 @@ extern "C" { struct outbox_item; -typedef struct outbox_list_t *outbox_handle_t; +typedef struct outbox_t *outbox_handle_t; typedef struct outbox_item *outbox_item_handle_t; typedef struct outbox_message *outbox_message_handle_t; typedef long long outbox_tick_t; @@ -54,7 +54,7 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); pending_state_t outbox_item_get_pending(outbox_item_handle_t item); esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick); -int outbox_get_size(outbox_handle_t outbox); +uint64_t outbox_get_size(outbox_handle_t outbox); void outbox_destroy(outbox_handle_t outbox); void outbox_delete_all_items(outbox_handle_t outbox); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index a7a91e7..781bbea 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -616,7 +616,7 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length) } } -esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size) +esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size) { memset(connection, 0, sizeof(mqtt_connection_t)); connection->buffer = (uint8_t *)calloc(0, buffer_size); @@ -627,7 +627,7 @@ esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size) return ESP_OK; } -void mqtt_connection_destroy(mqtt_connection_t *connection) +void mqtt_msg_buffer_destroy(mqtt_connection_t *connection) { if (connection) { free(connection->buffer); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 1915e59..ef2d889 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -1,4 +1,5 @@ #include "mqtt_outbox.h" +#include #include #include #include "mqtt_config.h" @@ -22,12 +23,19 @@ typedef struct outbox_item { STAILQ_HEAD(outbox_list_t, outbox_item); +struct outbox_t { + uint64_t size; + struct outbox_list_t *list; +}; outbox_handle_t outbox_init(void) { - outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t)); + outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t)); ESP_MEM_CHECK(TAG, outbox, return NULL); - STAILQ_INIT(outbox); + outbox->list = calloc(1, sizeof(struct outbox_list_t)); + ESP_MEM_CHECK(TAG, outbox->list, return NULL); //TODO: Free outbox on failure + outbox->size = 0; + STAILQ_INIT(outbox->list); return outbox; } @@ -50,7 +58,8 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl if (message->remaining_data) { memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len); } - STAILQ_INSERT_TAIL(outbox, item, next); + STAILQ_INSERT_TAIL(outbox->list, item, next); + outbox->size += item->len; ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox)); return item; } @@ -58,7 +67,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) { outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (item->msg_id == msg_id) { return item; } @@ -69,7 +78,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick) { outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (item->pending == pending) { if (tick) { *tick = item->tick; @@ -83,9 +92,10 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete) { outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (item == item_to_delete) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); + outbox->size -= item->len; free(item->buffer); free(item); return ESP_OK; @@ -109,9 +119,10 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) { outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { + STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) { if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); + outbox->size -= item->len; free(item->buffer); free(item); ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox)); @@ -154,10 +165,11 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t { int msg_id = -1; outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (current_tick - item->tick > timeout) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); free(item->buffer); + outbox->size -= item->len; msg_id = item->msg_id; free(item); return msg_id; @@ -171,10 +183,11 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou { int deleted_items = 0; outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { + STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) { if (current_tick - item->tick > timeout) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); free(item->buffer); + outbox->size -= item->len; free(item); deleted_items ++; } @@ -183,23 +196,17 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou return deleted_items; } -int outbox_get_size(outbox_handle_t outbox) +uint64_t outbox_get_size(outbox_handle_t outbox) { - int siz = 0; - outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { - // Suppressing "use after free" warning as this could happen only if queue is in inconsistent state - // which never happens if STAILQ interface used - siz += item->len; // NOLINT(clang-analyzer-unix.Malloc) - } - return siz; + return outbox->size; } void outbox_delete_all_items(outbox_handle_t outbox) { outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) { + STAILQ_REMOVE(outbox->list, item, outbox_item, next); + outbox->size -= item->len; free(item->buffer); free(item); } @@ -207,6 +214,7 @@ void outbox_delete_all_items(outbox_handle_t outbox) void outbox_destroy(outbox_handle_t outbox) { outbox_delete_all_items(outbox); + free(outbox->list); free(outbox); } diff --git a/mqtt_client.c b/mqtt_client.c index 6719a49..1792206 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1,10 +1,12 @@ -#include "mqtt_client.h" -#include "esp_transport.h" -#include "mqtt_client_priv.h" -#include "esp_log.h" #include +#include "esp_err.h" +#include "esp_log.h" #include "esp_heap_caps.h" +#include "esp_transport.h" +#include "mqtt_client.h" +#include "mqtt_client_priv.h" #include "mqtt_msg.h" +#include "mqtt_outbox.h" _Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type"); #ifdef ESP_EVENT_ANY_ID @@ -564,6 +566,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl goto _mqtt_set_config_failed; } } + client->config->outbox_limit = config->outbox.limit; esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict(client->config, config); MQTT_API_UNLOCK(client); @@ -811,7 +814,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co // use separate value for output buffer size if configured int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size; - if (mqtt_connection_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) { + if (mqtt_msg_buffer_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) { goto _mqtt_init_failed; } @@ -871,7 +874,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client) vEventGroupDelete(client->status_bits); } free(client->mqtt_state.in_buffer); - mqtt_connection_destroy(&client->mqtt_state.connection); + mqtt_msg_buffer_destroy(&client->mqtt_state.connection); if (client->api_lock) { vSemaphoreDelete(client->api_lock); } @@ -983,7 +986,7 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY); ret = esp_event_loop_run(client->config->event_loop_handle, 0); #else - return ESP_FAIL; + return ESP_FAIL; #endif if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 @@ -1277,7 +1280,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) // If the message was valid, get the type, quality of service and id of the message msg_type = mqtt_get_type(client->mqtt_state.in_buffer); msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, read_len); #endif @@ -1456,7 +1459,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item } } else if (client->mqtt_state.pending_publish_qos > 0) { #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_increment_packet_counter(client); } #endif @@ -1808,6 +1811,10 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, ESP_LOGE(TAG, "Client was not initialized"); return -1; } + + if (client->config->outbox_limit > 0 && outbox_get_size(client->outbox) > client->config->outbox_limit) { + return -2; + } MQTT_API_LOCK(client); if (client->state != MQTT_STATE_CONNECTED) { ESP_LOGE(TAG, "Client has not connected"); @@ -1828,16 +1835,16 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, return -1; } mqtt5_msg_subscribe(&client->mqtt_state.connection, - topic_list, size, - &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); + topic_list, size, + &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->subscribe_property_info = NULL; } #endif } else { mqtt_msg_subscribe(&client->mqtt_state.connection, - topic_list, size, - &client->mqtt_state.pending_msg_id); + topic_list, size, + &client->mqtt_state.pending_msg_id); } if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Subscribe message cannot be created"); @@ -1885,16 +1892,16 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 mqtt5_msg_unsubscribe(&client->mqtt_state.connection, - topic, - &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); + topic, + &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->unsubscribe_property_info = NULL; } #endif } else { mqtt_msg_unsubscribe(&client->mqtt_state.connection, - topic, - &client->mqtt_state.pending_msg_id); + topic, + &client->mqtt_state.pending_msg_id); } if (client->mqtt_state.connection.outbound_message.length == 0) { MQTT_API_UNLOCK(client); @@ -1928,18 +1935,18 @@ static int make_publish(esp_mqtt_client_handle_t client, const char *topic, cons if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 mqtt5_msg_publish(&client->mqtt_state.connection, - topic, data, len, - qos, retain, - &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); + topic, data, len, + qos, retain, + &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->publish_property_info = NULL; } #endif } else { mqtt_msg_publish(&client->mqtt_state.connection, - topic, data, len, - qos, retain, - &pending_msg_id); + topic, data, len, + qos, retain, + &pending_msg_id); } if (client->mqtt_state.connection.outbound_message.length == 0) { @@ -2008,6 +2015,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } + if (client->config->outbox_limit > 0 && qos > 0) { + if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) { + MQTT_API_UNLOCK(client); + return -2; + } + } + int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false); if (pending_msg_id < 0) { MQTT_API_UNLOCK(client); @@ -2064,7 +2078,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, if (qos > 0) { #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_increment_packet_counter(client); } #endif @@ -2102,6 +2116,13 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } + if (client->config->outbox_limit > 0) { + if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) { + return -2; + } + + } + MQTT_API_LOCK(client); #ifdef MQTT_PROTOCOL_5 if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {