forked from espressif/esp-mqtt
Merge branch 'bugfix/report_enqueue_error' into 'master'
Fix: Return an error when fail to enqueue See merge request espressif/esp-mqtt!103
This commit is contained in:
@ -206,18 +206,18 @@ static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle
|
|||||||
esp_transport_ssl_enable_global_ca_store(ssl);
|
esp_transport_ssl_enable_global_ca_store(ssl);
|
||||||
} else if (cfg->crt_bundle_attach != NULL) {
|
} else if (cfg->crt_bundle_attach != NULL) {
|
||||||
#ifdef MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
|
#ifdef MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
|
||||||
esp_transport_ssl_crt_bundle_attach(ssl, cfg->crt_bundle_attach);
|
esp_transport_ssl_crt_bundle_attach(ssl, cfg->crt_bundle_attach);
|
||||||
#else
|
#else
|
||||||
ESP_LOGE(TAG, "Certificate bundle feature is not available in IDF version %s", IDF_VER);
|
ESP_LOGE(TAG, "Certificate bundle feature is not available in IDF version %s", IDF_VER);
|
||||||
goto esp_mqtt_set_transport_failed;
|
goto esp_mqtt_set_transport_failed;
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes),
|
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes),
|
||||||
goto esp_mqtt_set_transport_failed);
|
goto esp_mqtt_set_transport_failed);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (cfg->use_secure_element) {
|
if (cfg->use_secure_element) {
|
||||||
#ifdef MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT
|
#ifdef MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT
|
||||||
@ -1004,7 +1004,7 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
|
static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
|
||||||
{
|
{
|
||||||
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
|
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
|
||||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||||
@ -1018,16 +1018,14 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem
|
|||||||
msg.remaining_data = remaining_data;
|
msg.remaining_data = remaining_data;
|
||||||
msg.remaining_len = remaining_len;
|
msg.remaining_len = remaining_len;
|
||||||
//Copy to queue buffer
|
//Copy to queue buffer
|
||||||
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||||
|
|
||||||
//unlock
|
//unlock
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mqtt_enqueue(esp_mqtt_client_handle_t client)
|
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client)
|
||||||
{
|
{
|
||||||
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
||||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||||
//lock mutex
|
|
||||||
if (client->mqtt_state.pending_msg_count > 0) {
|
if (client->mqtt_state.pending_msg_count > 0) {
|
||||||
outbox_message_t msg = { 0 };
|
outbox_message_t msg = { 0 };
|
||||||
msg.data = client->mqtt_state.outbound_message->data;
|
msg.data = client->mqtt_state.outbound_message->data;
|
||||||
@ -1036,9 +1034,9 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client)
|
|||||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||||
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
||||||
//Copy to queue buffer
|
//Copy to queue buffer
|
||||||
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||||
}
|
}
|
||||||
//unlock
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1635,8 +1633,12 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
|||||||
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
mqtt_enqueue(client); //move pending msg to outbox (if have)
|
//move pending msg to outbox (if have)
|
||||||
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
if (!mqtt_enqueue(client)) {
|
||||||
|
MQTT_API_UNLOCK(client);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
|
||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
|
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
|
||||||
@ -1673,8 +1675,11 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
mqtt_enqueue(client);
|
if (!mqtt_enqueue(client)) {
|
||||||
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
|
MQTT_API_UNLOCK(client);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error
|
||||||
|
|
||||||
if (mqtt_write_data(client) != ESP_OK) {
|
if (mqtt_write_data(client) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
|
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
|
||||||
@ -1719,10 +1724,14 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
|
|||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
||||||
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
||||||
mqtt_enqueue(client);
|
if (!mqtt_enqueue(client)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||||
mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment);
|
if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user