diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 3595eb1..64cfb77 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -39,7 +39,11 @@ typedef enum { MQTT_EVENT_ERROR = 0, /*!< on error event, additional context: connection return code, error handle from esp_tls (if supported) */ MQTT_EVENT_CONNECTED, /*!< connected event, additional context: session_present flag */ MQTT_EVENT_DISCONNECTED, /*!< disconnected event */ - MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context: msg_id */ + MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context: + - msg_id message id + - data pointer to the received data + - data_len length of the data for this event + */ MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event */ MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */ MQTT_EVENT_DATA, /*!< data event, additional context: @@ -51,6 +55,8 @@ typedef enum { - current_data_offset offset of the current data for this event - total_data_len total length of the data received - retain retain flag of the message + - qos qos level of the message + - dup dup flag of the message Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is longer than internal buffer. In that case only first event contains topic pointer and length, other contain data only with current data length @@ -154,7 +160,7 @@ typedef struct { esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal mqtt errors */ bool retain; /*!< Retained flag of the message associated with this event */ int qos; /*!< qos of the messages associated with this event */ - int dup; /*!< Dup flag of the message associated with this event */ + bool dup; /*!< dup flag of the message associated with this event */ } esp_mqtt_event_t; typedef esp_mqtt_event_t *esp_mqtt_event_handle_t; diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index a1dc612..f409003 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -128,6 +128,7 @@ bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length); size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len); char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length); char *mqtt_get_publish_data(uint8_t *buffer, size_t *length); +char *mqtt_get_suback_data(uint8_t *buffer, size_t *length); uint16_t mqtt_get_id(uint8_t *buffer, size_t length); int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index 5f1c6fe..e7fb1ab 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -272,6 +272,18 @@ char *mqtt_get_publish_data(uint8_t *buffer, size_t *length) return (char *)(buffer + i); } +char *mqtt_get_suback_data(uint8_t *buffer, size_t *length) +{ + // SUBACK payload length = total length - (fixed header (2 bytes) + variable header (2 bytes)) + // This requires the remaining length to be encoded in 1 byte. + if (*length > 4) { + *length -= 4; + return (char *)(buffer + 4); + } + *length = 0; + return NULL; +} + uint16_t mqtt_get_id(uint8_t *buffer, size_t length) { if (length < 1) { diff --git a/mqtt_client.c b/mqtt_client.c index 76a4c9a..2422404 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -982,7 +982,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) client->event.retain = mqtt_get_retain(msg_buf); client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len); client->event.qos = mqtt_get_qos(msg_buf); - client->event.dup = mqtt_get_dup(msg_buff); + client->event.dup = mqtt_get_dup(msg_buf); client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len; post_data_event: @@ -1016,6 +1016,28 @@ post_data_event: return ESP_OK; } +static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) +{ + uint8_t *msg_buf = client->mqtt_state.in_buffer; + size_t msg_data_len = client->mqtt_state.in_buffer_read_len; + char *msg_data = NULL; + + msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len); + if (msg_data_len <= 0) { + ESP_LOGE(TAG, "Failed to acquire suback data"); + return ESP_FAIL; + } + // post data event + client->event.data_len = msg_data_len; + client->event.total_data_len = msg_data_len; + client->event.event_id = MQTT_EVENT_SUBSCRIBED; + client->event.data = msg_data; + client->event.current_data_offset = 0; + esp_mqtt_dispatch_event_with_msgid(client); + + return ESP_OK; +} + static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) { ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count); @@ -1228,9 +1250,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) switch (msg_type) { case MQTT_MSG_TYPE_SUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { - ESP_LOGD(TAG, "Subscribe successful"); - client->event.event_id = MQTT_EVENT_SUBSCRIBED; - esp_mqtt_dispatch_event_with_msgid(client); + ESP_LOGD(TAG, "deliver_suback, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length); + if (deliver_suback(client) != ESP_OK) { + ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id); + return ESP_FAIL; + } } break; case MQTT_MSG_TYPE_UNSUBACK: