mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-11-03 16:42:33 +01:00
Add error handling for mqtt, fixed some issues
This commit is contained in:
136
mqtt_client.c
136
mqtt_client.c
@@ -84,8 +84,11 @@ static char *create_string(const char *ptr, int len);
|
||||
static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
||||
{
|
||||
//Copy user configurations to client context
|
||||
esp_err_t err = ESP_OK;
|
||||
mqtt_config_storage_t *cfg = calloc(1, sizeof(mqtt_config_storage_t));
|
||||
mem_assert(cfg);
|
||||
ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM);
|
||||
|
||||
client->config = cfg;
|
||||
|
||||
cfg->task_prio = config->task_prio;
|
||||
if (cfg->task_prio <= 0) {
|
||||
@@ -96,42 +99,49 @@ static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_
|
||||
if (cfg->task_stack == 0) {
|
||||
cfg->task_stack = MQTT_TASK_STACK;
|
||||
}
|
||||
|
||||
if (config->host[0]) {
|
||||
err = ESP_ERR_NO_MEM;
|
||||
if (config->host) {
|
||||
cfg->host = strdup(config->host);
|
||||
ESP_MEM_CHECK(TAG, cfg->host, goto _mqtt_set_config_failed);
|
||||
}
|
||||
cfg->port = config->port;
|
||||
|
||||
if (config->username[0]) {
|
||||
if (config->username) {
|
||||
client->connect_info.username = strdup(config->username);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.username, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->password[0]) {
|
||||
if (config->password) {
|
||||
client->connect_info.password = strdup(config->password);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.password, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->client_id[0]) {
|
||||
if (config->client_id) {
|
||||
client->connect_info.client_id = strdup(config->client_id);
|
||||
} else {
|
||||
client->connect_info.client_id = platform_create_id_string();
|
||||
}
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
|
||||
ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id);
|
||||
|
||||
if (config->uri[0]) {
|
||||
if (config->uri) {
|
||||
cfg->uri = strdup(config->uri);
|
||||
ESP_MEM_CHECK(TAG, cfg->uri, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->lwt_topic[0]) {
|
||||
if (config->lwt_topic) {
|
||||
client->connect_info.will_topic = strdup(config->lwt_topic);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_topic, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->lwt_msg_len) {
|
||||
client->connect_info.will_message = malloc(config->lwt_msg_len);
|
||||
mem_assert(client->connect_info.will_message);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
|
||||
memcpy(client->connect_info.will_message, config->lwt_msg, config->lwt_msg_len);
|
||||
client->connect_info.will_length = config->lwt_msg_len;
|
||||
} else if (config->lwt_msg[0]) {
|
||||
} else if (config->lwt_msg) {
|
||||
client->connect_info.will_message = strdup(config->lwt_msg);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
|
||||
client->connect_info.will_length = strlen(config->lwt_msg);
|
||||
}
|
||||
|
||||
@@ -154,40 +164,25 @@ static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_
|
||||
cfg->auto_reconnect = false;
|
||||
}
|
||||
|
||||
client->config = cfg;
|
||||
return ESP_OK;
|
||||
|
||||
return err;
|
||||
_mqtt_set_config_failed:
|
||||
esp_mqtt_destroy_config(client);
|
||||
return err;
|
||||
}
|
||||
|
||||
static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
mqtt_config_storage_t *cfg = client->config;
|
||||
if (cfg->host) {
|
||||
free(cfg->host);
|
||||
}
|
||||
if (cfg->uri) {
|
||||
free(cfg->uri);
|
||||
}
|
||||
if (cfg->path) {
|
||||
free(cfg->path);
|
||||
}
|
||||
if (cfg->scheme) {
|
||||
free(cfg->scheme);
|
||||
}
|
||||
if (client->connect_info.will_topic) {
|
||||
free(client->connect_info.will_topic);
|
||||
}
|
||||
if (client->connect_info.will_message) {
|
||||
free(client->connect_info.will_message);
|
||||
}
|
||||
if (client->connect_info.client_id) {
|
||||
free(client->connect_info.client_id);
|
||||
}
|
||||
if (client->connect_info.username) {
|
||||
free(client->connect_info.username);
|
||||
}
|
||||
if (client->connect_info.password) {
|
||||
free(client->connect_info.password);
|
||||
}
|
||||
free(cfg->host);
|
||||
free(cfg->uri);
|
||||
free(cfg->path);
|
||||
free(cfg->scheme);
|
||||
free(client->connect_info.will_topic);
|
||||
free(client->connect_info.will_message);
|
||||
free(client->connect_info.client_id);
|
||||
free(client->connect_info.username);
|
||||
free(client->connect_info.password);
|
||||
free(client->config);
|
||||
return ESP_OK;
|
||||
}
|
||||
@@ -267,30 +262,36 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
|
||||
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
|
||||
{
|
||||
esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client));
|
||||
mem_assert(client);
|
||||
ESP_MEM_CHECK(TAG, client, return NULL);
|
||||
|
||||
esp_mqtt_set_config(client, config);
|
||||
|
||||
|
||||
client->transport_list = transport_list_init();
|
||||
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
|
||||
|
||||
transport_handle_t tcp = transport_tcp_init();
|
||||
ESP_MEM_CHECK(TAG, tcp, goto _mqtt_init_failed);
|
||||
transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
|
||||
transport_list_add(client->transport_list, tcp, "mqtt");
|
||||
if (config->transport == MQTT_TRANSPORT_OVER_TCP) {
|
||||
client->config->scheme = create_string("mqtt", 4);
|
||||
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
|
||||
}
|
||||
|
||||
#if MQTT_ENABLE_WS
|
||||
transport_handle_t ws = transport_ws_init(tcp);
|
||||
ESP_MEM_CHECK(TAG, ws, goto _mqtt_init_failed);
|
||||
transport_set_default_port(ws, MQTT_WS_DEFAULT_PORT);
|
||||
transport_list_add(client->transport_list, ws, "ws");
|
||||
if (config->transport == MQTT_TRANSPORT_OVER_WS) {
|
||||
client->config->scheme = create_string("ws", 2);
|
||||
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
|
||||
}
|
||||
#endif
|
||||
|
||||
#if MQTT_ENABLE_SSL
|
||||
transport_handle_t ssl = transport_ssl_init();
|
||||
ESP_MEM_CHECK(TAG, ssl, goto _mqtt_init_failed);
|
||||
transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
|
||||
if (config->cert_pem) {
|
||||
transport_ssl_set_cert_data(ssl, config->cert_pem, strlen(config->cert_pem));
|
||||
@@ -298,25 +299,29 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
||||
transport_list_add(client->transport_list, ssl, "mqtts");
|
||||
if (config->transport == MQTT_TRANSPORT_OVER_SSL) {
|
||||
client->config->scheme = create_string("mqtts", 5);
|
||||
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
|
||||
}
|
||||
#endif
|
||||
|
||||
#if MQTT_ENABLE_WSS
|
||||
transport_handle_t wss = transport_ws_init(ssl);
|
||||
ESP_MEM_CHECK(TAG, wss, goto _mqtt_init_failed);
|
||||
transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT);
|
||||
transport_list_add(client->transport_list, wss, "wss");
|
||||
if (config->transport == MQTT_TRANSPORT_OVER_WSS) {
|
||||
client->config->scheme = create_string("wss", 3);
|
||||
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
|
||||
}
|
||||
#endif
|
||||
if (client->config->uri) {
|
||||
if (esp_mqtt_client_set_uri(client, client->config->uri) != ESP_OK) {
|
||||
return NULL;
|
||||
goto _mqtt_init_failed;
|
||||
}
|
||||
}
|
||||
|
||||
if (client->config->scheme == NULL) {
|
||||
client->config->scheme = create_string("mqtt", 4);
|
||||
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
|
||||
}
|
||||
|
||||
client->keepalive_tick = platform_tick_get_ms();
|
||||
@@ -328,15 +333,21 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
||||
}
|
||||
|
||||
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
|
||||
mem_assert(client->mqtt_state.in_buffer);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed);
|
||||
client->mqtt_state.in_buffer_length = buffer_size;
|
||||
client->mqtt_state.out_buffer = (uint8_t *)malloc(buffer_size);
|
||||
mem_assert(client->mqtt_state.out_buffer);
|
||||
ESP_MEM_CHECK(TAG, client->mqtt_state.out_buffer, goto _mqtt_init_failed);
|
||||
|
||||
client->mqtt_state.out_buffer_length = buffer_size;
|
||||
client->mqtt_state.connect_info = &client->connect_info;
|
||||
client->outbox = outbox_init();
|
||||
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
|
||||
client->status_bits = xEventGroupCreate();
|
||||
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
|
||||
return client;
|
||||
_mqtt_init_failed:
|
||||
esp_mqtt_client_destroy(client);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
|
||||
@@ -359,7 +370,7 @@ static char *create_string(const char *ptr, int len)
|
||||
return NULL;
|
||||
}
|
||||
ret = calloc(1, len + 1);
|
||||
mem_assert(ret);
|
||||
ESP_MEM_CHECK(TAG, ret, return NULL);
|
||||
memcpy(ret, ptr, len);
|
||||
return ret;
|
||||
}
|
||||
@@ -396,10 +407,8 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
||||
}
|
||||
}
|
||||
|
||||
char *port = create_string(uri + puri.field_data[UF_PORT].off, puri.field_data[UF_PORT].len);
|
||||
if (port) {
|
||||
client->config->port = atoi(port);
|
||||
free(port);
|
||||
if (puri.field_data[UF_PORT].len) {
|
||||
client->config->port = strtol((const char*)(uri + puri.field_data[UF_PORT].off), NULL, 10);
|
||||
}
|
||||
|
||||
char *user_info = create_string(uri + puri.field_data[UF_USERINFO].off, puri.field_data[UF_USERINFO].len);
|
||||
@@ -442,7 +451,7 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
|
||||
return client->config->event_handle(&client->event);
|
||||
}
|
||||
return ESP_FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -455,7 +464,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
||||
|
||||
do
|
||||
{
|
||||
if (total_mqtt_len == 0){
|
||||
if (total_mqtt_len == 0) {
|
||||
mqtt_topic_length = length;
|
||||
mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length);
|
||||
mqtt_data_length = length;
|
||||
@@ -478,14 +487,15 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
||||
esp_mqtt_dispatch_event(client);
|
||||
|
||||
mqtt_offset += mqtt_len;
|
||||
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length)
|
||||
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
|
||||
break;
|
||||
}
|
||||
|
||||
len_read = transport_read(client->transport,
|
||||
(char *)client->mqtt_state.in_buffer,
|
||||
client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ?
|
||||
client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
|
||||
client->config->network_timeout_ms);
|
||||
(char *)client->mqtt_state.in_buffer,
|
||||
client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ?
|
||||
client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
|
||||
client->config->network_timeout_ms);
|
||||
if (len_read <= 0) {
|
||||
ESP_LOGE(TAG, "Read error or timeout: %d", errno);
|
||||
break;
|
||||
@@ -517,7 +527,7 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int
|
||||
static void mqtt_enqueue(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
//lock mutex
|
||||
if (client->mqtt_state.pending_msg_count > 0) {
|
||||
//Copy to queue buffer
|
||||
@@ -596,8 +606,8 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
}
|
||||
|
||||
break;
|
||||
@@ -616,8 +626,8 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBREL, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGREQ:
|
||||
@@ -642,7 +652,7 @@ static void esp_mqtt_task(void *pv)
|
||||
client->transport = transport_list_get_transport(client->transport_list, client->config->scheme);
|
||||
|
||||
if (client->transport == NULL) {
|
||||
ESP_LOGE(TAG, "There are no transports valid, stop mqtt client");
|
||||
ESP_LOGE(TAG, "There are no transports valid, stop mqtt client, config scheme = %s", client->config->scheme);
|
||||
client->run = false;
|
||||
}
|
||||
//default port
|
||||
@@ -711,7 +721,7 @@ static void esp_mqtt_task(void *pv)
|
||||
client->reconnect_tick = platform_tick_get_ms();
|
||||
ESP_LOGD(TAG, "Reconnecting...");
|
||||
}
|
||||
vTaskDelay(client->wait_timeout_ms/2/portTICK_RATE_MS);
|
||||
vTaskDelay(client->wait_timeout_ms / 2 / portTICK_RATE_MS);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user