forked from espressif/esp-mqtt
Merge branch 'bugfix/user_events' into 'master'
client: Add support for user events See merge request espressif/esp-mqtt!140
This commit is contained in:
@ -82,6 +82,10 @@ typedef enum esp_mqtt_event_id_t {
|
||||
- Additional context: msg_id (id of the deleted
|
||||
message).
|
||||
*/
|
||||
MQTT_USER_EVENT, /*!< Custom event used to queue tasks into mqtt event handler
|
||||
All fields from the esp_mqtt_event_t type could be used to pass
|
||||
an additional context data to the handler.
|
||||
*/
|
||||
} esp_mqtt_event_id_t;
|
||||
|
||||
/**
|
||||
@ -569,6 +573,16 @@ esp_err_t esp_mqtt_client_unregister_event(esp_mqtt_client_handle_t client, esp_
|
||||
*/
|
||||
int esp_mqtt_client_get_outbox_size(esp_mqtt_client_handle_t client);
|
||||
|
||||
/**
|
||||
* @brief Dispatch user event to the mqtt internal event loop
|
||||
*
|
||||
* @param client *MQTT* client handle
|
||||
* @param event *MQTT* event handle structure
|
||||
* @return ESP_OK on success
|
||||
* ESP_ERR_TIMEOUT if the event couldn't be queued (ref also CONFIG_MQTT_EVENT_QUEUE_SIZE)
|
||||
*/
|
||||
esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif //__cplusplus
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdatomic.h>
|
||||
#include "esp_err.h"
|
||||
#include "platform.h"
|
||||
|
||||
@ -123,6 +124,9 @@ struct esp_mqtt_client {
|
||||
EventGroupHandle_t status_bits;
|
||||
SemaphoreHandle_t api_lock;
|
||||
TaskHandle_t task_handle;
|
||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||
atomic_int queued_events;
|
||||
#endif
|
||||
};
|
||||
|
||||
bool esp_mqtt_set_if_config(char const *const new_config, char **old_config);
|
||||
|
@ -96,6 +96,11 @@
|
||||
#define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET
|
||||
#define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE
|
||||
|
||||
#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE
|
||||
#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE
|
||||
#else
|
||||
#define MQTT_EVENT_QUEUE_SIZE 10
|
||||
#endif
|
||||
|
||||
#define OUTBOX_MAX_SIZE (4*1024)
|
||||
#endif
|
||||
|
@ -765,7 +765,14 @@ static bool create_client_data(esp_mqtt_client_handle_t client)
|
||||
|
||||
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
|
||||
{
|
||||
esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client));
|
||||
esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client),
|
||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||
// if supporting multiple queued events, we keep track of them
|
||||
// using atomic variable, so need to make sure it won't get allocated in PSRAM
|
||||
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
|
||||
#else
|
||||
MALLOC_CAP_DEFAULT);
|
||||
#endif
|
||||
ESP_MEM_CHECK(TAG, client, return NULL);
|
||||
if (!create_client_data(client)) {
|
||||
goto _mqtt_init_failed;
|
||||
@ -776,10 +783,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
||||
}
|
||||
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
|
||||
esp_event_loop_args_t no_task_loop = {
|
||||
.queue_size = 1,
|
||||
.queue_size = MQTT_EVENT_QUEUE_SIZE,
|
||||
.task_name = NULL,
|
||||
};
|
||||
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
|
||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||
atomic_init(&client->queued_events, 0);
|
||||
#endif
|
||||
#endif
|
||||
|
||||
client->keepalive_tick = platform_tick_get_ms();
|
||||
@ -939,6 +949,17 @@ static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t cli
|
||||
return esp_mqtt_dispatch_event(client);
|
||||
}
|
||||
|
||||
esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event)
|
||||
{
|
||||
esp_err_t ret = esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, MQTT_USER_EVENT, event, sizeof(*event), 0);
|
||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||
if (ret == ESP_OK) {
|
||||
atomic_fetch_add(&client->queued_events, 1);
|
||||
}
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
client->event.client = client;
|
||||
@ -1447,6 +1468,34 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief When using multiple queued item, we'd like to reduce the poll timeout to proceed with event loop exacution
|
||||
*/
|
||||
static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_timeout)
|
||||
{
|
||||
return
|
||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||
atomic_load(&client->queued_events) > 0 ? 10: max_timeout;
|
||||
#else
|
||||
max_timeout;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline void run_event_loop(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||
if (atomic_load(&client->queued_events) > 0) {
|
||||
atomic_fetch_sub(&client->queued_events, 1);
|
||||
#else
|
||||
{
|
||||
#endif
|
||||
esp_err_t ret = esp_event_loop_run(client->config->event_loop_handle, 0);
|
||||
if (ret != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error in running event_loop %d", ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void esp_mqtt_task(void *pv)
|
||||
{
|
||||
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
|
||||
@ -1470,6 +1519,7 @@ static void esp_mqtt_task(void *pv)
|
||||
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
|
||||
while (client->run) {
|
||||
MQTT_API_LOCK(client);
|
||||
run_event_loop(client);
|
||||
switch (client->state) {
|
||||
case MQTT_STATE_DISCONNECTED:
|
||||
break;
|
||||
@ -1571,7 +1621,7 @@ static void esp_mqtt_task(void *pv)
|
||||
}
|
||||
MQTT_API_UNLOCK(client);
|
||||
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
|
||||
client->wait_timeout_ms / 2 / portTICK_PERIOD_MS);
|
||||
max_poll_timeout(client, client->wait_timeout_ms / 2 / portTICK_PERIOD_MS));
|
||||
// continue the while loop instead of break, as the mutex is unlocked
|
||||
continue;
|
||||
default:
|
||||
@ -1580,7 +1630,7 @@ static void esp_mqtt_task(void *pv)
|
||||
}
|
||||
MQTT_API_UNLOCK(client);
|
||||
if (MQTT_STATE_CONNECTED == client->state) {
|
||||
if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) {
|
||||
if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
|
||||
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
|
||||
esp_mqtt_abort_connection(client);
|
||||
}
|
||||
|
Reference in New Issue
Block a user