fixed possible race conditions in public API of mqtt_client

- Modified most of the public API to lock even if checking for connection state
- Updated `esp_mqtt_set_config` locks to be used also from callbacks (in mqtt-task context)
- Moved initialization of event_loop out of esp_mqtt_set_config
This commit is contained in:
David Cermak
2019-06-18 10:19:44 +02:00
parent 7223302deb
commit 82fa03a508

View File

@ -122,7 +122,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
MQTT_API_LOCK(client);
MQTT_API_LOCK_FROM_OTHER_TASK(client);
//Copy user configurations to client context
esp_err_t err = ESP_OK;
mqtt_config_storage_t *cfg;
@ -131,7 +131,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
} else {
cfg = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, cfg, {
MQTT_API_UNLOCK(client);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return ESP_ERR_NO_MEM;
});
client->config = cfg;
@ -229,14 +229,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
if (config->event_handle) {
cfg->event_handle = config->event_handle;
} else {
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = 1,
.task_name = NULL,
};
esp_event_loop_create(&no_task_loop, &cfg->event_loop_handle);
#endif
}
if (config->refresh_connection_after_ms) {
@ -247,11 +239,11 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
if (config->disable_auto_reconnect == cfg->auto_reconnect) {
cfg->auto_reconnect = !config->disable_auto_reconnect;
}
MQTT_API_UNLOCK(client);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return ESP_OK;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
MQTT_API_UNLOCK(client);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return err;
}
@ -269,6 +261,11 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
free(client->connect_info.username);
free(client->connect_info.password);
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if (client->config->event_loop_handle) {
esp_event_loop_delete(client->config->event_loop_handle);
}
#endif
free(client->config);
return ESP_OK;
}
@ -359,7 +356,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
return NULL;
}
esp_mqtt_set_config(client, config);
MQTT_API_LOCK(client);
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = 1,
.task_name = NULL,
};
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
#endif
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
@ -446,7 +449,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
MQTT_API_UNLOCK(client);
return client;
_mqtt_init_failed:
esp_mqtt_client_destroy(client);
@ -498,6 +500,8 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
return ESP_FAIL;
}
// This API could be also executed when client is active (need to protect config fields)
MQTT_API_LOCK_FROM_OTHER_TASK(client);
// set uri overrides actual scheme, host, path if configured previously
free(client->config->scheme);
free(client->config->host);
@ -535,6 +539,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
free(user_info);
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return ESP_OK;
}
@ -1099,24 +1104,28 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK_FROM_OTHER_TASK(client);
if (client->state >= MQTT_STATE_INIT) {
ESP_LOGE(TAG, "Client has started");
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return ESP_FAIL;
}
esp_err_t err = ESP_OK;
#if MQTT_CORE_SELECTION_ENABLED
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
return ESP_FAIL;
err = ESP_FAIL;
}
#else
ESP_LOGD(TAG, "Core selection disabled");
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
return ESP_FAIL;
err = ESP_FAIL;
}
#endif
return ESP_OK;
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return err;
}
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
@ -1133,24 +1142,25 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
{
MQTT_API_LOCK_FROM_OTHER_TASK(client);
if (client->run) {
// Only send the disconnect message if the client is connected
if(client->state == MQTT_STATE_CONNECTED) {
// Notify the broker we are disconnecting
MQTT_API_LOCK_FROM_OTHER_TASK(client);
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending disconnect message");
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
}
client->run = false;
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
client->state = MQTT_STATE_UNKNOWN;
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return ESP_OK;
} else {
ESP_LOGW(TAG, "Client asked to stop, but was not started");
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return ESP_FAIL;
}
}
@ -1169,11 +1179,12 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
{
MQTT_API_LOCK_FROM_OTHER_TASK(client);
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return -1;
}
MQTT_API_LOCK_FROM_OTHER_TASK(client);
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
&client->mqtt_state.pending_msg_id);
@ -1195,11 +1206,12 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
{
MQTT_API_LOCK_FROM_OTHER_TASK(client);
if (client->state != MQTT_STATE_CONNECTED) {
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
MQTT_API_LOCK_FROM_OTHER_TASK(client);
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id);