diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 809f2e9..71026fc 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -82,6 +82,10 @@ typedef enum esp_mqtt_event_id_t { - Additional context: msg_id (id of the deleted message). */ + MQTT_USER_EVENT, /*!< Custom event used to queue tasks into mqtt event handler + All fields from the esp_mqtt_event_t type could be used to pass + an additional context data to the handler. + */ } esp_mqtt_event_id_t; /** @@ -569,6 +573,16 @@ esp_err_t esp_mqtt_client_unregister_event(esp_mqtt_client_handle_t client, esp_ */ int esp_mqtt_client_get_outbox_size(esp_mqtt_client_handle_t client); +/** + * @brief Dispatch user event to the mqtt internal event loop + * + * @param client *MQTT* client handle + * @param event *MQTT* event handle structure + * @return ESP_OK on success + * ESP_ERR_TIMEOUT if the event couldn't be queued (ref also CONFIG_MQTT_EVENT_QUEUE_SIZE) + */ +esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event); + #ifdef __cplusplus } #endif //__cplusplus diff --git a/lib/include/mqtt_client_priv.h b/lib/include/mqtt_client_priv.h index 40490e5..e359420 100644 --- a/lib/include/mqtt_client_priv.h +++ b/lib/include/mqtt_client_priv.h @@ -9,6 +9,7 @@ #include #include +#include #include "esp_err.h" #include "platform.h" @@ -123,6 +124,9 @@ struct esp_mqtt_client { EventGroupHandle_t status_bits; SemaphoreHandle_t api_lock; TaskHandle_t task_handle; +#if MQTT_EVENT_QUEUE_SIZE > 1 + atomic_int queued_events; +#endif }; bool esp_mqtt_set_if_config(char const *const new_config, char **old_config); diff --git a/lib/include/mqtt_config.h b/lib/include/mqtt_config.h index 4cc06d7..500db52 100644 --- a/lib/include/mqtt_config.h +++ b/lib/include/mqtt_config.h @@ -96,6 +96,11 @@ #define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET #define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE +#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE +#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE +#else +#define MQTT_EVENT_QUEUE_SIZE 10 +#endif #define OUTBOX_MAX_SIZE (4*1024) #endif diff --git a/mqtt_client.c b/mqtt_client.c index d8d1c94..2f32dcc 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -765,7 +765,14 @@ static bool create_client_data(esp_mqtt_client_handle_t client) esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config) { - esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client)); + esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client), +#if MQTT_EVENT_QUEUE_SIZE > 1 + // if supporting multiple queued events, we keep track of them + // using atomic variable, so need to make sure it won't get allocated in PSRAM + MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT); +#else + MALLOC_CAP_DEFAULT); +#endif ESP_MEM_CHECK(TAG, client, return NULL); if (!create_client_data(client)) { goto _mqtt_init_failed; @@ -776,10 +783,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co } #ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP esp_event_loop_args_t no_task_loop = { - .queue_size = 1, + .queue_size = MQTT_EVENT_QUEUE_SIZE, .task_name = NULL, }; esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle); +#if MQTT_EVENT_QUEUE_SIZE > 1 + atomic_init(&client->queued_events, 0); +#endif #endif client->keepalive_tick = platform_tick_get_ms(); @@ -939,6 +949,17 @@ static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t cli return esp_mqtt_dispatch_event(client); } +esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event) +{ + esp_err_t ret = esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, MQTT_USER_EVENT, event, sizeof(*event), 0); +#if MQTT_EVENT_QUEUE_SIZE > 1 + if (ret == ESP_OK) { + atomic_fetch_add(&client->queued_events, 1); + } +#endif + return ret; +} + static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) { client->event.client = client; @@ -1447,6 +1468,34 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client) } } +/** + * @brief When using multiple queued item, we'd like to reduce the poll timeout to proceed with event loop exacution + */ +static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_timeout) +{ + return +#if MQTT_EVENT_QUEUE_SIZE > 1 + atomic_load(&client->queued_events) > 0 ? 10: max_timeout; +#else + max_timeout; +#endif +} + +static inline void run_event_loop(esp_mqtt_client_handle_t client) +{ +#if MQTT_EVENT_QUEUE_SIZE > 1 + if (atomic_load(&client->queued_events) > 0) { + atomic_fetch_sub(&client->queued_events, 1); +#else + { +#endif + esp_err_t ret = esp_event_loop_run(client->config->event_loop_handle, 0); + if (ret != ESP_OK) { + ESP_LOGE(TAG, "Error in running event_loop %d", ret); + } + } +} + static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; @@ -1470,6 +1519,7 @@ static void esp_mqtt_task(void *pv) xEventGroupClearBits(client->status_bits, STOPPED_BIT); while (client->run) { MQTT_API_LOCK(client); + run_event_loop(client); switch (client->state) { case MQTT_STATE_DISCONNECTED: break; @@ -1571,7 +1621,7 @@ static void esp_mqtt_task(void *pv) } MQTT_API_UNLOCK(client); xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true, - client->wait_timeout_ms / 2 / portTICK_PERIOD_MS); + max_poll_timeout(client, client->wait_timeout_ms / 2 / portTICK_PERIOD_MS)); // continue the while loop instead of break, as the mutex is unlocked continue; default: @@ -1580,7 +1630,7 @@ static void esp_mqtt_task(void *pv) } MQTT_API_UNLOCK(client); if (MQTT_STATE_CONNECTED == client->state) { - if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) { + if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) { ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno); esp_mqtt_abort_connection(client); }