diff --git a/mqtt_client.c b/mqtt_client.c index f015de9..997bfa9 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -206,18 +206,18 @@ static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle esp_transport_ssl_enable_global_ca_store(ssl); } else if (cfg->crt_bundle_attach != NULL) { #ifdef MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE - esp_transport_ssl_crt_bundle_attach(ssl, cfg->crt_bundle_attach); -#else - ESP_LOGE(TAG, "Certificate bundle feature is not available in IDF version %s", IDF_VER); - goto esp_mqtt_set_transport_failed; + esp_transport_ssl_crt_bundle_attach(ssl, cfg->crt_bundle_attach); +#else + ESP_LOGE(TAG, "Certificate bundle feature is not available in IDF version %s", IDF_VER); + goto esp_mqtt_set_transport_failed; #endif } else { ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes), goto esp_mqtt_set_transport_failed); - + } - + if (cfg->use_secure_element) { #ifdef MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT @@ -1004,7 +1004,7 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int return false; } -static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) +static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) { ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); @@ -1018,16 +1018,14 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem msg.remaining_data = remaining_data; msg.remaining_len = remaining_len; //Copy to queue buffer - outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); - + return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); //unlock } -static void mqtt_enqueue(esp_mqtt_client_handle_t client) +static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client) { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - //lock mutex if (client->mqtt_state.pending_msg_count > 0) { outbox_message_t msg = { 0 }; msg.data = client->mqtt_state.outbound_message->data; @@ -1036,9 +1034,9 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client) msg.msg_type = client->mqtt_state.pending_msg_type; msg.msg_qos = client->mqtt_state.pending_publish_qos; //Copy to queue buffer - outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); + return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } - //unlock + return NULL; } @@ -1635,8 +1633,12 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; - mqtt_enqueue(client); //move pending msg to outbox (if have) - outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); + //move pending msg to outbox (if have) + if (!mqtt_enqueue(client)) { + MQTT_API_UNLOCK(client); + return -1; + } + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); @@ -1673,8 +1675,11 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; - mqtt_enqueue(client); - outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); + if (!mqtt_enqueue(client)) { + MQTT_API_UNLOCK(client); + return -1; + } + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic); @@ -1719,10 +1724,14 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons client->mqtt_state.pending_msg_count ++; // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { - mqtt_enqueue(client); + if (!mqtt_enqueue(client)) { + return -1; + } } else { int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; - mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment); + if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { + return -1; + } client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; } }