diff --git a/examples/emitter-client/CMakeLists.txt b/examples/emitter-client/CMakeLists.txt deleted file mode 100644 index 0dd1242..0000000 --- a/examples/emitter-client/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ -cmake_minimum_required(VERSION 3.5) - -get_filename_component(DEV_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE) - -set(PROJECT_ROOT "${DEV_ROOT}/") - -set(SUBMODULE_ROOT "${DEV_ROOT}/../../../") - -set(PROJECT_NAME "mqtt_ssl") - -include($ENV{IDF_PATH}/tools/cmake/project.cmake) - -set(MAIN_SRCS ${PROJECT_ROOT}/main/app_main.c) - -set(EXTRA_COMPONENT_DIRS "${EXTRA_COMPONENT_DIRS} ${SUBMODULE_ROOT}") -set(BUILD_COMPONENTS "${BUILD_COMPONENTS} espmqtt") - -project(${PROJECT_NAME}) - diff --git a/examples/emitter-client/Makefile b/examples/emitter-client/Makefile deleted file mode 100644 index 5b1d297..0000000 --- a/examples/emitter-client/Makefile +++ /dev/null @@ -1,13 +0,0 @@ -# -# This is a project Makefile. It is assumed the directory this Makefile resides in is a -# project subdirectory. -# -# -# This is a project Makefile. It is assumed the directory this Makefile resides in is a -# project subdirectory. -# -PROJECT_NAME := emitter_client -EXTRA_COMPONENT_DIRS += $(PROJECT_PATH)/../../../ - -include $(IDF_PATH)/make/project.mk - diff --git a/examples/emitter-client/README.md b/examples/emitter-client/README.md deleted file mode 100644 index a9aabae..0000000 --- a/examples/emitter-client/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# ESPMQTT Emitter client - -## Before you run this Example -- Register an account from https://emitter.io/ -- Login and create channel key, grant access for the channel `/topic/` as the images bellow -- `make menuconfig` provide Wi-Fi information and CHANNEL_KEY to `MQTT Application example` -- `make flash monitor` - -![](generate-key-0.png) -![](generate-key-1.png) diff --git a/examples/emitter-client/generate-key-0.png b/examples/emitter-client/generate-key-0.png deleted file mode 100644 index b43b5ed..0000000 Binary files a/examples/emitter-client/generate-key-0.png and /dev/null differ diff --git a/examples/emitter-client/generate-key-1.png b/examples/emitter-client/generate-key-1.png deleted file mode 100644 index c05bdb1..0000000 Binary files a/examples/emitter-client/generate-key-1.png and /dev/null differ diff --git a/examples/emitter-client/main/Kconfig.projbuild b/examples/emitter-client/main/Kconfig.projbuild deleted file mode 100644 index 50f4951..0000000 --- a/examples/emitter-client/main/Kconfig.projbuild +++ /dev/null @@ -1,22 +0,0 @@ -menu "MQTT Application example" - -config WIFI_SSID - string "WiFi SSID" - default "myssid" - help - SSID (network name) for the example to connect to. - -config WIFI_PASSWORD - string "WiFi Password" - default "mypassword" - help - WiFi password (WPA or WPA2) for the example to use. - -config EMITTER_CHANNEL_KEY - string "Emitter channel key" - default "" - help - The Emitter channel key using to pub/sub - - -endmenu diff --git a/examples/emitter-client/main/app_main.c b/examples/emitter-client/main/app_main.c deleted file mode 100755 index c2ab9d9..0000000 --- a/examples/emitter-client/main/app_main.c +++ /dev/null @@ -1,156 +0,0 @@ -#include -#include -#include -#include -#include "esp_wifi.h" -#include "esp_system.h" -#include "nvs_flash.h" - -#include "freertos/FreeRTOS.h" -#include "freertos/task.h" -#include "freertos/semphr.h" -#include "freertos/queue.h" -#include "freertos/event_groups.h" - -#include "lwip/sockets.h" -#include "lwip/dns.h" -#include "lwip/netdb.h" - -#include "esp_log.h" -#include "mqtt_client.h" - -static const char *TAG = "MQTTS_SAMPLE"; - -static EventGroupHandle_t wifi_event_group; -const static int CONNECTED_BIT = BIT0; - -static void wifi_event_handler(void* arg, esp_event_base_t event_base, - int32_t event_id, void* event_data) -{ - switch (event_id) { - case WIFI_EVENT_STA_START: - esp_wifi_connect(); - break; - case WIFI_EVENT_STA_DISCONNECTED: - esp_wifi_connect(); - xEventGroupClearBits(wifi_event_group, CONNECTED_BIT); - break; - default: - break; - } - return; -} - -static void ip_event_handler(void* arg, esp_event_base_t event_base, - int32_t event_id, void* event_data) -{ - switch (event_id) { - case IP_EVENT_STA_GOT_IP: - xEventGroupSetBits(wifi_event_group, CONNECTED_BIT); - - break; - default: - break; - } - return; -} - -static void wifi_init(void) -{ - tcpip_adapter_init(); - wifi_event_group = xEventGroupCreate(); - ESP_ERROR_CHECK(esp_event_loop_create_default()); - ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, &wifi_event_handler, NULL)); - ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &ip_event_handler, NULL)); - - wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT(); - ESP_ERROR_CHECK(esp_wifi_init(&cfg)); - ESP_ERROR_CHECK(esp_wifi_set_storage(WIFI_STORAGE_RAM)); - wifi_config_t wifi_config = { - .sta = { - .ssid = CONFIG_WIFI_SSID, - .password = CONFIG_WIFI_PASSWORD, - }, - }; - ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA)); - ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config)); - ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******"); - ESP_ERROR_CHECK(esp_wifi_start()); - ESP_LOGI(TAG, "Waiting for wifi"); - xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY); -} - -static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) -{ - esp_mqtt_client_handle_t client = event->client; - int msg_id; - // your_context_t *context = event->context; - switch (event->event_id) { - case MQTT_EVENT_CONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); - msg_id = esp_mqtt_client_subscribe(client, CONFIG_EMITTER_CHANNEL_KEY"/topic/", 0); - ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - - break; - case MQTT_EVENT_DISCONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - break; - - case MQTT_EVENT_SUBSCRIBED: - ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); - msg_id = esp_mqtt_client_publish(client, CONFIG_EMITTER_CHANNEL_KEY"/topic/", "data", 0, 0, 0); - ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); - break; - case MQTT_EVENT_UNSUBSCRIBED: - ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); - break; - case MQTT_EVENT_PUBLISHED: - ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); - break; - case MQTT_EVENT_DATA: - ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); - printf("DATA=%.*s\r\n", event->data_len, event->data); - break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); - break; - default: - break; - } - return ESP_OK; -} - -static void mqtt_app_start(void) -{ - const esp_mqtt_client_config_t mqtt_cfg = { - .uri = "mqtts://api.emitter.io:443", // for mqtt over ssl - // .uri = "mqtt://api.emitter.io:8080", //for mqtt over tcp - // .uri = "ws://api.emitter.io:8080", //for mqtt over websocket - // .uri = "wss://api.emitter.io:443", //for mqtt over websocket secure - .event_handle = mqtt_event_handler, - }; - - ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); - esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); - esp_mqtt_client_start(client); -} - -void app_main() -{ - ESP_LOGI(TAG, "[APP] Startup.."); - ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); - ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version()); - - esp_log_level_set("*", ESP_LOG_INFO); - esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE); - esp_log_level_set("TRANSPORT_TCP", ESP_LOG_VERBOSE); - esp_log_level_set("TRANSPORT_SSL", ESP_LOG_VERBOSE); - esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE); - esp_log_level_set("OUTBOX", ESP_LOG_VERBOSE); - - nvs_flash_init(); - wifi_init(); - mqtt_app_start(); - -} diff --git a/examples/emitter-client/main/component.mk b/examples/emitter-client/main/component.mk deleted file mode 100644 index e69de29..0000000 diff --git a/examples/mqtt_ssl_mutual_auth/CMakeLists.txt b/examples/mqtt_ssl_mutual_auth/CMakeLists.txt deleted file mode 100644 index 106b117..0000000 --- a/examples/mqtt_ssl_mutual_auth/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ -cmake_minimum_required(VERSION 3.5) - -get_filename_component(DEV_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE) - -set(PROJECT_ROOT "${DEV_ROOT}/") - -set(SUBMODULE_ROOT "${DEV_ROOT}/../../../") - -set(PROJECT_NAME "mqtt_ssl_mutual_auth") - -include($ENV{IDF_PATH}/tools/cmake/project.cmake) - -set(MAIN_SRCS ${PROJECT_ROOT}/main/app_main.c) - -set(EXTRA_COMPONENT_DIRS "${EXTRA_COMPONENT_DIRS} ${SUBMODULE_ROOT}") -set(BUILD_COMPONENTS "${BUILD_COMPONENTS} espmqtt") - -project(${PROJECT_NAME}) - diff --git a/examples/mqtt_ssl_mutual_auth/Makefile b/examples/mqtt_ssl_mutual_auth/Makefile deleted file mode 100644 index c22f41d..0000000 --- a/examples/mqtt_ssl_mutual_auth/Makefile +++ /dev/null @@ -1,13 +0,0 @@ -# -# This is a project Makefile. It is assumed the directory this Makefile resides in is a -# project subdirectory. -# -# -# This is a project Makefile. It is assumed the directory this Makefile resides in is a -# project subdirectory. -# -PROJECT_NAME := mqtt_ssl_mutual_auth -EXTRA_COMPONENT_DIRS += $(PROJECT_PATH)/../../../ - -include $(IDF_PATH)/make/project.mk - diff --git a/examples/mqtt_ssl_mutual_auth/README.md b/examples/mqtt_ssl_mutual_auth/README.md deleted file mode 100644 index c415cdf..0000000 --- a/examples/mqtt_ssl_mutual_auth/README.md +++ /dev/null @@ -1,16 +0,0 @@ -# ESPMQTT SSL Sample application - -Navigate to the main directory - -``` -cd main -``` - -Generate a client key and a CSR. When you are generating the CSR, do not use the default values. At a minimum, the CSR must include the Country, Organisation and Common Name fields. - -``` -openssl genrsa -out client.key -openssl req -out client.csr -key client.key -new -``` - -Paste the generated CSR in the [Mosquitto test certificate signer](https://test.mosquitto.org/ssl/index.php), click Submit and copy the downloaded `client.crt` in the `main` directory. diff --git a/examples/mqtt_ssl_mutual_auth/main/Kconfig.projbuild b/examples/mqtt_ssl_mutual_auth/main/Kconfig.projbuild deleted file mode 100644 index 1c9c2e6..0000000 --- a/examples/mqtt_ssl_mutual_auth/main/Kconfig.projbuild +++ /dev/null @@ -1,15 +0,0 @@ -menu "MQTT Application sample" - -config WIFI_SSID - string "WiFi SSID" - default "myssid" - help - SSID (network name) for the example to connect to. - -config WIFI_PASSWORD - string "WiFi Password" - default "mypassword" - help - WiFi password (WPA or WPA2) for the example to use. - -endmenu diff --git a/examples/mqtt_ssl_mutual_auth/main/app_main.c b/examples/mqtt_ssl_mutual_auth/main/app_main.c deleted file mode 100755 index 21c8ece..0000000 --- a/examples/mqtt_ssl_mutual_auth/main/app_main.c +++ /dev/null @@ -1,167 +0,0 @@ -#include -#include -#include -#include -#include "esp_wifi.h" -#include "esp_system.h" -#include "nvs_flash.h" - -#include "freertos/FreeRTOS.h" -#include "freertos/task.h" -#include "freertos/semphr.h" -#include "freertos/queue.h" -#include "freertos/event_groups.h" - -#include "lwip/sockets.h" -#include "lwip/dns.h" -#include "lwip/netdb.h" - -#include "esp_log.h" -#include "esp_event.h" -#include "tcpip_adapter.h" -#include "mqtt_client.h" - -static const char *TAG = "MQTTS_SAMPLE"; - -static EventGroupHandle_t wifi_event_group; -const static int CONNECTED_BIT = BIT0; - -static void wifi_event_handler(void* arg, esp_event_base_t event_base, - int32_t event_id, void* event_data) -{ - switch (event_id) { - case WIFI_EVENT_STA_START: - esp_wifi_connect(); - break; - case WIFI_EVENT_STA_DISCONNECTED: - esp_wifi_connect(); - xEventGroupClearBits(wifi_event_group, CONNECTED_BIT); - break; - default: - break; - } - return; -} - -static void ip_event_handler(void* arg, esp_event_base_t event_base, - int32_t event_id, void* event_data) -{ - switch (event_id) { - case IP_EVENT_STA_GOT_IP: - xEventGroupSetBits(wifi_event_group, CONNECTED_BIT); - - break; - default: - break; - } - return; -} - -static void wifi_init(void) -{ - tcpip_adapter_init(); - wifi_event_group = xEventGroupCreate(); - ESP_ERROR_CHECK(esp_event_loop_create_default()); - ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, &wifi_event_handler, NULL)); - ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &ip_event_handler, NULL)); - - wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT(); - ESP_ERROR_CHECK(esp_wifi_init(&cfg)); - ESP_ERROR_CHECK(esp_wifi_set_storage(WIFI_STORAGE_RAM)); - wifi_config_t wifi_config = { - .sta = { - .ssid = CONFIG_WIFI_SSID, - .password = CONFIG_WIFI_PASSWORD, - }, - }; - ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA)); - ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config)); - ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******"); - ESP_ERROR_CHECK(esp_wifi_start()); - ESP_LOGI(TAG, "Waiting for wifi"); - xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY); -} - -extern const uint8_t client_cert_pem_start[] asm("_binary_client_crt_start"); -extern const uint8_t client_cert_pem_end[] asm("_binary_client_crt_end"); -extern const uint8_t client_key_pem_start[] asm("_binary_client_key_start"); -extern const uint8_t client_key_pem_end[] asm("_binary_client_key_end"); - -static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) -{ - esp_mqtt_client_handle_t client = event->client; - int msg_id; - // your_context_t *context = event->context; - switch (event->event_id) { - case MQTT_EVENT_CONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); - msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); - ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - - msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); - ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - - msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); - ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); - break; - case MQTT_EVENT_DISCONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - break; - - case MQTT_EVENT_SUBSCRIBED: - ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); - msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); - ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); - break; - case MQTT_EVENT_UNSUBSCRIBED: - ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); - break; - case MQTT_EVENT_PUBLISHED: - ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); - break; - case MQTT_EVENT_DATA: - ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); - printf("DATA=%.*s\r\n", event->data_len, event->data); - break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); - break; - default: - break; - } - return ESP_OK; -} - -static void mqtt_app_start(void) -{ - const esp_mqtt_client_config_t mqtt_cfg = { - .uri = "mqtts://test.mosquitto.org:8884", - .event_handle = mqtt_event_handler, - .client_cert_pem = (const char *)client_cert_pem_start, - .client_key_pem = (const char *)client_key_pem_start, - }; - - ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); - esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); - esp_mqtt_client_start(client); -} - -void app_main() -{ - ESP_LOGI(TAG, "[APP] Startup.."); - ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); - ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version()); - - esp_log_level_set("*", ESP_LOG_INFO); - esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE); - esp_log_level_set("TRANSPORT_TCP", ESP_LOG_VERBOSE); - esp_log_level_set("TRANSPORT_SSL", ESP_LOG_VERBOSE); - esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE); - esp_log_level_set("OUTBOX", ESP_LOG_VERBOSE); - - nvs_flash_init(); - wifi_init(); - mqtt_app_start(); - -} diff --git a/examples/mqtt_ssl_mutual_auth/main/component.mk b/examples/mqtt_ssl_mutual_auth/main/component.mk deleted file mode 100644 index 01adda5..0000000 --- a/examples/mqtt_ssl_mutual_auth/main/component.mk +++ /dev/null @@ -1 +0,0 @@ -COMPONENT_EMBED_TXTFILES := client.crt client.key diff --git a/include/mqtt_client.h b/include/mqtt_client.h index bbcafa4..796f431 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -223,6 +223,15 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client); */ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client); +/** + * @brief This api is typically used to force disconnection from the broker + * + * @param client mqtt client handle + * + * @return ESP_OK on success + */ +esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client); + /** * @brief Stops mqtt client tasks * diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index a0d2acc..f91dffd 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -93,19 +93,19 @@ typedef struct mqtt_connect_info { } mqtt_connect_info_t; -static inline int mqtt_get_type(uint8_t *buffer) +static inline int mqtt_get_type(const uint8_t *buffer) { return (buffer[0] & 0xf0) >> 4; } -static inline int mqtt_get_connect_session_present(uint8_t *buffer) +static inline int mqtt_get_connect_session_present(const uint8_t *buffer) { return buffer[2] & 0x01; } -static inline int mqtt_get_connect_return_code(uint8_t *buffer) +static inline int mqtt_get_connect_return_code(const uint8_t *buffer) { return buffer[3]; } -static inline int mqtt_get_dup(uint8_t *buffer) +static inline int mqtt_get_dup(const uint8_t *buffer) { return (buffer[0] & 0x08) >> 3; } @@ -113,18 +113,18 @@ static inline void mqtt_set_dup(uint8_t *buffer) { buffer[0] |= 0x08; } -static inline int mqtt_get_qos(uint8_t *buffer) +static inline int mqtt_get_qos(const uint8_t *buffer) { return (buffer[0] & 0x06) >> 1; } -static inline int mqtt_get_retain(uint8_t *buffer) +static inline int mqtt_get_retain(const uint8_t *buffer) { return (buffer[0] & 0x01); } void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint32_t buffer_length); bool mqtt_header_complete(uint8_t *buffer, uint32_t buffer_length); -uint32_t mqtt_get_total_length(uint8_t *buffer, uint32_t length, int *fixed_size_len); +uint32_t mqtt_get_total_length(const uint8_t *buffer, uint32_t length, int *fixed_size_len); char *mqtt_get_publish_topic(uint8_t *buffer, uint32_t *length); char *mqtt_get_publish_data(uint8_t *buffer, uint32_t *length); uint16_t mqtt_get_id(uint8_t *buffer, uint32_t length); diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 8d5f342..21b1436 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -16,6 +16,7 @@ struct outbox_item; typedef struct outbox_list_t *outbox_handle_t; typedef struct outbox_item *outbox_item_handle_t; typedef struct outbox_message *outbox_message_handle_t; +typedef long long outbox_tick_t; typedef struct outbox_message { uint8_t *data; @@ -35,17 +36,17 @@ typedef enum pending_state { } pending_state_t; outbox_handle_t outbox_init(void); -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick); +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick); +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); -int outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout); +int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); -esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, int tick); +esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick); int outbox_get_size(outbox_handle_t outbox); esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size); void outbox_destroy(outbox_handle_t outbox); diff --git a/lib/include/platform_esp32_idf.h b/lib/include/platform_esp32_idf.h index 1054a18..c3edb9f 100644 --- a/lib/include/platform_esp32_idf.h +++ b/lib/include/platform_esp32_idf.h @@ -32,4 +32,10 @@ void ms_to_timeval(int timeout_ms, struct timeval *tv); ESP_LOGE(TAG,"%s:%d (%s): %s", __FILE__, __LINE__, __FUNCTION__, "Memory exhausted"); \ action; \ } + +#define ESP_OK_CHECK(TAG, a, action) if ((a) != ESP_OK) { \ + ESP_LOGE(TAG,"%s:%d (%s): %s", __FILE__, __LINE__, __FUNCTION__, "Failed with non ESP_OK err code"); \ + action; \ + } + #endif diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index 37b2bd7..f6f983e 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -140,7 +140,7 @@ void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, uint32_t buff connection->buffer_length = buffer_length; } -uint32_t mqtt_get_total_length(uint8_t *buffer, uint32_t length, int *fixed_size_len) +uint32_t mqtt_get_total_length(const uint8_t *buffer, uint32_t length, int *fixed_size_len) { int i; uint32_t totlen = 0; diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 5db1224..4ba4b76 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -15,7 +15,7 @@ typedef struct outbox_item { int msg_id; int msg_type; int msg_qos; - int tick; + outbox_tick_t tick; int retry_count; pending_state_t pending; STAILQ_ENTRY(outbox_item) next; @@ -32,7 +32,7 @@ outbox_handle_t outbox_init(void) return outbox; } -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick) +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick) { outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t)); ESP_MEM_CHECK(TAG, item, return NULL); @@ -67,7 +67,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) return NULL; } -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick) +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick) { outbox_item_handle_t item; STAILQ_FOREACH(item, outbox, next) { @@ -131,7 +131,7 @@ esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t return ESP_FAIL; } -esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, int tick) +esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick) { outbox_item_handle_t item = outbox_get(outbox, msg_id); if (item) { @@ -155,7 +155,7 @@ esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type) return ESP_OK; } -int outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout) +int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) { int deleted_items = 0; outbox_item_handle_t item, tmp; @@ -199,7 +199,12 @@ esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size) void outbox_destroy(outbox_handle_t outbox) { - outbox_cleanup(outbox, 0); + outbox_item_handle_t item, tmp; + STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { + STAILQ_REMOVE(outbox, item, outbox_item, next); + free(item->buffer); + free(item); + } free(outbox); } diff --git a/mqtt_client.c b/mqtt_client.c index d5ab1c3..63dcd12 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -7,7 +7,6 @@ #include "esp_transport_tcp.h" #include "esp_transport_ssl.h" #include "esp_transport_ws.h" -#include "platform.h" #include "mqtt_outbox.h" #include "mqtt_supported_features.h" @@ -17,22 +16,12 @@ #ifdef MQTT_DISABLE_API_LOCKS # define MQTT_API_LOCK(c) # define MQTT_API_UNLOCK(c) -# define MQTT_API_LOCK_FROM_OTHER_TASK(c) -# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) #else -# define MQTT_API_LOCK(c) xSemaphoreTake(c->api_lock, portMAX_DELAY) -# define MQTT_API_UNLOCK(c) xSemaphoreGive(c->api_lock) -# define MQTT_API_LOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreTake(c->api_lock, portMAX_DELAY); } } -# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreGive(c->api_lock); } } +# define MQTT_API_LOCK(c) xSemaphoreTakeRecursive(c->api_lock, portMAX_DELAY) +# define MQTT_API_UNLOCK(c) xSemaphoreGiveRecursive(c->api_lock) #endif /* MQTT_USE_API_LOCKS */ -#ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES -# define MQTT_TRANSPORT_SET_CERT_OR_KEY(setfn, key, len) \ - { if (key) { if (len) { setfn##_der(ssl, key, len); } else { setfn(ssl, key, strlen(key)); } } } -#else -# define MQTT_TRANSPORT_SET_CERT_OR_KEY(setfn, key, len) \ - { if (key) { setfn(ssl, key, strlen(key)); } } -#endif +_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type"); static const char *TAG = "MQTT_CLIENT"; @@ -63,9 +52,7 @@ typedef struct mqtt_state typedef struct { mqtt_event_callback_t event_handle; -#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP esp_event_loop_handle_t event_loop_handle; -#endif int task_stack; int task_prio; char *uri; @@ -82,6 +69,14 @@ typedef struct { int num_alpn_protos; char *clientkey_password; int clientkey_password_len; + bool use_global_ca_store; + const char *cacert_buf; + size_t cacert_bytes; + const char *clientcert_buf; + size_t clientcert_bytes; + const char *clientkey_buf; + size_t clientkey_bytes; + const struct psk_key_hint *psk_hint_key; } mqtt_config_storage_t; typedef enum { @@ -121,6 +116,7 @@ struct esp_mqtt_client { const static int STOPPED_BIT = BIT0; const static int RECONNECT_BIT = BIT1; +const static int DISCONNECT_BIT = BIT2; static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client); @@ -131,9 +127,126 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client); static char *create_string(const char *ptr, int len); static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms); +#if MQTT_ENABLE_SSL +enum esp_mqtt_ssl_cert_key_api { + MQTT_SSL_DATA_API_CA_CERT, + MQTT_SSL_DATA_API_CLIENT_CERT, + MQTT_SSL_DATA_API_CLIENT_KEY, + MQTT_SSL_DATA_API_MAX, +}; + +static esp_err_t esp_mqtt_set_cert_key_data(esp_transport_handle_t ssl, enum esp_mqtt_ssl_cert_key_api what, const char *cert_key_data, int cert_key_len) +{ + char *data = (char *)cert_key_data; + int ssl_transport_api_id = what; + int len = cert_key_len; + + if (!data) { + return ESP_OK; + } + + if (len == 0) { + // if length not specified, expect 0-terminated PEM string + // and the original transport_api_id (by convention after the last api_id in the enum) + ssl_transport_api_id += MQTT_SSL_DATA_API_MAX; + len = strlen(data); + } +#ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES + else { + ESP_LOGE(TAG, "Explicit cert-/key-len is not available in IDF version %s", IDF_VER); + return ESP_ERR_NOT_SUPPORTED; + } +#endif + + // option to force the cert/key config to null (i.e. skip validation) when existing config updates + if (0 == strcmp(data, "NULL")) { + data = NULL; + len = 0; + } + + switch (ssl_transport_api_id) { +#ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES + case MQTT_SSL_DATA_API_CA_CERT: + esp_transport_ssl_set_cert_data_der(ssl, data, len); + break; + case MQTT_SSL_DATA_API_CLIENT_CERT: + esp_transport_ssl_set_client_cert_data_der(ssl, data, len); + break; + case MQTT_SSL_DATA_API_CLIENT_KEY: + esp_transport_ssl_set_client_key_data_der(ssl, data, len); + break; +#endif + case MQTT_SSL_DATA_API_CA_CERT + MQTT_SSL_DATA_API_MAX: + esp_transport_ssl_set_cert_data(ssl, data, len); + break; + case MQTT_SSL_DATA_API_CLIENT_CERT + MQTT_SSL_DATA_API_MAX: + esp_transport_ssl_set_client_cert_data(ssl, data, len); + break; + case MQTT_SSL_DATA_API_CLIENT_KEY + MQTT_SSL_DATA_API_MAX: + esp_transport_ssl_set_client_key_data(ssl, data, len); + break; + default: + return ESP_ERR_INVALID_ARG; + } + return ESP_OK; +} + +static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle_t transport_list, mqtt_config_storage_t *cfg) +{ + esp_transport_handle_t ssl = esp_transport_list_get_transport(transport_list, "mqtts"); + + if (cfg->use_global_ca_store == true) { + esp_transport_ssl_enable_global_ca_store(ssl); + } else { + ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes), + goto esp_mqtt_set_transport_failed); + } + + ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_CERT, cfg->clientcert_buf, cfg->clientcert_bytes), + goto esp_mqtt_set_transport_failed); + ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_KEY, cfg->clientkey_buf, cfg->clientkey_bytes), + goto esp_mqtt_set_transport_failed); + + if (cfg->clientkey_password && cfg->clientkey_password_len) { +#if defined(MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD) && MQTT_ENABLE_SSL + esp_transport_ssl_set_client_key_password(ssl, + cfg->clientkey_password, + cfg->clientkey_password_len); +#else + ESP_LOGE(TAG, "Password protected keys are not available in IDF version %s", IDF_VER); + goto esp_mqtt_set_transport_failed; +#endif + } + + if (cfg->psk_hint_key) { +#if defined(MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION) && MQTT_ENABLE_SSL + esp_transport_ssl_set_psk_key_hint(ssl, cfg->psk_hint_key); +#else + ESP_LOGE(TAG, "PSK authentication is not available in IDF version %s", IDF_VER); + goto _mqtt_set_config_failed; +#endif + } + + if (cfg->alpn_protos) { +#if defined(MQTT_SUPPORTED_FEATURE_ALPN) && MQTT_ENABLE_SSL + ESP_LOGE(TAG, "%p", cfg->alpn_protos); + esp_transport_ssl_set_alpn_protocol(ssl, (const char **)cfg->alpn_protos); +#else + ESP_LOGE(TAG, "APLN is not available in IDF version %s", IDF_VER); + goto _mqtt_set_config_failed; +#endif + } + + return ESP_OK; + + esp_mqtt_set_transport_failed: + return ESP_FAIL; +} +#endif // MQTT_ENABLE_SSL + esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) { - MQTT_API_LOCK_FROM_OTHER_TASK(client); + MQTT_API_LOCK(client); //Copy user configurations to client context esp_err_t err = ESP_OK; mqtt_config_storage_t *cfg; @@ -142,7 +255,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } else { cfg = calloc(1, sizeof(mqtt_config_storage_t)); ESP_MEM_CHECK(TAG, cfg, { - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return ESP_ERR_NO_MEM; }); client->config = cfg; @@ -270,7 +383,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl cfg->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS; } - if(config->alpn_protos) { + if (config->alpn_protos) { for (int i = 0; i < cfg->num_alpn_protos; i++) { free(cfg->alpn_protos[i]); } @@ -292,23 +405,62 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } } + // configure ssl related parameters + cfg->use_global_ca_store = config->use_global_ca_store; + cfg->cacert_buf = config->cert_pem; + cfg->cacert_bytes = config->cert_len; + cfg->clientcert_buf = config->client_cert_pem; + cfg->clientcert_bytes = config->client_cert_len; + cfg->clientkey_buf = config->client_key_pem; + cfg->clientkey_bytes = config->client_key_len; + cfg->psk_hint_key = config->psk_hint_key; + if (config->clientkey_password && config->clientkey_password_len) { cfg->clientkey_password_len = config->clientkey_password_len; cfg->clientkey_password = malloc(cfg->clientkey_password_len); memcpy(cfg->clientkey_password, config->clientkey_password, cfg->clientkey_password_len); } - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + if (config->transport) { + free(client->config->scheme); + if (config->transport == MQTT_TRANSPORT_OVER_WS) { + cfg->scheme = create_string("ws", 2); + ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed); + } else if (config->transport == MQTT_TRANSPORT_OVER_TCP) { + cfg->scheme = create_string("mqtt", 4); + ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed); + } else if (config->transport == MQTT_TRANSPORT_OVER_SSL) { + cfg->scheme = create_string("mqtts", 5); + ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed); + } else if (config->transport == MQTT_TRANSPORT_OVER_WSS) { + cfg->scheme = create_string("wss", 3); + ESP_MEM_CHECK(TAG, cfg->scheme, goto _mqtt_set_config_failed); + } + } + + // Set uri at the end of config to override separately configured uri elements + if (config->uri) { + if (esp_mqtt_client_set_uri(client, cfg->uri) != ESP_OK) { + err = ESP_FAIL; + goto _mqtt_set_config_failed; + } + } + + MQTT_API_UNLOCK(client); + return ESP_OK; _mqtt_set_config_failed: esp_mqtt_destroy_config(client); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return err; } static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) { mqtt_config_storage_t *cfg = client->config; + if (cfg == NULL) { + return ESP_ERR_INVALID_STATE; + } free(cfg->host); free(cfg->uri); free(cfg->path); @@ -331,6 +483,7 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) #endif memset(cfg, 0, sizeof(mqtt_config_storage_t)); free(client->config); + client->config = NULL; return ESP_OK; } @@ -436,20 +589,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co free(client); return NULL; } - client->api_lock = xSemaphoreCreateMutex(); + client->api_lock = xSemaphoreCreateRecursiveMutex(); if (!client->api_lock) { free(client->event.error_handle); free(client); return NULL; } - esp_mqtt_set_config(client, config); -#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP - esp_event_loop_args_t no_task_loop = { - .queue_size = 1, - .task_name = NULL, - }; - esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle); -#endif + client->transport_list = esp_transport_list_init(); ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed); @@ -457,10 +603,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co ESP_MEM_CHECK(TAG, tcp, goto _mqtt_init_failed); esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT); esp_transport_list_add(client->transport_list, tcp, "mqtt"); - if (config->transport == MQTT_TRANSPORT_OVER_TCP) { - client->config->scheme = create_string("mqtt", 4); - ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed); - } #if MQTT_ENABLE_WS esp_transport_handle_t ws = esp_transport_ws_init(tcp); @@ -470,10 +612,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co esp_transport_ws_set_subprotocol(ws, "mqtt"); #endif esp_transport_list_add(client->transport_list, ws, "ws"); - if (config->transport == MQTT_TRANSPORT_OVER_WS) { - client->config->scheme = create_string("ws", 2); - ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed); - } #endif #if MQTT_ENABLE_SSL @@ -481,51 +619,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co ESP_MEM_CHECK(TAG, ssl, goto _mqtt_init_failed); esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT); -#ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES - if (config->cert_len || config->client_cert_len || config->client_key_len) { - ESP_LOGE(TAG, "Explicit cert-/key-len is not available in IDF version %s", IDF_VER); - goto _mqtt_init_failed; - } -#endif - - if (config->use_global_ca_store == true) { - esp_transport_ssl_enable_global_ca_store(ssl); - } else if (config->cert_pem) { - MQTT_TRANSPORT_SET_CERT_OR_KEY(esp_transport_ssl_set_cert_data, config->cert_pem, config->cert_len); - } - MQTT_TRANSPORT_SET_CERT_OR_KEY(esp_transport_ssl_set_client_cert_data, config->client_cert_pem, config->client_cert_len); - MQTT_TRANSPORT_SET_CERT_OR_KEY(esp_transport_ssl_set_client_key_data, config->client_key_pem, config->client_key_len); -#ifdef MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD - if (client->config->clientkey_password && client->config->clientkey_password_len) { - esp_transport_ssl_set_client_key_password(ssl, - client->config->clientkey_password, - client->config->clientkey_password_len); - } -#endif - - if (config->psk_hint_key) { -#ifdef MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION - esp_transport_ssl_set_psk_key_hint(ssl, config->psk_hint_key); -#else - ESP_LOGE(TAG, "PSK authentication is not available in IDF version %s", IDF_VER); - goto _mqtt_init_failed; -#endif - } - - if (client->config->alpn_protos) { -#ifdef MQTT_SUPPORTED_FEATURE_ALPN - esp_transport_ssl_set_alpn_protocol(ssl, (const char **)client->config->alpn_protos); -#else - ESP_LOGE(TAG, "APLN is not available in IDF version %s", IDF_VER); - goto _mqtt_init_failed; -#endif - } - esp_transport_list_add(client->transport_list, ssl, "mqtts"); - if (config->transport == MQTT_TRANSPORT_OVER_SSL) { - client->config->scheme = create_string("mqtts", 5); - ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed); - } #endif #if MQTT_ENABLE_WSS @@ -536,16 +630,19 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co #endif esp_transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT); esp_transport_list_add(client->transport_list, wss, "wss"); - if (config->transport == MQTT_TRANSPORT_OVER_WSS) { - client->config->scheme = create_string("wss", 3); - ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed); - } #endif - if (client->config->uri) { - if (esp_mqtt_client_set_uri(client, client->config->uri) != ESP_OK) { - goto _mqtt_init_failed; - } + ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed); + + if (esp_mqtt_set_config(client, config) != ESP_OK) { + goto _mqtt_init_failed; } +#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP + esp_event_loop_args_t no_task_loop = { + .queue_size = 1, + .task_name = NULL, + }; + esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle); +#endif client->keepalive_tick = platform_tick_get_ms(); client->reconnect_tick = platform_tick_get_ms(); @@ -625,7 +722,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u } // This API could be also executed when client is active (need to protect config fields) - MQTT_API_LOCK_FROM_OTHER_TASK(client); + MQTT_API_LOCK(client); // set uri overrides actual scheme, host, path if configured previously free(client->config->scheme); free(client->config->host); @@ -663,7 +760,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u free(user_info); } - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return ESP_OK; } @@ -757,15 +854,13 @@ post_data_event: esp_mqtt_dispatch_event(client); if (msg_read_len < msg_total_len) { - // if total data is longer then actual -> read payload only size_t buf_len = client->mqtt_state.in_buffer_length; - esp_transport_handle_t transport = esp_transport_get_payload_transport_handle(client->transport); msg_data = (char *)client->mqtt_state.in_buffer; msg_topic = NULL; msg_topic_len = 0; msg_data_offset += msg_data_len; - msg_data_len = esp_transport_read(transport, (char *)client->mqtt_state.in_buffer, + msg_data_len = esp_transport_read(client-> transport, (char *)client->mqtt_state.in_buffer, msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, client->config->network_timeout_ms); if (msg_data_len <= 0) { @@ -871,8 +966,6 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t buf++; client->mqtt_state.in_buffer_read_len++; } - /* any further reading only the underlying payload */ - t = esp_transport_get_payload_transport_handle(t); if ((client->mqtt_state.in_buffer_read_len == 1) || ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) { do { @@ -1100,8 +1193,8 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; - uint32_t last_retransmit = 0; - int32_t msg_tick = 0; + uint64_t last_retransmit = 0; + outbox_tick_t msg_tick = 0; client->run = true; //get transport by scheme @@ -1122,7 +1215,7 @@ static void esp_mqtt_task(void *pv) MQTT_API_LOCK(client); switch ((int)client->state) { case MQTT_STATE_INIT: - xEventGroupClearBits(client->status_bits, RECONNECT_BIT); + xEventGroupClearBits(client->status_bits, RECONNECT_BIT | DISCONNECT_BIT); client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; esp_mqtt_dispatch_event_with_msgid(client); @@ -1130,6 +1223,9 @@ static void esp_mqtt_task(void *pv) ESP_LOGE(TAG, "There is no transport"); client->run = false; } +#if MQTT_ENABLE_SSL + esp_mqtt_set_ssl_transport_properties(client->transport_list, client->config); +#endif if (esp_transport_connect(client->transport, client->config->host, @@ -1162,6 +1258,11 @@ static void esp_mqtt_task(void *pv) break; case MQTT_STATE_CONNECTED: + // check for disconnection request + if (xEventGroupWaitBits(client->status_bits, DISCONNECT_BIT, true, true, 0) & DISCONNECT_BIT) { + esp_mqtt_abort_connection(client); + break; + } // receive and process data if (mqtt_process_receive(client) == ESP_FAIL) { esp_mqtt_abort_connection(client); @@ -1257,10 +1358,10 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) ESP_LOGE(TAG, "Client was not initialized"); return ESP_ERR_INVALID_ARG; } - MQTT_API_LOCK_FROM_OTHER_TASK(client); + MQTT_API_LOCK(client); if (client->state >= MQTT_STATE_INIT) { ESP_LOGE(TAG, "Client has started"); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return ESP_FAIL; } esp_err_t err = ESP_OK; @@ -1277,13 +1378,21 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) err = ESP_FAIL; } #endif - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return err; } +esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client) +{ + ESP_LOGI(TAG, "Client asked to disconnect"); + xEventGroupSetBits(client->status_bits, DISCONNECT_BIT); + return ESP_OK; +} + esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client) { ESP_LOGI(TAG, "Client force reconnect requested"); + if (client->state != MQTT_STATE_WAIT_TIMEOUT) { ESP_LOGD(TAG, "The client is not waiting for reconnection. Ignore the request"); return ESP_FAIL; @@ -1295,7 +1404,7 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client) esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) { - MQTT_API_LOCK_FROM_OTHER_TASK(client); + MQTT_API_LOCK(client); if (client->run) { // Only send the disconnect message if the client is connected if(client->state == MQTT_STATE_CONNECTED) { @@ -1303,6 +1412,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection); if (client->mqtt_state.outbound_message->length == 0) { ESP_LOGE(TAG, "Disconnect message cannot be created"); + MQTT_API_UNLOCK(client); return ESP_FAIL; } if (mqtt_write_data(client) != ESP_OK) { @@ -1312,12 +1422,12 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) client->run = false; client->state = MQTT_STATE_UNKNOWN; - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY); return ESP_OK; } else { ESP_LOGW(TAG, "Client asked to stop, but was not started"); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return ESP_FAIL; } } @@ -1340,10 +1450,10 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client) int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos) { - MQTT_API_LOCK_FROM_OTHER_TASK(client); + MQTT_API_LOCK(client); if (client->state != MQTT_STATE_CONNECTED) { ESP_LOGE(TAG, "Client has not connected"); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return -1; } client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, @@ -1361,20 +1471,20 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return -1; } ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return client->mqtt_state.pending_msg_id; } int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic) { - MQTT_API_LOCK_FROM_OTHER_TASK(client); + MQTT_API_LOCK(client); if (client->state != MQTT_STATE_CONNECTED) { - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); ESP_LOGE(TAG, "Client has not connected"); return -1; } @@ -1394,12 +1504,12 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return -1; } ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return client->mqtt_state.pending_msg_id; } @@ -1417,7 +1527,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } - MQTT_API_LOCK_FROM_OTHER_TASK(client); + MQTT_API_LOCK(client); mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, topic, data, len, qos, retain, @@ -1425,12 +1535,12 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, if (publish_msg->length == 0) { ESP_LOGE(TAG, "Publish message cannot be created"); - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return -1; } /* We have to set as pending all the qos>0 messages */ + client->mqtt_state.outbound_message = publish_msg; if (qos > 0) { - client->mqtt_state.outbound_message = publish_msg; client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; @@ -1438,9 +1548,10 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { mqtt_enqueue(client); + } else { + int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; + mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment); } - } else { - client->mqtt_state.outbound_message = publish_msg; } /* Skip sending if not connected (rely on resending) */ @@ -1463,22 +1574,14 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, } int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; + client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0; + client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; remaining_len -= data_sent; current_data += data_sent; if (remaining_len > 0) { mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection; ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len); - if (connection->message.fragmented_msg_data_offset) { - // asked to enqueue oversized message (first time only) - connection->message.fragmented_msg_data_offset = 0; - connection->message.fragmented_msg_total_length = 0; - if (qos > 0) { - // internally enqueue all big messages, as they dont fit 'pending msg' structure - mqtt_enqueue_oversized(client, (uint8_t *)current_data, remaining_len); - } - } - if (remaining_len > connection->buffer_length) { // Continue with sending memcpy(connection->buffer, current_data, connection->buffer_length); @@ -1502,14 +1605,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms()); outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED); } - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); return pending_msg_id; cannot_publish: + // clear out possible fragmented publish if failed or skipped + client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; if (qos == 0) { ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected"); } - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + MQTT_API_UNLOCK(client); + return ret; }