mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-30 10:48:06 +02:00
client locking: used recursive mutex instead to avoid getting the code too complex
This commit is contained in:
@ -17,13 +17,9 @@
|
|||||||
#ifdef MQTT_DISABLE_API_LOCKS
|
#ifdef MQTT_DISABLE_API_LOCKS
|
||||||
# define MQTT_API_LOCK(c)
|
# define MQTT_API_LOCK(c)
|
||||||
# define MQTT_API_UNLOCK(c)
|
# define MQTT_API_UNLOCK(c)
|
||||||
# define MQTT_API_LOCK_FROM_OTHER_TASK(c)
|
|
||||||
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c)
|
|
||||||
#else
|
#else
|
||||||
# define MQTT_API_LOCK(c) xSemaphoreTake(c->api_lock, portMAX_DELAY)
|
# define MQTT_API_LOCK(c) xSemaphoreTakeRecursive(c->api_lock, portMAX_DELAY)
|
||||||
# define MQTT_API_UNLOCK(c) xSemaphoreGive(c->api_lock)
|
# define MQTT_API_UNLOCK(c) xSemaphoreGiveRecursive(c->api_lock)
|
||||||
# define MQTT_API_LOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreTake(c->api_lock, portMAX_DELAY); } }
|
|
||||||
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreGive(c->api_lock); } }
|
|
||||||
#endif /* MQTT_USE_API_LOCKS */
|
#endif /* MQTT_USE_API_LOCKS */
|
||||||
|
|
||||||
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
|
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
|
||||||
@ -189,7 +185,7 @@ static inline esp_err_t esp_mqtt_set_cert_key_data(esp_transport_handle_t ssl, e
|
|||||||
|
|
||||||
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
||||||
{
|
{
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK(client);
|
||||||
//Copy user configurations to client context
|
//Copy user configurations to client context
|
||||||
esp_err_t err = ESP_OK;
|
esp_err_t err = ESP_OK;
|
||||||
mqtt_config_storage_t *cfg;
|
mqtt_config_storage_t *cfg;
|
||||||
@ -198,7 +194,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
|||||||
} else {
|
} else {
|
||||||
cfg = calloc(1, sizeof(mqtt_config_storage_t));
|
cfg = calloc(1, sizeof(mqtt_config_storage_t));
|
||||||
ESP_MEM_CHECK(TAG, cfg, {
|
ESP_MEM_CHECK(TAG, cfg, {
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return ESP_ERR_NO_MEM;
|
return ESP_ERR_NO_MEM;
|
||||||
});
|
});
|
||||||
client->config = cfg;
|
client->config = cfg;
|
||||||
@ -403,8 +399,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
|||||||
goto _mqtt_set_config_failed;
|
goto _mqtt_set_config_failed;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
|
||||||
|
|
||||||
// Set uri at the end of config to override separately configured uri elements
|
// Set uri at the end of config to override separately configured uri elements
|
||||||
if (config->uri) {
|
if (config->uri) {
|
||||||
if (esp_mqtt_client_set_uri(client, cfg->uri) != ESP_OK) {
|
if (esp_mqtt_client_set_uri(client, cfg->uri) != ESP_OK) {
|
||||||
@ -413,10 +407,12 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MQTT_API_UNLOCK(client);
|
||||||
|
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
_mqtt_set_config_failed:
|
_mqtt_set_config_failed:
|
||||||
esp_mqtt_destroy_config(client);
|
esp_mqtt_destroy_config(client);
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -550,7 +546,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
|||||||
free(client);
|
free(client);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
client->api_lock = xSemaphoreCreateMutex();
|
client->api_lock = xSemaphoreCreateRecursiveMutex();
|
||||||
if (!client->api_lock) {
|
if (!client->api_lock) {
|
||||||
free(client->event.error_handle);
|
free(client->event.error_handle);
|
||||||
free(client);
|
free(client);
|
||||||
@ -682,7 +678,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This API could be also executed when client is active (need to protect config fields)
|
// This API could be also executed when client is active (need to protect config fields)
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK(client);
|
||||||
// set uri overrides actual scheme, host, path if configured previously
|
// set uri overrides actual scheme, host, path if configured previously
|
||||||
free(client->config->scheme);
|
free(client->config->scheme);
|
||||||
free(client->config->host);
|
free(client->config->host);
|
||||||
@ -720,7 +716,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
|||||||
free(user_info);
|
free(user_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1319,10 +1315,10 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
|
|||||||
ESP_LOGE(TAG, "Client was not initialized");
|
ESP_LOGE(TAG, "Client was not initialized");
|
||||||
return ESP_ERR_INVALID_ARG;
|
return ESP_ERR_INVALID_ARG;
|
||||||
}
|
}
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK(client);
|
||||||
if (client->state >= MQTT_STATE_INIT) {
|
if (client->state >= MQTT_STATE_INIT) {
|
||||||
ESP_LOGE(TAG, "Client has started");
|
ESP_LOGE(TAG, "Client has started");
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
esp_err_t err = ESP_OK;
|
esp_err_t err = ESP_OK;
|
||||||
@ -1339,7 +1335,7 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
|
|||||||
err = ESP_FAIL;
|
err = ESP_FAIL;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1365,7 +1361,7 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
|
|||||||
|
|
||||||
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
||||||
{
|
{
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK(client);
|
||||||
if (client->run) {
|
if (client->run) {
|
||||||
// Only send the disconnect message if the client is connected
|
// Only send the disconnect message if the client is connected
|
||||||
if(client->state == MQTT_STATE_CONNECTED) {
|
if(client->state == MQTT_STATE_CONNECTED) {
|
||||||
@ -1373,7 +1369,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
|||||||
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
|
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
|
||||||
if (client->mqtt_state.outbound_message->length == 0) {
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
ESP_LOGE(TAG, "Disconnect message cannot be created");
|
ESP_LOGE(TAG, "Disconnect message cannot be created");
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
@ -1383,12 +1379,12 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
|
|||||||
|
|
||||||
client->run = false;
|
client->run = false;
|
||||||
client->state = MQTT_STATE_UNKNOWN;
|
client->state = MQTT_STATE_UNKNOWN;
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
|
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
} else {
|
} else {
|
||||||
ESP_LOGW(TAG, "Client asked to stop, but was not started");
|
ESP_LOGW(TAG, "Client asked to stop, but was not started");
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1411,10 +1407,10 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
|
|||||||
|
|
||||||
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
|
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
|
||||||
{
|
{
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK(client);
|
||||||
if (client->state != MQTT_STATE_CONNECTED) {
|
if (client->state != MQTT_STATE_CONNECTED) {
|
||||||
ESP_LOGE(TAG, "Client has not connected");
|
ESP_LOGE(TAG, "Client has not connected");
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||||
@ -1432,20 +1428,20 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
|||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
|
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return client->mqtt_state.pending_msg_id;
|
return client->mqtt_state.pending_msg_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
|
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
|
||||||
{
|
{
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK(client);
|
||||||
if (client->state != MQTT_STATE_CONNECTED) {
|
if (client->state != MQTT_STATE_CONNECTED) {
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
ESP_LOGE(TAG, "Client has not connected");
|
ESP_LOGE(TAG, "Client has not connected");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -1465,12 +1461,12 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
|
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
|
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return client->mqtt_state.pending_msg_id;
|
return client->mqtt_state.pending_msg_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1488,7 +1484,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
len = strlen(data);
|
len = strlen(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTT_API_LOCK_FROM_OTHER_TASK(client);
|
MQTT_API_LOCK(client);
|
||||||
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||||
topic, data, len,
|
topic, data, len,
|
||||||
qos, retain,
|
qos, retain,
|
||||||
@ -1496,7 +1492,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
|
|
||||||
if (publish_msg->length == 0) {
|
if (publish_msg->length == 0) {
|
||||||
ESP_LOGE(TAG, "Publish message cannot be created");
|
ESP_LOGE(TAG, "Publish message cannot be created");
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
/* We have to set as pending all the qos>0 messages */
|
/* We have to set as pending all the qos>0 messages */
|
||||||
@ -1573,14 +1569,14 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
|
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
|
||||||
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
|
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
|
||||||
}
|
}
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return pending_msg_id;
|
return pending_msg_id;
|
||||||
|
|
||||||
cannot_publish:
|
cannot_publish:
|
||||||
if (qos == 0) {
|
if (qos == 0) {
|
||||||
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
|
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
|
||||||
}
|
}
|
||||||
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user