From 33e2324098cb249eb3e725e64c71e14eb5a9de83 Mon Sep 17 00:00:00 2001 From: Tuan PM Date: Fri, 16 Feb 2018 22:48:22 +0700 Subject: [PATCH] Finishing mqtt client --- examples/mqtt_ssl/main/app_main.c | 7 +++-- examples/mqtt_tcp/main/app_main.c | 6 ++-- include/mqtt_client.h | 2 ++ lib/include/platform_esp32_idf.h | 1 + lib/transport_ssl.c | 7 +++-- mqtt_client.c | 52 ++++++++++++++++++++++--------- 6 files changed, 54 insertions(+), 21 deletions(-) diff --git a/examples/mqtt_ssl/main/app_main.c b/examples/mqtt_ssl/main/app_main.c index a0d32c2..6fcc312 100755 --- a/examples/mqtt_ssl/main/app_main.c +++ b/examples/mqtt_ssl/main/app_main.c @@ -20,7 +20,7 @@ #include "esp_log.h" #include "mqtt_client.h" -static const char *TAG = "MQTT_SAMPLE"; +static const char *TAG = "MQTTS_SAMPLE"; static EventGroupHandle_t wifi_event_group; const static int CONNECTED_BIT = BIT0; @@ -66,7 +66,6 @@ static void wifi_init(void) ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******"); ESP_ERROR_CHECK(esp_wifi_start()); ESP_LOGI(TAG, "Waiting for wifi"); - vTaskDelay(10000/portTICK_RATE_MS); xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY); } @@ -107,6 +106,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); @@ -123,6 +124,7 @@ static void mqtt_app_start(void) .cert_pem = (const char *)iot_eclipse_org_pem_start, }; + ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_start(client); } @@ -143,4 +145,5 @@ void app_main() nvs_flash_init(); wifi_init(); mqtt_app_start(); + } diff --git a/examples/mqtt_tcp/main/app_main.c b/examples/mqtt_tcp/main/app_main.c index 8e8799e..05ee57a 100755 --- a/examples/mqtt_tcp/main/app_main.c +++ b/examples/mqtt_tcp/main/app_main.c @@ -60,9 +60,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - vTaskDelay(500/portTICK_RATE_MS); - msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); - ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); @@ -109,6 +108,7 @@ static void wifi_init(void) ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config)); ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******"); ESP_ERROR_CHECK(esp_wifi_start()); + ESP_LOGI(TAG, "Waiting for wifi"); xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY); } diff --git a/include/mqtt_client.h b/include/mqtt_client.h index e101064..4909ebe 100755 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -27,6 +27,8 @@ typedef struct { void *user_context; char *data; int data_len; + int total_data_len; + int current_data_offset; char *topic; int topic_len; int msg_id; diff --git a/lib/include/platform_esp32_idf.h b/lib/include/platform_esp32_idf.h index ea7bcda..069d240 100644 --- a/lib/include/platform_esp32_idf.h +++ b/lib/include/platform_esp32_idf.h @@ -6,6 +6,7 @@ #include "freertos/task.h" #include "freertos/semphr.h" #include "freertos/queue.h" +#include "freertos/event_groups.h" #include "lwip/err.h" #include "lwip/sockets.h" diff --git a/lib/transport_ssl.c b/lib/transport_ssl.c index 4959eea..6137488 100644 --- a/lib/transport_ssl.c +++ b/lib/transport_ssl.c @@ -31,6 +31,7 @@ typedef struct { mbedtls_net_context client_fd; void *cert_pem_data; int cert_pem_len; + bool ssl_initialized; } transport_ssl_t; static int ssl_connect(transport_handle_t t, const char *host, int port, int timeout_ms) @@ -42,6 +43,7 @@ static int ssl_connect(transport_handle_t t, const char *host, int port, int tim if (!ssl) { return -1; } + ssl->ssl_initialized = true; mbedtls_ssl_init(&ssl->ctx); mbedtls_ctr_drbg_init(&ssl->ctr_drbg); mbedtls_ssl_config_init(&ssl->conf); @@ -196,8 +198,8 @@ static int ssl_close(transport_handle_t t) { int ret = -1; transport_ssl_t *ssl = transport_get_data(t); - if (ssl->client_fd.fd >= 0) { - + if (ssl->ssl_initialized) { + ESP_LOGD(TAG, "Cleanup mbedtls"); mbedtls_ssl_close_notify(&ssl->ctx); mbedtls_ssl_session_reset(&ssl->ctx); mbedtls_net_free(&ssl->client_fd); @@ -206,6 +208,7 @@ static int ssl_close(transport_handle_t t) mbedtls_ctr_drbg_free(&ssl->ctr_drbg); mbedtls_entropy_free(&ssl->entropy); mbedtls_ssl_free(&ssl->ctx); + ssl->ssl_initialized = false; } return ret; } diff --git a/mqtt_client.c b/mqtt_client.c index 38c1415..4f9760b 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -67,8 +67,10 @@ struct esp_mqtt_client { esp_mqtt_event_t event; bool run; outbox_handle_t outbox; + EventGroupHandle_t status_bits; }; +const static int STOPPED_BIT = BIT0; static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config); @@ -303,14 +305,17 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co client->mqtt_state.out_buffer_length = buffer_size; client->mqtt_state.connect_info = &client->connect_info; client->outbox = outbox_init(); + client->status_bits = xEventGroupCreate(); return client; } esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client) { + esp_mqtt_client_stop(client); esp_mqtt_destroy_config(client); transport_list_destroy(client->transport_list); outbox_destroy(client->outbox); + vEventGroupDelete(client->status_bits); free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); free(client); @@ -399,10 +404,14 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) return ESP_FAIL; } + + static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { const char *mqtt_topic, *mqtt_data; - uint16_t mqtt_topic_length, mqtt_data_length, mqtt_data_total_length, mqtt_data_offset; + uint16_t mqtt_topic_length, mqtt_data_length, total_mqtt_len = 0; + uint16_t mqtt_len, mqtt_offset = 0; + int len_read; do { @@ -411,17 +420,28 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i mqtt_data_length = length; mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); - mqtt_data_offset += mqtt_data_length; - - ESP_LOGI(TAG, "Get data len= %d, topic len=%d", mqtt_data_length, mqtt_topic_length); - client->event.event_id = MQTT_EVENT_DATA; - esp_mqtt_dispatch_event(client); - - if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) { - break; + if (total_mqtt_len == 0) { + total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; + mqtt_len = mqtt_data_length; + } else { + mqtt_len = len_read; } - int len_read = transport_read(client->transport, + ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_data_length, mqtt_topic_length); + client->event.event_id = MQTT_EVENT_DATA; + client->event.data = (char *)mqtt_data; + client->event.data_len = mqtt_len; + client->event.total_data_len = total_mqtt_len; + client->event.current_data_offset = mqtt_offset; + client->event.topic = (char *)mqtt_topic; + client->event.topic_len = mqtt_topic_length; + esp_mqtt_dispatch_event(client); + + mqtt_offset += mqtt_len; + if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) + break; + + len_read = transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, client->config->network_timeout_ms); @@ -429,9 +449,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i ESP_LOGE(TAG, "Read error or timeout: %d", errno); break; } - // client->mqtt_state.message_length_read += len_read; + client->mqtt_state.message_length_read += len_read; } while (1); + } static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) @@ -482,7 +503,6 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) } if (read_len == 0) { - ESP_LOGW(TAG, "Read Timeout %d", platform_tick_get_ms()); return ESP_OK; } @@ -529,7 +549,6 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); - // deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); break; case MQTT_MSG_TYPE_PUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { @@ -585,6 +604,7 @@ static void esp_mqtt_task(void *pv) } client->state = MQTT_STATE_INIT; + xEventGroupClearBits(client->status_bits, STOPPED_BIT); while (client->run) { switch ((int)client->state) { @@ -641,6 +661,8 @@ static void esp_mqtt_task(void *pv) break; } } + transport_close(client->transport); + xEventGroupSetBits(client->status_bits, STOPPED_BIT); vTaskDelete(NULL); } @@ -655,6 +677,7 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) ESP_LOGE(TAG, "Error create mqtt task"); return ESP_FAIL; } + xEventGroupClearBits(client->status_bits, STOPPED_BIT); return ESP_OK; } @@ -662,6 +685,7 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) { client->run = false; + xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY); return ESP_OK; } @@ -731,7 +755,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) { - int pending_msg_id; + uint16_t pending_msg_id = 0; if (client->state != MQTT_STATE_CONNECTED) { ESP_LOGE(TAG, "Client has not connected"); return -1;