mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-08-02 04:05:08 +02:00
Merge branch 'bugfix/set_error_on_subscribe_failure' into 'master'
Adds error code to MQTT_EVENT_SUBSCRIBED in case of failure See merge request espressif/esp-mqtt!143
This commit is contained in:
@@ -46,12 +46,12 @@ typedef enum esp_mqtt_event_id_t {
|
|||||||
MQTT_EVENT_DISCONNECTED, /*!< disconnected event */
|
MQTT_EVENT_DISCONNECTED, /*!< disconnected event */
|
||||||
MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context:
|
MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context:
|
||||||
- msg_id message id
|
- msg_id message id
|
||||||
- data pointer to the received
|
- error_handle `error_type` in case subscribing failed
|
||||||
data
|
- data pointer to broker response, check for errors.
|
||||||
- data_len length of the data for this
|
- data_len length of the data for this
|
||||||
event
|
event
|
||||||
*/
|
*/
|
||||||
MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event */
|
MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event, additional context: msg_id */
|
||||||
MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */
|
MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */
|
||||||
MQTT_EVENT_DATA, /*!< data event, additional context:
|
MQTT_EVENT_DATA, /*!< data event, additional context:
|
||||||
- msg_id message id
|
- msg_id message id
|
||||||
@@ -112,6 +112,7 @@ typedef enum esp_mqtt_error_type_t {
|
|||||||
MQTT_ERROR_TYPE_NONE = 0,
|
MQTT_ERROR_TYPE_NONE = 0,
|
||||||
MQTT_ERROR_TYPE_TCP_TRANSPORT,
|
MQTT_ERROR_TYPE_TCP_TRANSPORT,
|
||||||
MQTT_ERROR_TYPE_CONNECTION_REFUSED,
|
MQTT_ERROR_TYPE_CONNECTION_REFUSED,
|
||||||
|
MQTT_ERROR_TYPE_SUBSCRIBE_FAILED
|
||||||
} esp_mqtt_error_type_t;
|
} esp_mqtt_error_type_t;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@@ -1,5 +1,7 @@
|
|||||||
#include "esp_log.h"
|
#include "mqtt_client.h"
|
||||||
#include "mqtt_client_priv.h"
|
#include "mqtt_client_priv.h"
|
||||||
|
#include "esp_log.h"
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
|
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
|
||||||
#ifdef ESP_EVENT_ANY_ID
|
#ifdef ESP_EVENT_ANY_ID
|
||||||
@@ -588,9 +590,10 @@ void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
|
|||||||
client->config = NULL;
|
client->config = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout) {
|
static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout)
|
||||||
uint64_t next = last_tick + timeout;
|
{
|
||||||
return (int64_t)(next - platform_tick_get_ms()) <= 0;
|
uint64_t next = last_tick + timeout;
|
||||||
|
return (int64_t)(next - platform_tick_get_ms()) <= 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
|
static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
|
||||||
@@ -608,7 +611,7 @@ static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
|
|||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (has_timed_out(client->keepalive_tick, keepalive_ms/2)) {
|
if (has_timed_out(client->keepalive_tick, keepalive_ms / 2)) {
|
||||||
if (esp_mqtt_client_ping(client) == ESP_FAIL) {
|
if (esp_mqtt_client_ping(client) == ESP_FAIL) {
|
||||||
ESP_LOGE(TAG, "Can't send ping, disconnected");
|
ESP_LOGE(TAG, "Can't send ping, disconnected");
|
||||||
esp_mqtt_abort_connection(client);
|
esp_mqtt_abort_connection(client);
|
||||||
@@ -649,11 +652,11 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
|||||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||||
#ifdef MQTT_PROTOCOL_5
|
#ifdef MQTT_PROTOCOL_5
|
||||||
client->mqtt_state.outbound_message = mqtt5_msg_connect(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt5_msg_connect(&client->mqtt_state.mqtt_connection,
|
||||||
&client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);
|
&client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
|
||||||
&client->connect_info);
|
&client->connect_info);
|
||||||
}
|
}
|
||||||
if (client->mqtt_state.outbound_message->length == 0) {
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
ESP_LOGE(TAG, "Connect message cannot be created");
|
ESP_LOGE(TAG, "Connect message cannot be created");
|
||||||
@@ -664,11 +667,11 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
|||||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||||
#ifdef MQTT_PROTOCOL_5
|
#ifdef MQTT_PROTOCOL_5
|
||||||
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.outbound_message->data,
|
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.outbound_message->data,
|
||||||
client->mqtt_state.outbound_message->length);
|
client->mqtt_state.outbound_message->length);
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
|
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
|
||||||
client->mqtt_state.outbound_message->length);
|
client->mqtt_state.outbound_message->length);
|
||||||
}
|
}
|
||||||
ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X",
|
ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X",
|
||||||
client->mqtt_state.pending_msg_type,
|
client->mqtt_state.pending_msg_type,
|
||||||
@@ -768,11 +771,11 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
|||||||
{
|
{
|
||||||
esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client),
|
esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client),
|
||||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||||
// if supporting multiple queued events, we keep track of them
|
// if supporting multiple queued events, we keep track of them
|
||||||
// using atomic variable, so need to make sure it won't get allocated in PSRAM
|
// using atomic variable, so need to make sure it won't get allocated in PSRAM
|
||||||
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
|
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
|
||||||
#else
|
#else
|
||||||
MALLOC_CAP_DEFAULT);
|
MALLOC_CAP_DEFAULT);
|
||||||
#endif
|
#endif
|
||||||
ESP_MEM_CHECK(TAG, client, return NULL);
|
ESP_MEM_CHECK(TAG, client, return NULL);
|
||||||
if (!create_client_data(client)) {
|
if (!create_client_data(client)) {
|
||||||
@@ -1049,8 +1052,8 @@ post_data_event:
|
|||||||
msg_topic_len = 0;
|
msg_topic_len = 0;
|
||||||
msg_data_offset += msg_data_len;
|
msg_data_offset += msg_data_len;
|
||||||
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
|
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
|
||||||
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
|
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
|
||||||
client->config->network_timeout_ms);
|
client->config->network_timeout_ms);
|
||||||
if (ret <= 0) {
|
if (ret <= 0) {
|
||||||
return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL;
|
return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL;
|
||||||
}
|
}
|
||||||
@@ -1079,7 +1082,15 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
|
|||||||
ESP_LOGE(TAG, "Failed to acquire suback data");
|
ESP_LOGE(TAG, "Failed to acquire suback data");
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
client->event.error_handle->esp_tls_stack_err = 0;
|
||||||
|
client->event.error_handle->esp_tls_last_esp_err = 0;
|
||||||
|
client->event.error_handle->esp_tls_cert_verify_flags = 0;
|
||||||
|
client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE;
|
||||||
|
client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED;
|
||||||
// post data event
|
// post data event
|
||||||
|
if ((uint8_t)*msg_data >= 0x80) {
|
||||||
|
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
|
||||||
|
}
|
||||||
client->event.data_len = msg_data_len;
|
client->event.data_len = msg_data_len;
|
||||||
client->event.total_data_len = msg_data_len;
|
client->event.total_data_len = msg_data_len;
|
||||||
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
||||||
@@ -1425,7 +1436,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
|
|||||||
// set duplicate flag for QoS-1 and QoS-2 messages
|
// set duplicate flag for QoS-1 and QoS-2 messages
|
||||||
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) {
|
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) {
|
||||||
mqtt_set_dup(client->mqtt_state.outbound_message->data);
|
mqtt_set_dup(client->mqtt_state.outbound_message->data);
|
||||||
ESP_LOGD(TAG,"Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id);
|
ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to resend the data
|
// try to resend the data
|
||||||
@@ -1477,9 +1488,9 @@ static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_time
|
|||||||
{
|
{
|
||||||
return
|
return
|
||||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||||
atomic_load(&client->queued_events) > 0 ? 10: max_timeout;
|
atomic_load(&client->queued_events) > 0 ? 10 : max_timeout;
|
||||||
#else
|
#else
|
||||||
max_timeout;
|
max_timeout;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1487,7 +1498,7 @@ static inline void run_event_loop(esp_mqtt_client_handle_t client)
|
|||||||
{
|
{
|
||||||
#if MQTT_EVENT_QUEUE_SIZE > 1
|
#if MQTT_EVENT_QUEUE_SIZE > 1
|
||||||
if (atomic_load(&client->queued_events) > 0) {
|
if (atomic_load(&client->queued_events) > 0) {
|
||||||
atomic_fetch_sub(&client->queued_events, 1);
|
atomic_fetch_sub(&client->queued_events, 1);
|
||||||
#else
|
#else
|
||||||
{
|
{
|
||||||
#endif
|
#endif
|
||||||
@@ -1599,7 +1610,7 @@ static void esp_mqtt_task(void *pv)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (client->config->refresh_connection_after_ms &&
|
if (client->config->refresh_connection_after_ms &&
|
||||||
has_timed_out(client->refresh_connection_tick, client->config->refresh_connection_after_ms)) {
|
has_timed_out(client->refresh_connection_tick, client->config->refresh_connection_after_ms)) {
|
||||||
ESP_LOGD(TAG, "Refreshing the connection...");
|
ESP_LOGD(TAG, "Refreshing the connection...");
|
||||||
esp_mqtt_abort_connection(client);
|
esp_mqtt_abort_connection(client);
|
||||||
client->state = MQTT_STATE_INIT;
|
client->state = MQTT_STATE_INIT;
|
||||||
@@ -1805,16 +1816,16 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic, qos,
|
topic, qos,
|
||||||
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
|
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
|
||||||
if (client->mqtt_state.outbound_message->length) {
|
if (client->mqtt_state.outbound_message->length) {
|
||||||
client->mqtt5_config->subscribe_property_info = NULL;
|
client->mqtt5_config->subscribe_property_info = NULL;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic, qos,
|
topic, qos,
|
||||||
&client->mqtt_state.pending_msg_id);
|
&client->mqtt_state.pending_msg_id);
|
||||||
}
|
}
|
||||||
if (client->mqtt_state.outbound_message->length == 0) {
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
ESP_LOGE(TAG, "Subscribe message cannot be created");
|
ESP_LOGE(TAG, "Subscribe message cannot be created");
|
||||||
@@ -1857,16 +1868,16 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||||
#ifdef MQTT_PROTOCOL_5
|
#ifdef MQTT_PROTOCOL_5
|
||||||
client->mqtt_state.outbound_message = mqtt5_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt5_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic,
|
topic,
|
||||||
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
|
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
|
||||||
if (client->mqtt_state.outbound_message->length) {
|
if (client->mqtt_state.outbound_message->length) {
|
||||||
client->mqtt5_config->unsubscribe_property_info = NULL;
|
client->mqtt5_config->unsubscribe_property_info = NULL;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
|
||||||
topic,
|
topic,
|
||||||
&client->mqtt_state.pending_msg_id);
|
&client->mqtt_state.pending_msg_id);
|
||||||
}
|
}
|
||||||
if (client->mqtt_state.outbound_message->length == 0) {
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
MQTT_API_UNLOCK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
@@ -1901,18 +1912,18 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
|
|||||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||||
#ifdef MQTT_PROTOCOL_5
|
#ifdef MQTT_PROTOCOL_5
|
||||||
client->mqtt_state.outbound_message = mqtt5_msg_publish(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt5_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||||
topic, data, len,
|
topic, data, len,
|
||||||
qos, retain,
|
qos, retain,
|
||||||
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
|
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
|
||||||
if (client->mqtt_state.outbound_message->length) {
|
if (client->mqtt_state.outbound_message->length) {
|
||||||
client->mqtt5_config->publish_property_info = NULL;
|
client->mqtt5_config->publish_property_info = NULL;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||||
topic, data, len,
|
topic, data, len,
|
||||||
qos, retain,
|
qos, retain,
|
||||||
&pending_msg_id);
|
&pending_msg_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (client->mqtt_state.outbound_message->length == 0) {
|
if (client->mqtt_state.outbound_message->length == 0) {
|
||||||
|
Reference in New Issue
Block a user