diff --git a/mqtt_client.c b/mqtt_client.c index a95f4b3..22490eb 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -122,7 +122,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) { - MQTT_API_LOCK(client); + MQTT_API_LOCK_FROM_OTHER_TASK(client); //Copy user configurations to client context esp_err_t err = ESP_OK; mqtt_config_storage_t *cfg; @@ -131,7 +131,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } else { cfg = calloc(1, sizeof(mqtt_config_storage_t)); ESP_MEM_CHECK(TAG, cfg, { - MQTT_API_UNLOCK(client); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return ESP_ERR_NO_MEM; }); client->config = cfg; @@ -229,14 +229,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl if (config->event_handle) { cfg->event_handle = config->event_handle; - } else { -#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP - esp_event_loop_args_t no_task_loop = { - .queue_size = 1, - .task_name = NULL, - }; - esp_event_loop_create(&no_task_loop, &cfg->event_loop_handle); -#endif } if (config->refresh_connection_after_ms) { @@ -247,11 +239,11 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl if (config->disable_auto_reconnect == cfg->auto_reconnect) { cfg->auto_reconnect = !config->disable_auto_reconnect; } - MQTT_API_UNLOCK(client); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return ESP_OK; _mqtt_set_config_failed: esp_mqtt_destroy_config(client); - MQTT_API_UNLOCK(client); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return err; } @@ -269,6 +261,11 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) free(client->connect_info.username); free(client->connect_info.password); memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t)); +#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP + if (client->config->event_loop_handle) { + esp_event_loop_delete(client->config->event_loop_handle); + } +#endif free(client->config); return ESP_OK; } @@ -312,6 +309,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len); return ESP_FAIL; } + client->mqtt_state.in_buffer_read_len = 0; connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer); switch (connect_rsp_code) { case CONNECTION_ACCEPTED: @@ -359,7 +357,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co return NULL; } esp_mqtt_set_config(client, config); - MQTT_API_LOCK(client); +#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP + esp_event_loop_args_t no_task_loop = { + .queue_size = 1, + .task_name = NULL, + }; + esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle); +#endif client->transport_list = esp_transport_list_init(); ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed); @@ -446,7 +450,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed); client->status_bits = xEventGroupCreate(); ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed); - MQTT_API_UNLOCK(client); return client; _mqtt_init_failed: esp_mqtt_client_destroy(client); @@ -498,6 +501,8 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u return ESP_FAIL; } + // This API could be also executed when client is active (need to protect config fields) + MQTT_API_LOCK_FROM_OTHER_TASK(client); // set uri overrides actual scheme, host, path if configured previously free(client->config->scheme); free(client->config->host); @@ -535,6 +540,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u free(user_info); } + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return ESP_OK; } @@ -1099,24 +1105,28 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) ESP_LOGE(TAG, "Client was not initialized"); return ESP_ERR_INVALID_ARG; } + MQTT_API_LOCK_FROM_OTHER_TASK(client); if (client->state >= MQTT_STATE_INIT) { ESP_LOGE(TAG, "Client has started"); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return ESP_FAIL; } + esp_err_t err = ESP_OK; #if MQTT_CORE_SELECTION_ENABLED ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE); if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) { ESP_LOGE(TAG, "Error create mqtt task"); - return ESP_FAIL; + err = ESP_FAIL; } #else ESP_LOGD(TAG, "Core selection disabled"); if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) { ESP_LOGE(TAG, "Error create mqtt task"); - return ESP_FAIL; + err = ESP_FAIL; } #endif - return ESP_OK; + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + return err; } esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client) @@ -1133,21 +1143,25 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client) esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) { + MQTT_API_LOCK_FROM_OTHER_TASK(client); if (client->run) { - // Notify the broker we are disconnecting - MQTT_API_LOCK_FROM_OTHER_TASK(client); - client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection); - if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error sending disconnect message"); + // Only send the disconnect message if the client is connected + if(client->state == MQTT_STATE_CONNECTED) { + // Notify the broker we are disconnecting + client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection); + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error sending disconnect message"); + } } - MQTT_API_UNLOCK_FROM_OTHER_TASK(client); client->run = false; xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY); client->state = MQTT_STATE_UNKNOWN; + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return ESP_OK; } else { ESP_LOGW(TAG, "Client asked to stop, but was not started"); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return ESP_FAIL; } } @@ -1166,11 +1180,12 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client) int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos) { + MQTT_API_LOCK_FROM_OTHER_TASK(client); if (client->state != MQTT_STATE_CONNECTED) { ESP_LOGE(TAG, "Client has not connected"); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return -1; } - MQTT_API_LOCK_FROM_OTHER_TASK(client); client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, topic, qos, &client->mqtt_state.pending_msg_id); @@ -1178,6 +1193,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; mqtt_enqueue(client); //move pending msg to outbox (if have) + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); @@ -1192,11 +1208,12 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic) { + MQTT_API_LOCK_FROM_OTHER_TASK(client); if (client->state != MQTT_STATE_CONNECTED) { + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); ESP_LOGE(TAG, "Client has not connected"); return -1; } - MQTT_API_LOCK_FROM_OTHER_TASK(client); client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection, topic, &client->mqtt_state.pending_msg_id); @@ -1205,6 +1222,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; mqtt_enqueue(client); + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);