Finishing mqtt client

This commit is contained in:
Tuan PM
2018-02-16 22:48:22 +07:00
parent 3115db19ae
commit 33e2324098
6 changed files with 54 additions and 21 deletions

View File

@ -20,7 +20,7 @@
#include "esp_log.h"
#include "mqtt_client.h"
static const char *TAG = "MQTT_SAMPLE";
static const char *TAG = "MQTTS_SAMPLE";
static EventGroupHandle_t wifi_event_group;
const static int CONNECTED_BIT = BIT0;
@ -66,7 +66,6 @@ static void wifi_init(void)
ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******");
ESP_ERROR_CHECK(esp_wifi_start());
ESP_LOGI(TAG, "Waiting for wifi");
vTaskDelay(10000/portTICK_RATE_MS);
xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY);
}
@ -107,6 +106,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
@ -123,6 +124,7 @@ static void mqtt_app_start(void)
.cert_pem = (const char *)iot_eclipse_org_pem_start,
};
ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size());
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_start(client);
}
@ -143,4 +145,5 @@ void app_main()
nvs_flash_init();
wifi_init();
mqtt_app_start();
}

View File

@ -60,9 +60,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
vTaskDelay(500/portTICK_RATE_MS);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
@ -109,6 +108,7 @@ static void wifi_init(void)
ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config));
ESP_LOGI(TAG, "start the WIFI SSID:[%s] password:[%s]", CONFIG_WIFI_SSID, "******");
ESP_ERROR_CHECK(esp_wifi_start());
ESP_LOGI(TAG, "Waiting for wifi");
xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY);
}

View File

@ -27,6 +27,8 @@ typedef struct {
void *user_context;
char *data;
int data_len;
int total_data_len;
int current_data_offset;
char *topic;
int topic_len;
int msg_id;

View File

@ -6,6 +6,7 @@
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "freertos/queue.h"
#include "freertos/event_groups.h"
#include "lwip/err.h"
#include "lwip/sockets.h"

View File

@ -31,6 +31,7 @@ typedef struct {
mbedtls_net_context client_fd;
void *cert_pem_data;
int cert_pem_len;
bool ssl_initialized;
} transport_ssl_t;
static int ssl_connect(transport_handle_t t, const char *host, int port, int timeout_ms)
@ -42,6 +43,7 @@ static int ssl_connect(transport_handle_t t, const char *host, int port, int tim
if (!ssl) {
return -1;
}
ssl->ssl_initialized = true;
mbedtls_ssl_init(&ssl->ctx);
mbedtls_ctr_drbg_init(&ssl->ctr_drbg);
mbedtls_ssl_config_init(&ssl->conf);
@ -196,8 +198,8 @@ static int ssl_close(transport_handle_t t)
{
int ret = -1;
transport_ssl_t *ssl = transport_get_data(t);
if (ssl->client_fd.fd >= 0) {
if (ssl->ssl_initialized) {
ESP_LOGD(TAG, "Cleanup mbedtls");
mbedtls_ssl_close_notify(&ssl->ctx);
mbedtls_ssl_session_reset(&ssl->ctx);
mbedtls_net_free(&ssl->client_fd);
@ -206,6 +208,7 @@ static int ssl_close(transport_handle_t t)
mbedtls_ctr_drbg_free(&ssl->ctr_drbg);
mbedtls_entropy_free(&ssl->entropy);
mbedtls_ssl_free(&ssl->ctx);
ssl->ssl_initialized = false;
}
return ret;
}

View File

@ -67,8 +67,10 @@ struct esp_mqtt_client {
esp_mqtt_event_t event;
bool run;
outbox_handle_t outbox;
EventGroupHandle_t status_bits;
};
const static int STOPPED_BIT = BIT0;
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config);
@ -303,14 +305,17 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
client->mqtt_state.out_buffer_length = buffer_size;
client->mqtt_state.connect_info = &client->connect_info;
client->outbox = outbox_init();
client->status_bits = xEventGroupCreate();
return client;
}
esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
{
esp_mqtt_client_stop(client);
esp_mqtt_destroy_config(client);
transport_list_destroy(client->transport_list);
outbox_destroy(client->outbox);
vEventGroupDelete(client->status_bits);
free(client->mqtt_state.in_buffer);
free(client->mqtt_state.out_buffer);
free(client);
@ -399,10 +404,14 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
{
const char *mqtt_topic, *mqtt_data;
uint16_t mqtt_topic_length, mqtt_data_length, mqtt_data_total_length, mqtt_data_offset;
uint16_t mqtt_topic_length, mqtt_data_length, total_mqtt_len = 0;
uint16_t mqtt_len, mqtt_offset = 0;
int len_read;
do
{
@ -411,17 +420,28 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
mqtt_data_length = length;
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
mqtt_data_offset += mqtt_data_length;
ESP_LOGI(TAG, "Get data len= %d, topic len=%d", mqtt_data_length, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
esp_mqtt_dispatch_event(client);
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
break;
if (total_mqtt_len == 0) {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
} else {
mqtt_len = len_read;
}
int len_read = transport_read(client->transport,
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_data_length, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data;
client->event.data_len = mqtt_len;
client->event.total_data_len = total_mqtt_len;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
mqtt_offset += mqtt_len;
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length)
break;
len_read = transport_read(client->transport,
(char *)client->mqtt_state.in_buffer,
client->mqtt_state.in_buffer_length,
client->config->network_timeout_ms);
@ -429,9 +449,10 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
ESP_LOGE(TAG, "Read error or timeout: %d", errno);
break;
}
// client->mqtt_state.message_length_read += len_read;
client->mqtt_state.message_length_read += len_read;
} while (1);
}
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
@ -482,7 +503,6 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
}
if (read_len == 0) {
ESP_LOGW(TAG, "Read Timeout %d", platform_tick_get_ms());
return ESP_OK;
}
@ -529,7 +549,6 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
// deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
@ -585,6 +604,7 @@ static void esp_mqtt_task(void *pv)
}
client->state = MQTT_STATE_INIT;
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
while (client->run) {
switch ((int)client->state) {
@ -641,6 +661,8 @@ static void esp_mqtt_task(void *pv)
break;
}
}
transport_close(client->transport);
xEventGroupSetBits(client->status_bits, STOPPED_BIT);
vTaskDelete(NULL);
}
@ -655,6 +677,7 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
ESP_LOGE(TAG, "Error create mqtt task");
return ESP_FAIL;
}
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
return ESP_OK;
}
@ -662,6 +685,7 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
{
client->run = false;
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
return ESP_OK;
}
@ -731,7 +755,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
{
int pending_msg_id;
uint16_t pending_msg_id = 0;
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;