diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 71026fc..aa2b1ec 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -46,12 +46,12 @@ typedef enum esp_mqtt_event_id_t { MQTT_EVENT_DISCONNECTED, /*!< disconnected event */ MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context: - msg_id message id - - data pointer to the received - data + - error_handle `error_type` in case subscribing failed + - data pointer to broker response, check for errors. - data_len length of the data for this event */ - MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event */ + MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event, additional context: msg_id */ MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */ MQTT_EVENT_DATA, /*!< data event, additional context: - msg_id message id @@ -112,6 +112,7 @@ typedef enum esp_mqtt_error_type_t { MQTT_ERROR_TYPE_NONE = 0, MQTT_ERROR_TYPE_TCP_TRANSPORT, MQTT_ERROR_TYPE_CONNECTION_REFUSED, + MQTT_ERROR_TYPE_SUBSCRIBE_FAILED } esp_mqtt_error_type_t; /** diff --git a/mqtt_client.c b/mqtt_client.c index 2abe1cd..5518cf1 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1,5 +1,7 @@ -#include "esp_log.h" +#include "mqtt_client.h" #include "mqtt_client_priv.h" +#include "esp_log.h" +#include _Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type"); #ifdef ESP_EVENT_ANY_ID @@ -588,9 +590,10 @@ void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) client->config = NULL; } -static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout) { - uint64_t next = last_tick + timeout; - return (int64_t)(next - platform_tick_get_ms()) <= 0; +static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout) +{ + uint64_t next = last_tick + timeout; + return (int64_t)(next - platform_tick_get_ms()) <= 0; } static esp_err_t process_keepalive(esp_mqtt_client_handle_t client) @@ -608,7 +611,7 @@ static esp_err_t process_keepalive(esp_mqtt_client_handle_t client) return ESP_OK; } - if (has_timed_out(client->keepalive_tick, keepalive_ms/2)) { + if (has_timed_out(client->keepalive_tick, keepalive_ms / 2)) { if (esp_mqtt_client_ping(client) == ESP_FAIL) { ESP_LOGE(TAG, "Can't send ping, disconnected"); esp_mqtt_abort_connection(client); @@ -649,11 +652,11 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->mqtt_state.outbound_message = mqtt5_msg_connect(&client->mqtt_state.mqtt_connection, - &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info); + &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info); #endif } else { client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, - &client->connect_info); + &client->connect_info); } if (client->mqtt_state.outbound_message->length == 0) { ESP_LOGE(TAG, "Connect message cannot be created"); @@ -664,11 +667,11 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); + client->mqtt_state.outbound_message->length); #endif } else { client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); + client->mqtt_state.outbound_message->length); } ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X", client->mqtt_state.pending_msg_type, @@ -768,11 +771,11 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co { esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client), #if MQTT_EVENT_QUEUE_SIZE > 1 - // if supporting multiple queued events, we keep track of them - // using atomic variable, so need to make sure it won't get allocated in PSRAM - MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT); + // if supporting multiple queued events, we keep track of them + // using atomic variable, so need to make sure it won't get allocated in PSRAM + MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT); #else - MALLOC_CAP_DEFAULT); + MALLOC_CAP_DEFAULT); #endif ESP_MEM_CHECK(TAG, client, return NULL); if (!create_client_data(client)) { @@ -1049,8 +1052,8 @@ post_data_event: msg_topic_len = 0; msg_data_offset += msg_data_len; int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, - msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, - client->config->network_timeout_ms); + msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, + client->config->network_timeout_ms); if (ret <= 0) { return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL; } @@ -1079,7 +1082,15 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) ESP_LOGE(TAG, "Failed to acquire suback data"); return ESP_FAIL; } + client->event.error_handle->esp_tls_stack_err = 0; + client->event.error_handle->esp_tls_last_esp_err = 0; + client->event.error_handle->esp_tls_cert_verify_flags = 0; + client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE; + client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED; // post data event + if ((uint8_t)*msg_data >= 0x80) { + client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED; + } client->event.data_len = msg_data_len; client->event.total_data_len = msg_data_len; client->event.event_id = MQTT_EVENT_SUBSCRIBED; @@ -1425,7 +1436,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item // set duplicate flag for QoS-1 and QoS-2 messages if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) { mqtt_set_dup(client->mqtt_state.outbound_message->data); - ESP_LOGD(TAG,"Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id); + ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id); } // try to resend the data @@ -1477,9 +1488,9 @@ static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_time { return #if MQTT_EVENT_QUEUE_SIZE > 1 - atomic_load(&client->queued_events) > 0 ? 10: max_timeout; + atomic_load(&client->queued_events) > 0 ? 10 : max_timeout; #else - max_timeout; + max_timeout; #endif } @@ -1487,7 +1498,7 @@ static inline void run_event_loop(esp_mqtt_client_handle_t client) { #if MQTT_EVENT_QUEUE_SIZE > 1 if (atomic_load(&client->queued_events) > 0) { - atomic_fetch_sub(&client->queued_events, 1); + atomic_fetch_sub(&client->queued_events, 1); #else { #endif @@ -1599,7 +1610,7 @@ static void esp_mqtt_task(void *pv) } if (client->config->refresh_connection_after_ms && - has_timed_out(client->refresh_connection_tick, client->config->refresh_connection_after_ms)) { + has_timed_out(client->refresh_connection_tick, client->config->refresh_connection_after_ms)) { ESP_LOGD(TAG, "Refreshing the connection..."); esp_mqtt_abort_connection(client); client->state = MQTT_STATE_INIT; @@ -1805,16 +1816,16 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic return -1; } client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection, - topic, qos, - &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); + topic, qos, + &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); if (client->mqtt_state.outbound_message->length) { client->mqtt5_config->subscribe_property_info = NULL; } #endif } else { client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, - topic, qos, - &client->mqtt_state.pending_msg_id); + topic, qos, + &client->mqtt_state.pending_msg_id); } if (client->mqtt_state.outbound_message->length == 0) { ESP_LOGE(TAG, "Subscribe message cannot be created"); @@ -1857,16 +1868,16 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->mqtt_state.outbound_message = mqtt5_msg_unsubscribe(&client->mqtt_state.mqtt_connection, - topic, - &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); + topic, + &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); if (client->mqtt_state.outbound_message->length) { client->mqtt5_config->unsubscribe_property_info = NULL; } #endif } else { client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection, - topic, - &client->mqtt_state.pending_msg_id); + topic, + &client->mqtt_state.pending_msg_id); } if (client->mqtt_state.outbound_message->length == 0) { MQTT_API_UNLOCK(client); @@ -1901,18 +1912,18 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->mqtt_state.outbound_message = mqtt5_msg_publish(&client->mqtt_state.mqtt_connection, - topic, data, len, - qos, retain, - &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); + topic, data, len, + qos, retain, + &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); if (client->mqtt_state.outbound_message->length) { client->mqtt5_config->publish_property_info = NULL; } #endif } else { client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, - topic, data, len, - qos, retain, - &pending_msg_id); + topic, data, len, + qos, retain, + &pending_msg_id); } if (client->mqtt_state.outbound_message->length == 0) {