add payload to MQTT_EVENT_SUBSCRIBE

+ documentation
+ cleanup logging

Closes https://github.com/espressif/esp-mqtt/issues/200
Merges https://github.com/espressif/esp-mqtt/pull/203
This commit is contained in:
Bert Melis
2021-09-20 14:51:21 +02:00
committed by David Cermak
parent e1d5a9402f
commit de47f1c341
4 changed files with 49 additions and 6 deletions

View File

@ -39,7 +39,11 @@ typedef enum {
MQTT_EVENT_ERROR = 0, /*!< on error event, additional context: connection return code, error handle from esp_tls (if supported) */
MQTT_EVENT_CONNECTED, /*!< connected event, additional context: session_present flag */
MQTT_EVENT_DISCONNECTED, /*!< disconnected event */
MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context: msg_id */
MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context:
- msg_id message id
- data pointer to the received data
- data_len length of the data for this event
*/
MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event */
MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */
MQTT_EVENT_DATA, /*!< data event, additional context:
@ -51,6 +55,8 @@ typedef enum {
- current_data_offset offset of the current data for this event
- total_data_len total length of the data received
- retain retain flag of the message
- qos qos level of the message
- dup dup flag of the message
Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is
longer than internal buffer. In that case only first event contains topic
pointer and length, other contain data only with current data length
@ -154,7 +160,7 @@ typedef struct {
esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal mqtt errors */
bool retain; /*!< Retained flag of the message associated with this event */
int qos; /*!< qos of the messages associated with this event */
int dup; /*!< Dup flag of the message associated with this event */
bool dup; /*!< dup flag of the message associated with this event */
} esp_mqtt_event_t;
typedef esp_mqtt_event_t *esp_mqtt_event_handle_t;

View File

@ -128,6 +128,7 @@ bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length);
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len);
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length);
char *mqtt_get_publish_data(uint8_t *buffer, size_t *length);
char *mqtt_get_suback_data(uint8_t *buffer, size_t *length);
uint16_t mqtt_get_id(uint8_t *buffer, size_t length);
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length);

View File

@ -272,6 +272,18 @@ char *mqtt_get_publish_data(uint8_t *buffer, size_t *length)
return (char *)(buffer + i);
}
char *mqtt_get_suback_data(uint8_t *buffer, size_t *length)
{
// SUBACK payload length = total length - (fixed header (2 bytes) + variable header (2 bytes))
// This requires the remaining length to be encoded in 1 byte.
if (*length > 4) {
*length -= 4;
return (char *)(buffer + 4);
}
*length = 0;
return NULL;
}
uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
{
if (length < 1) {

View File

@ -982,7 +982,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.retain = mqtt_get_retain(msg_buf);
client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len);
client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buff);
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
@ -1016,6 +1016,28 @@ post_data_event:
return ESP_OK;
}
static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
char *msg_data = NULL;
msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len);
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Failed to acquire suback data");
return ESP_FAIL;
}
// post data event
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
client->event.data = msg_data;
client->event.current_data_offset = 0;
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_OK;
}
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
{
ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count);
@ -1228,9 +1250,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
ESP_LOGD(TAG, "deliver_suback, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
if (deliver_suback(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id);
return ESP_FAIL;
}
}
break;
case MQTT_MSG_TYPE_UNSUBACK: