forked from espressif/esp-mqtt
Merge branch 'bugfix/mqtt_fix_close_if_never_connected_crash' into 'master'
MQTT Client: Check for connection before sending disconnect message See merge request idf/esp-mqtt!32
This commit is contained in:
@@ -122,7 +122,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
|||||||
|
|
||||||
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
||||||
{
|
{
|
||||||
MQTT_API_LOCK(client);
|
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
||||||
//Copy user configurations to client context
|
//Copy user configurations to client context
|
||||||
esp_err_t err = ESP_OK;
|
esp_err_t err = ESP_OK;
|
||||||
mqtt_config_storage_t *cfg;
|
mqtt_config_storage_t *cfg;
|
||||||
@@ -131,7 +131,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
|||||||
} else {
|
} else {
|
||||||
cfg = calloc(1, sizeof(mqtt_config_storage_t));
|
cfg = calloc(1, sizeof(mqtt_config_storage_t));
|
||||||
ESP_MEM_CHECK(TAG, cfg, {
|
ESP_MEM_CHECK(TAG, cfg, {
|
||||||
MQTT_API_UNLOCK(client);
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return ESP_ERR_NO_MEM;
|
return ESP_ERR_NO_MEM;
|
||||||
});
|
});
|
||||||
client->config = cfg;
|
client->config = cfg;
|
||||||
@@ -229,14 +229,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
|||||||
|
|
||||||
if (config->event_handle) {
|
if (config->event_handle) {
|
||||||
cfg->event_handle = config->event_handle;
|
cfg->event_handle = config->event_handle;
|
||||||
} else {
|
|
||||||
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
|
|
||||||
esp_event_loop_args_t no_task_loop = {
|
|
||||||
.queue_size = 1,
|
|
||||||
.task_name = NULL,
|
|
||||||
};
|
|
||||||
esp_event_loop_create(&no_task_loop, &cfg->event_loop_handle);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config->refresh_connection_after_ms) {
|
if (config->refresh_connection_after_ms) {
|
||||||
@@ -247,11 +239,11 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
|||||||
if (config->disable_auto_reconnect == cfg->auto_reconnect) {
|
if (config->disable_auto_reconnect == cfg->auto_reconnect) {
|
||||||
cfg->auto_reconnect = !config->disable_auto_reconnect;
|
cfg->auto_reconnect = !config->disable_auto_reconnect;
|
||||||
}
|
}
|
||||||
MQTT_API_UNLOCK(client);
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
_mqtt_set_config_failed:
|
_mqtt_set_config_failed:
|
||||||
esp_mqtt_destroy_config(client);
|
esp_mqtt_destroy_config(client);
|
||||||
MQTT_API_UNLOCK(client);
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -269,6 +261,11 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
|
|||||||
free(client->connect_info.username);
|
free(client->connect_info.username);
|
||||||
free(client->connect_info.password);
|
free(client->connect_info.password);
|
||||||
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
|
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
|
||||||
|
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
|
||||||
|
if (client->config->event_loop_handle) {
|
||||||
|
esp_event_loop_delete(client->config->event_loop_handle);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
free(client->config);
|
free(client->config);
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
@@ -312,6 +309,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
|||||||
ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len);
|
ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
client->mqtt_state.in_buffer_read_len = 0;
|
||||||
connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
|
connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
|
||||||
switch (connect_rsp_code) {
|
switch (connect_rsp_code) {
|
||||||
case CONNECTION_ACCEPTED:
|
case CONNECTION_ACCEPTED:
|
||||||
@@ -359,7 +357,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
esp_mqtt_set_config(client, config);
|
esp_mqtt_set_config(client, config);
|
||||||
MQTT_API_LOCK(client);
|
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
|
||||||
|
esp_event_loop_args_t no_task_loop = {
|
||||||
|
.queue_size = 1,
|
||||||
|
.task_name = NULL,
|
||||||
|
};
|
||||||
|
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
|
||||||
|
#endif
|
||||||
client->transport_list = esp_transport_list_init();
|
client->transport_list = esp_transport_list_init();
|
||||||
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
|
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
|
||||||
|
|
||||||
@@ -446,7 +450,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
|||||||
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
|
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
|
||||||
client->status_bits = xEventGroupCreate();
|
client->status_bits = xEventGroupCreate();
|
||||||
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
|
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
|
||||||
MQTT_API_UNLOCK(client);
|
|
||||||
return client;
|
return client;
|
||||||
_mqtt_init_failed:
|
_mqtt_init_failed:
|
||||||
esp_mqtt_client_destroy(client);
|
esp_mqtt_client_destroy(client);
|
||||||
@@ -498,6 +501,8 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
|||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This API could be also executed when client is active (need to protect config fields)
|
||||||
|
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
||||||
// set uri overrides actual scheme, host, path if configured previously
|
// set uri overrides actual scheme, host, path if configured previously
|
||||||
free(client->config->scheme);
|
free(client->config->scheme);
|
||||||
free(client->config->host);
|
free(client->config->host);
|
||||||
@@ -535,6 +540,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
|||||||
free(user_info);
|
free(user_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1099,24 +1105,28 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
|
|||||||
ESP_LOGE(TAG, "Client was not initialized");
|
ESP_LOGE(TAG, "Client was not initialized");
|
||||||
return ESP_ERR_INVALID_ARG;
|
return ESP_ERR_INVALID_ARG;
|
||||||
}
|
}
|
||||||
|
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
||||||
if (client->state >= MQTT_STATE_INIT) {
|
if (client->state >= MQTT_STATE_INIT) {
|
||||||
ESP_LOGE(TAG, "Client has started");
|
ESP_LOGE(TAG, "Client has started");
|
||||||
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
esp_err_t err = ESP_OK;
|
||||||
#if MQTT_CORE_SELECTION_ENABLED
|
#if MQTT_CORE_SELECTION_ENABLED
|
||||||
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
|
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
|
||||||
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
|
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
|
||||||
ESP_LOGE(TAG, "Error create mqtt task");
|
ESP_LOGE(TAG, "Error create mqtt task");
|
||||||
return ESP_FAIL;
|
err = ESP_FAIL;
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
ESP_LOGD(TAG, "Core selection disabled");
|
ESP_LOGD(TAG, "Core selection disabled");
|
||||||
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
|
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
|
||||||
ESP_LOGE(TAG, "Error create mqtt task");
|
ESP_LOGE(TAG, "Error create mqtt task");
|
||||||
return ESP_FAIL;
|
err = ESP_FAIL;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
return ESP_OK;
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
|
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
|
||||||
@@ -1133,21 +1143,25 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
|
|||||||
|
|
||||||
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
||||||
{
|
{
|
||||||
if (client->run) {
|
|
||||||
// Notify the broker we are disconnecting
|
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
||||||
|
if (client->run) {
|
||||||
|
// Only send the disconnect message if the client is connected
|
||||||
|
if(client->state == MQTT_STATE_CONNECTED) {
|
||||||
|
// Notify the broker we are disconnecting
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
|
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error sending disconnect message");
|
ESP_LOGE(TAG, "Error sending disconnect message");
|
||||||
}
|
}
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
}
|
||||||
|
|
||||||
client->run = false;
|
client->run = false;
|
||||||
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
|
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
|
||||||
client->state = MQTT_STATE_UNKNOWN;
|
client->state = MQTT_STATE_UNKNOWN;
|
||||||
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
} else {
|
} else {
|
||||||
ESP_LOGW(TAG, "Client asked to stop, but was not started");
|
ESP_LOGW(TAG, "Client asked to stop, but was not started");
|
||||||
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1166,11 +1180,12 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
|
|||||||
|
|
||||||
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
|
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
|
||||||
{
|
{
|
||||||
|
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
||||||
if (client->state != MQTT_STATE_CONNECTED) {
|
if (client->state != MQTT_STATE_CONNECTED) {
|
||||||
ESP_LOGE(TAG, "Client has not connected");
|
ESP_LOGE(TAG, "Client has not connected");
|
||||||
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic, qos,
|
topic, qos,
|
||||||
&client->mqtt_state.pending_msg_id);
|
&client->mqtt_state.pending_msg_id);
|
||||||
@@ -1178,6 +1193,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
|||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
mqtt_enqueue(client); //move pending msg to outbox (if have)
|
mqtt_enqueue(client); //move pending msg to outbox (if have)
|
||||||
|
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
|
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
|
||||||
@@ -1192,11 +1208,12 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
|||||||
|
|
||||||
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
|
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
|
||||||
{
|
{
|
||||||
|
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
||||||
if (client->state != MQTT_STATE_CONNECTED) {
|
if (client->state != MQTT_STATE_CONNECTED) {
|
||||||
|
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
||||||
ESP_LOGE(TAG, "Client has not connected");
|
ESP_LOGE(TAG, "Client has not connected");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic,
|
topic,
|
||||||
&client->mqtt_state.pending_msg_id);
|
&client->mqtt_state.pending_msg_id);
|
||||||
@@ -1205,6 +1222,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
mqtt_enqueue(client);
|
mqtt_enqueue(client);
|
||||||
|
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
|
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
|
||||||
|
Reference in New Issue
Block a user