From 0c25441fddf163897477285ae90dfa7405a0a858 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Wed, 15 Aug 2018 17:24:46 +0200 Subject: [PATCH] support for custom implementation of msg outbox --- Kconfig | 18 +++++++++++------ examples/mqtt_tcp/main/Kconfig.projbuild | 4 ++++ examples/mqtt_tcp/main/app_main.c | 25 ++++++++++++++---------- examples/mqtt_ws/main/app_main.c | 9 +++------ lib/include/mqtt_outbox.h | 15 ++------------ lib/mqtt_outbox.c | 19 ++++++++++++++++++ mqtt_client.c | 6 +----- 7 files changed, 56 insertions(+), 40 deletions(-) diff --git a/Kconfig b/Kconfig index f798f2d..c17f83c 100644 --- a/Kconfig +++ b/Kconfig @@ -79,11 +79,11 @@ config MQTT_TASK_STACK_SIZE MQTT task stack size config MQTT_TASK_CORE_SELECTION_ENABLED - bool "Enable MQTT task core selection" - default false - help - This will enable core selection - + bool "Enable MQTT task core selection" + default false + help + This will enable core selection + choice depends on MQTT_TASK_CORE_SELECTION_ENABLED prompt "Core to use ?" @@ -92,5 +92,11 @@ choice config MQTT_USE_CORE_1 bool "Core 1" endchoice - + +config MQTT_CUSTOM_OUTBOX + bool "Enable custom outbox implementation" + default n + help + Set to true if a specific implementation of message outbox is needed (e.g. persistant outbox in NVM or similar). + endmenu diff --git a/examples/mqtt_tcp/main/Kconfig.projbuild b/examples/mqtt_tcp/main/Kconfig.projbuild index 0310c2c..c96a495 100644 --- a/examples/mqtt_tcp/main/Kconfig.projbuild +++ b/examples/mqtt_tcp/main/Kconfig.projbuild @@ -18,4 +18,8 @@ config BROKER_URL help URL of the broker to connect to +config BROKER_URL_FROM_STDIN + bool + default y if BROKER_URL = "FROM_STDIN" + endmenu diff --git a/examples/mqtt_tcp/main/app_main.c b/examples/mqtt_tcp/main/app_main.c index d690b1c..f2c855b 100755 --- a/examples/mqtt_tcp/main/app_main.c +++ b/examples/mqtt_tcp/main/app_main.c @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -38,14 +37,14 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); - // msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); - // ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); + ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - // msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); - // ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); + ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - // msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); - // ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); + msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); + ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); @@ -53,8 +52,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); - // msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); - // ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); + msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); + ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); @@ -118,13 +117,15 @@ static void wifi_init(void) static void mqtt_app_start(void) { - char line[128]; esp_mqtt_client_config_t mqtt_cfg = { .uri = CONFIG_BROKER_URL, .event_handle = mqtt_event_handler, // .user_context = (void *)your_context }; +#if CONFIG_BROKER_URL_FROM_STDIN + char line[128]; + if (strcmp(mqtt_cfg.uri, "FROM_STDIN") == 0) { int count = 0; printf("Please enter url of mqtt broker\n"); @@ -141,7 +142,11 @@ static void mqtt_app_start(void) } mqtt_cfg.uri = line; printf("Broker url: %s\n", line); + } else { + ESP_LOGE(TAG, "Configuration mismatch: wrong broker url"); + abort(); } +#endif /* CONFIG_BROKER_URL_FROM_STDIN */ esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_start(client); diff --git a/examples/mqtt_ws/main/app_main.c b/examples/mqtt_ws/main/app_main.c index 2ebc65e..c9c65a5 100755 --- a/examples/mqtt_ws/main/app_main.c +++ b/examples/mqtt_ws/main/app_main.c @@ -40,8 +40,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - // msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); - // ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); + msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); + ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); @@ -49,10 +49,7 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); - msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_1", 0, 1, 0); - printf("sending msgid=%d\n", msg_id); - msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data_0", 0, 0, 0); - printf("sending msgid=%d\n", msg_id); + msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index b94bd80..a87b466 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -11,21 +11,10 @@ extern "C" { #endif -typedef struct outbox_item { - char *buffer; - int len; - int msg_id; - int msg_type; - int tick; - int retry_count; - bool pending; - STAILQ_ENTRY(outbox_item) next; -} outbox_item_t; - -STAILQ_HEAD(outbox_list_t, outbox_item); +struct outbox_item; typedef struct outbox_list_t * outbox_handle_t; -typedef outbox_item_t *outbox_item_handle_t; +typedef struct outbox_item * outbox_item_handle_t; outbox_handle_t outbox_init(); outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 8175475..300e698 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -4,8 +4,25 @@ #include "rom/queue.h" #include "esp_log.h" +#ifndef CONFIG_MQTT_CUSTOM_OUTBOX + + static const char *TAG = "OUTBOX"; +typedef struct outbox_item { + char *buffer; + int len; + int msg_id; + int msg_type; + int tick; + int retry_count; + bool pending; + STAILQ_ENTRY(outbox_item) next; +} outbox_item_t; + +STAILQ_HEAD(outbox_list_t, outbox_item); + + outbox_handle_t outbox_init() { outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t)); @@ -149,3 +166,5 @@ void outbox_destroy(outbox_handle_t outbox) outbox_cleanup(outbox, 0); free(outbox); } + +#endif /* CONFIG_MQTT_CUSTOM_OUTBOX */ \ No newline at end of file diff --git a/mqtt_client.c b/mqtt_client.c index fd1cedb..2b2c6c4 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -844,10 +844,6 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, if (len <= 0) { len = strlen(data); } - // Note: Need to enqueue the qos>0 msg - // if (qos > 0) { - // mqtt_enqueue(client); - // } client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, topic, data, len, @@ -857,7 +853,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_msg_count ++; - // Note: Need to enqueue the qos>0 msg to pass https://code.google.com/archive/p/mqtt4erl/wikis/QualityOfServiceUseCases.wiki + /* Have to enqueue all the qos>0 messages) */ mqtt_enqueue(client); }