From 122875bf8a959719b753fb0b04f3c9a271d6f978 Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Wed, 17 Aug 2022 13:41:35 -0300 Subject: [PATCH 1/3] refactor: Group access to output buffer in mqtt_connection_t - Moves mqtt_connect_info to mqtt_connection_t. - Removes outbound_message in favor of accessing it trough connection. --- lib/include/mqtt_client_priv.h | 6 +- lib/include/mqtt_msg.h | 27 ++- lib/mqtt5_msg.c | 164 ++++++++-------- lib/mqtt_msg.c | 117 +++++++----- mqtt5_client.c | 35 ++-- mqtt_client.c | 336 ++++++++++++++++----------------- 6 files changed, 343 insertions(+), 342 deletions(-) diff --git a/lib/include/mqtt_client_priv.h b/lib/include/mqtt_client_priv.h index bf54f13..2bf08ca 100644 --- a/lib/include/mqtt_client_priv.h +++ b/lib/include/mqtt_client_priv.h @@ -48,13 +48,10 @@ extern "C" { typedef struct mqtt_state { uint8_t *in_buffer; - uint8_t *out_buffer; int in_buffer_length; - int out_buffer_length; size_t message_length; size_t in_buffer_read_len; - mqtt_message_t *outbound_message; - mqtt_connection_t mqtt_connection; + mqtt_connection_t connection; uint16_t pending_msg_id; int pending_msg_type; int pending_publish_qos; @@ -106,7 +103,6 @@ struct esp_mqtt_client { esp_transport_handle_t transport; mqtt_config_storage_t *config; mqtt_state_t mqtt_state; - mqtt_connect_info_t connect_info; mqtt_client_state_t state; uint64_t refresh_connection_tick; int64_t keepalive_tick; diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 60fcf8c..f3ad000 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -68,16 +68,6 @@ typedef struct mqtt_message { size_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */ } mqtt_message_t; -typedef struct mqtt_connection { - mqtt_message_t message; -#if MQTT_MSG_ID_INCREMENTAL - uint16_t last_message_id; /*!< last used id if incremental message id configured */ -#endif - uint8_t *buffer; - size_t buffer_length; - -} mqtt_connection_t; - typedef struct mqtt_connect_info { char *client_id; char *username; @@ -90,9 +80,18 @@ typedef struct mqtt_connect_info { int will_retain; int clean_session; esp_mqtt_protocol_ver_t protocol_ver; - } mqtt_connect_info_t; +typedef struct mqtt_connection { + mqtt_message_t outbound_message; +#if MQTT_MSG_ID_INCREMENTAL + uint16_t last_message_id; /*!< last used id if incremental message id configured */ +#endif + uint8_t *buffer; + size_t buffer_length; + mqtt_connect_info_t information; + +} mqtt_connection_t; static inline int mqtt_get_type(const uint8_t *buffer) { @@ -123,7 +122,6 @@ static inline int mqtt_get_retain(const uint8_t *buffer) return (buffer[0] & 0x01); } -void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length); bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length); size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len); char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length); @@ -132,6 +130,9 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length); uint16_t mqtt_get_id(uint8_t *buffer, size_t length); int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length); +esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size); +void mqtt_connection_destroy(mqtt_connection_t *connection); + mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info); mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id); mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id); @@ -143,8 +144,6 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char * mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection); - - #ifdef __cplusplus } #endif diff --git a/lib/mqtt5_msg.c b/lib/mqtt5_msg.c index 16a479b..b6a483f 100644 --- a/lib/mqtt5_msg.c +++ b/lib/mqtt5_msg.c @@ -69,8 +69,8 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope generate_variable_len(len, &len_bytes, encoded_lens); int offset = len_bytes - 1; - connection->message.length += offset; - if (connection->message.length > connection->buffer_length) { + connection->outbound_message.length += offset; + if (connection->outbound_message.length > connection->buffer_length) { return -1; } @@ -89,33 +89,33 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope static int append_property(mqtt_connection_t *connection, uint8_t property_type, uint8_t len_occupy, const char *data, size_t data_len) { - if ((connection->message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) { + if ((connection->outbound_message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) { return -1; } - size_t origin_message_len = connection->message.length; + size_t origin_message_len = connection->outbound_message.length; if (property_type) { - connection->buffer[connection->message.length ++] = property_type; + connection->buffer[connection->outbound_message.length ++] = property_type; } if (len_occupy == 0) { uint8_t encoded_lens[4] = {0}, len_bytes = 0; generate_variable_len(data_len, &len_bytes, encoded_lens); for (int j = 0; j < len_bytes; j ++) { - connection->buffer[connection->message.length ++] = encoded_lens[j]; + connection->buffer[connection->outbound_message.length ++] = encoded_lens[j]; } } else { for (int i = 1; i <= len_occupy; i ++) { - connection->buffer[connection->message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff; + connection->buffer[connection->outbound_message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff; } } if (data) { - memcpy(connection->buffer + connection->message.length, data, data_len); - connection->message.length += data_len; + memcpy(connection->buffer + connection->outbound_message.length, data, data_len); + connection->outbound_message.length += data_len; } - return connection->message.length - origin_message_len; + return connection->outbound_message.length - origin_message_len; } static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id) @@ -130,36 +130,36 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag #endif } - if (connection->message.length + 2 > connection->buffer_length) { + if (connection->outbound_message.length + 2 > connection->buffer_length) { return 0; } - MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], message_id) + MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], message_id) return message_id; } static int init_message(mqtt_connection_t *connection) { - connection->message.length = MQTT5_MAX_FIXED_HEADER_SIZE; + connection->outbound_message.length = MQTT5_MAX_FIXED_HEADER_SIZE; return MQTT5_MAX_FIXED_HEADER_SIZE; } static mqtt_message_t *fail_message(mqtt_connection_t *connection) { - connection->message.data = connection->buffer; - connection->message.length = 0; - return &connection->message; + connection->outbound_message.data = connection->buffer; + connection->outbound_message.length = 0; + return &connection->outbound_message; } static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain) { - int message_length = connection->message.length - MQTT5_MAX_FIXED_HEADER_SIZE; + int message_length = connection->outbound_message.length - MQTT5_MAX_FIXED_HEADER_SIZE; int total_length = message_length; uint8_t encoded_lens[4] = {0}, len_bytes = 0; // Check if we have fragmented message and update total_len - if (connection->message.fragmented_msg_total_length) { - total_length = connection->message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE; + if (connection->outbound_message.fragmented_msg_total_length) { + total_length = connection->outbound_message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE; } // Encode MQTT message length @@ -171,10 +171,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int } // Save the header bytes - connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) + connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) int offs = MQTT5_MAX_FIXED_HEADER_SIZE - 1 - len_bytes; - connection->message.data = connection->buffer + offs; - connection->message.fragmented_msg_data_offset -= offs; + connection->outbound_message.data = connection->buffer + offs; + connection->outbound_message.fragmented_msg_data_offset -= offs; // type byte connection->buffer[offs ++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); // length bytes @@ -182,7 +182,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int connection->buffer[offs ++] = encoded_lens[j]; } - return &connection->message; + return &connection->outbound_message; } static esp_err_t mqtt5_msg_set_user_property(mqtt5_user_property_handle_t *user_property, char *key, size_t key_len, char *value, size_t value_len) @@ -465,24 +465,24 @@ char *mqtt5_get_puback_data(uint8_t *buffer, size_t *length, mqtt5_user_property mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info, esp_mqtt5_connection_property_storage_t *property, esp_mqtt5_connection_will_property_storage_t *will_property) { init_message(connection); - connection->buffer[connection->message.length ++] = 0; // Variable header length MSB + connection->buffer[connection->outbound_message.length ++] = 0; // Variable header length MSB /* Defaults to protocol version 5 values */ - connection->buffer[connection->message.length ++] = 4; // Variable header length LSB - memcpy(&connection->buffer[connection->message.length], "MQTT", 4); // Protocol name - connection->message.length += 4; - connection->buffer[connection->message.length ++] = 5; // Protocol version + connection->buffer[connection->outbound_message.length ++] = 4; // Variable header length LSB + memcpy(&connection->buffer[connection->outbound_message.length], "MQTT", 4); // Protocol name + connection->outbound_message.length += 4; + connection->buffer[connection->outbound_message.length ++] = 5; // Protocol version - int flags_offset = connection->message.length; - connection->buffer[connection->message.length ++] = 0; // Flags - MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], info->keepalive) // Keep-alive + int flags_offset = connection->outbound_message.length; + connection->buffer[connection->outbound_message.length ++] = 0; // Flags + MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], info->keepalive) // Keep-alive if (info->clean_session) { connection->buffer[flags_offset] |= MQTT5_CONNECT_FLAG_CLEAN_SESSION; } //Add properties - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property->session_expiry_interval) { APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, property->session_expiry_interval), fail_message(connection)); } @@ -508,7 +508,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection)); } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); if (info->client_id != NULL && info->client_id[0] != '\0') { APPEND_CHECK(append_property(connection, 0, 2, info->client_id, strlen(info->client_id)), fail_message(connection)); @@ -518,8 +518,8 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in //Add will properties if (info->will_topic != NULL && info->will_topic[0] != '\0') { - properties_offset = connection->message.length; - connection->message.length ++; + properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (will_property->will_delay_interval) { APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_WILL_DELAY_INTERVAL, 4, NULL, will_property->will_delay_interval), fail_message(connection)); } @@ -545,7 +545,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection)); } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); APPEND_CHECK(append_property(connection, 0, 2, info->will_topic, strlen(info->will_topic)), fail_message(connection)); APPEND_CHECK(append_property(connection, 0, 2, info->will_message, info->will_length), fail_message(connection)); @@ -742,8 +742,8 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top *message_id = 0; } - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property) { if (property->payload_format_indicator) { @@ -788,20 +788,20 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_CONTENT_TYPE, 2, property->content_type, strlen(property->content_type)), fail_message(connection)); } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); - if (connection->message.length + data_length > connection->buffer_length) { + if (connection->outbound_message.length + data_length > connection->buffer_length) { // Not enough size in buffer -> fragment this message - connection->message.fragmented_msg_data_offset = connection->message.length; - memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length); - connection->message.length = connection->buffer_length; - connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset; + connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length; + memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length); + connection->outbound_message.length = connection->buffer_length; + connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset; } else { if (data != NULL) { - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; + memcpy(connection->buffer + connection->outbound_message.length, data, data_length); + connection->outbound_message.length += data_length; } - connection->message.fragmented_msg_total_length = 0; + connection->outbound_message.fragmented_msg_total_length = 0; } return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } @@ -858,8 +858,8 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt return fail_message(connection); } - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property) { if (property->subscribe_id) { @@ -873,7 +873,7 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt } } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); for (int topic_number = 0; topic_number < size; ++topic_number) { if (topic_list[topic_number].filter[0] == '\0') { @@ -897,23 +897,23 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection)); } - if (connection->message.length + 1 > connection->buffer_length) { + if (connection->outbound_message.length + 1 > connection->buffer_length) { return fail_message(connection); } - connection->buffer[connection->message.length] = 0; + connection->buffer[connection->outbound_message.length] = 0; if (property) { if (property->retain_handle > 0 && property->retain_handle < 3) { - connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4; + connection->buffer[connection->outbound_message.length] |= (property->retain_handle & 3) << 4; } if (property->no_local_flag) { - connection->buffer[connection->message.length] |= (property->no_local_flag << 2); + connection->buffer[connection->outbound_message.length] |= (property->no_local_flag << 2); } if (property->retain_as_published_flag) { - connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3); + connection->buffer[connection->outbound_message.length] |= (property->retain_as_published_flag << 3); } } - connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3); - connection->message.length ++; + connection->buffer[connection->outbound_message.length] |= (topic_list[topic_number].qos & 3); + connection->outbound_message.length ++; } return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); } @@ -921,10 +921,10 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info) { init_message(connection); - int reason_offset = connection->message.length; - connection->buffer[connection->message.length ++] = 0; - int properties_offset = connection->message.length; - connection->message.length ++; + int reason_offset = connection->outbound_message.length; + connection->buffer[connection->outbound_message.length ++] = 0; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (disconnect_property_info) { if (disconnect_property_info->session_expiry_interval) { APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, disconnect_property_info->session_expiry_interval), fail_message(connection)); @@ -940,7 +940,7 @@ mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_di connection->buffer[reason_offset] = disconnect_property_info->disconnect_reason; } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); } @@ -956,8 +956,8 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char return fail_message(connection); } - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property) { if (property->user_property) { mqtt5_user_property_item_t item; @@ -968,7 +968,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); if (property && property->is_share_subscribe) { uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name); char *shared_topic = calloc(1, shared_topic_size); @@ -996,10 +996,10 @@ mqtt_message_t *mqtt5_msg_puback(mqtt_connection_t *connection, uint16_t message if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); } @@ -1009,10 +1009,10 @@ mqtt_message_t *mqtt5_msg_pubrec(mqtt_connection_t *connection, uint16_t message if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); } @@ -1022,10 +1022,10 @@ mqtt_message_t *mqtt5_msg_pubrel(mqtt_connection_t *connection, uint16_t message if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); } @@ -1035,9 +1035,9 @@ mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t messag if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); } diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index 8b78ae3..a7a91e7 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -48,14 +48,14 @@ enum mqtt_connect_flag { static int append_string(mqtt_connection_t *connection, const char *string, int len) { - if (connection->message.length + len + 2 > connection->buffer_length) { + if (connection->outbound_message.length + len + 2 > connection->buffer_length) { return -1; } - connection->buffer[connection->message.length++] = len >> 8; - connection->buffer[connection->message.length++] = len & 0xff; - memcpy(connection->buffer + connection->message.length, string, len); - connection->message.length += len; + connection->buffer[connection->outbound_message.length++] = len >> 8; + connection->buffer[connection->outbound_message.length++] = len & 0xff; + memcpy(connection->buffer + connection->outbound_message.length, string, len); + connection->outbound_message.length += len; return len + 2; } @@ -72,38 +72,38 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag #endif } - if (connection->message.length + 2 > connection->buffer_length) { + if (connection->outbound_message.length + 2 > connection->buffer_length) { return 0; } - connection->buffer[connection->message.length++] = message_id >> 8; - connection->buffer[connection->message.length++] = message_id & 0xff; + connection->buffer[connection->outbound_message.length++] = message_id >> 8; + connection->buffer[connection->outbound_message.length++] = message_id & 0xff; return message_id; } -static int init_message(mqtt_connection_t *connection) +static int set_message_header_size(mqtt_connection_t *connection) { - connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; + connection->outbound_message.length = MQTT_MAX_FIXED_HEADER_SIZE; return MQTT_MAX_FIXED_HEADER_SIZE; } static mqtt_message_t *fail_message(mqtt_connection_t *connection) { - connection->message.data = connection->buffer; - connection->message.length = 0; - return &connection->message; + connection->outbound_message.data = connection->buffer; + connection->outbound_message.length = 0; + return &connection->outbound_message; } static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain) { - int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE; int total_length = message_length; int encoded_length = 0; uint8_t encoded_lens[4] = {0}; // Check if we have fragmented message and update total_len - if (connection->message.fragmented_msg_total_length) { - total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE; + if (connection->outbound_message.fragmented_msg_total_length) { + total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE; } // Encode MQTT message length @@ -124,10 +124,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int } // Save the header bytes - connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) + connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes; - connection->message.data = connection->buffer + offs; - connection->message.fragmented_msg_data_offset -= offs; + connection->outbound_message.data = connection->buffer + offs; + connection->outbound_message.fragmented_msg_data_offset -= offs; // type byte connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); // length bytes @@ -135,14 +135,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int connection->buffer[offs++] = encoded_lens[j]; } - return &connection->message; -} - -void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length) -{ - memset(connection, 0, sizeof(mqtt_connection_t)); - connection->buffer = buffer; - connection->buffer_length = buffer_length; + return &connection->outbound_message; } size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len) @@ -347,7 +340,7 @@ uint16_t mqtt_get_id(uint8_t *buffer, size_t length) mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info) { - init_message(connection); + set_message_header_size(connection); int header_len; if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) { @@ -356,11 +349,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE; } - if (connection->message.length + header_len > connection->buffer_length) { + if (connection->outbound_message.length + header_len > connection->buffer_length) { return fail_message(connection); } - char *variable_header = (char *)(connection->buffer + connection->message.length); - connection->message.length += header_len; + char *variable_header = (char *)(connection->buffer + connection->outbound_message.length); + connection->outbound_message.length += header_len; int header_idx = 0; variable_header[header_idx++] = 0; // Variable header length MSB @@ -445,7 +438,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id) { - init_message(connection); + set_message_header_size(connection); if (topic == NULL || topic[0] == '\0') { return fail_message(connection); @@ -468,16 +461,16 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi } if (data != NULL) { - if (connection->message.length + data_length > connection->buffer_length) { + if (connection->outbound_message.length + data_length > connection->buffer_length) { // Not enough size in buffer -> fragment this message - connection->message.fragmented_msg_data_offset = connection->message.length; - memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length); - connection->message.length = connection->buffer_length; - connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset; + connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length; + memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length); + connection->outbound_message.length = connection->buffer_length; + connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset; } else { - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; - connection->message.fragmented_msg_total_length = 0; + memcpy(connection->buffer + connection->outbound_message.length, data, data_length); + connection->outbound_message.length += data_length; + connection->outbound_message.fragmented_msg_total_length = 0; } } return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); @@ -485,7 +478,7 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -494,7 +487,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -503,7 +496,7 @@ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -512,7 +505,7 @@ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -521,7 +514,7 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) { - init_message(connection); + set_message_header_size(connection); if ((*message_id = append_message_id(connection, 0)) == 0) { return fail_message(connection); @@ -536,11 +529,11 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt return fail_message(connection); } - if (connection->message.length + 1 > connection->buffer_length) { + if (connection->outbound_message.length + 1 > connection->buffer_length) { return fail_message(connection); } - connection->buffer[connection->message.length] = topic_list[topic_number].qos; - connection->message.length ++; + connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos; + connection->outbound_message.length ++; } return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); @@ -548,7 +541,7 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id) { - init_message(connection); + set_message_header_size(connection); if (topic == NULL || topic[0] == '\0') { return fail_message(connection); @@ -567,19 +560,19 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char * mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection) { - init_message(connection); + set_message_header_size(connection); return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); } mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection) { - init_message(connection); + set_message_header_size(connection); return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); } mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection) { - init_message(connection); + set_message_header_size(connection); return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); } @@ -622,3 +615,23 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length) return 0; } } + +esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size) +{ + memset(connection, 0, sizeof(mqtt_connection_t)); + connection->buffer = (uint8_t *)calloc(0, buffer_size); + if (!connection->buffer) { + return ESP_ERR_NO_MEM; + } + connection->buffer_length = buffer_size; + return ESP_OK; +} + +void mqtt_connection_destroy(mqtt_connection_t *connection) +{ + if (connection) { + free(connection->buffer); + } +} + + diff --git a/mqtt5_client.c b/mqtt5_client.c index 82d4f41..851bb5a 100644 --- a/mqtt5_client.c +++ b/mqtt5_client.c @@ -18,7 +18,7 @@ static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_ void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client) { - bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data); + bool msg_dup = mqtt5_get_dup(client->mqtt_state.connection.outbound_message.data); if (msg_dup == false) { client->send_publish_packet_count ++; ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count); @@ -35,7 +35,7 @@ void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBCOMP return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); size_t msg_data_len = client->mqtt_state.in_buffer_read_len; client->event.data = mqtt5_get_pubcomp_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property); @@ -47,7 +47,7 @@ void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); size_t msg_data_len = client->mqtt_state.in_buffer_read_len; client->event.data = mqtt5_get_puback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property); @@ -59,7 +59,7 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_UNSUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); size_t msg_data_len = client->mqtt_state.in_buffer_read_len; client->event.data = mqtt5_get_unsuback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property); @@ -71,7 +71,7 @@ void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_suback(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_SUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); } } @@ -81,13 +81,14 @@ esp_err_t esp_mqtt5_parse_connack(esp_mqtt5_client_handle_t client, int *connect size_t len = client->mqtt_state.in_buffer_read_len; client->mqtt_state.in_buffer_read_len = 0; uint8_t ack_flag = 0; - if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) { + if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->mqtt_state. + connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) { ESP_LOGE(TAG, "Failed to parse CONNACK packet"); return ESP_FAIL; } if (*connect_rsp_code == MQTT_CONNECTION_ACCEPTED) { ESP_LOGD(TAG, "Connected"); - client->event.session_present = ack_flag & 0x01; + client->event.session_present = ack_flag & 0x01; return ESP_OK; } esp_mqtt5_print_error_code(client, *connect_rsp_code); @@ -116,7 +117,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t * *msg_topic = esp_mqtt5_client_get_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, msg_topic_len); if (!*msg_topic) { ESP_LOGE(TAG, "%s: esp_mqtt5_client_get_topic_alias() failed", __func__); - return ESP_FAIL; + return ESP_FAIL; } } else { if (esp_mqtt5_client_update_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, *msg_topic, *msg_topic_len) != ESP_OK) { @@ -139,7 +140,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t * esp_err_t esp_mqtt5_create_default_config(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { client->event.property = calloc(1, sizeof(esp_mqtt5_event_property_t)); ESP_MEM_CHECK(TAG, client->event.property, return ESP_FAIL) client->mqtt5_config = calloc(1, sizeof(mqtt5_config_storage_t)); @@ -304,7 +305,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q void esp_mqtt5_client_destory(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { if (client->mqtt5_config) { free(client->mqtt5_config->will_property_info.content_type); free(client->mqtt5_config->will_property_info.response_topic); @@ -416,7 +417,7 @@ esp_err_t esp_mqtt5_client_set_publish_property(esp_mqtt5_client_handle_t client MQTT_API_LOCK(client); /* Check protocol version */ - if(client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -445,7 +446,7 @@ esp_err_t esp_mqtt5_client_set_subscribe_property(esp_mqtt5_client_handle_t clie MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -482,7 +483,7 @@ esp_err_t esp_mqtt5_client_set_unsubscribe_property(esp_mqtt5_client_handle_t cl MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -513,7 +514,7 @@ esp_err_t esp_mqtt5_client_set_disconnect_property(esp_mqtt5_client_handle_t cli MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -556,7 +557,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -572,7 +573,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client return ESP_FAIL; } else { client->mqtt5_config->connect_property_info.maximum_packet_size = connect_property->maximum_packet_size; - } + } } else { client->mqtt5_config->connect_property_info.maximum_packet_size = client->mqtt_state.in_buffer_length; } @@ -757,4 +758,4 @@ void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_pro } } free(user_property); -} \ No newline at end of file +} diff --git a/mqtt_client.c b/mqtt_client.c index 295e806..6719a49 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -4,7 +4,7 @@ #include "esp_log.h" #include #include "esp_heap_caps.h" - +#include "mqtt_msg.h" _Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type"); #ifdef ESP_EVENT_ANY_ID @@ -396,70 +396,70 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl err = ESP_ERR_NO_MEM; ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.hostname, &client->config->host), goto _mqtt_set_config_failed); ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.path, &client->config->path), goto _mqtt_set_config_failed); - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->connect_info.username), goto _mqtt_set_config_failed); - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->connect_info.password), goto _mqtt_set_config_failed); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->mqtt_state.connection.information.username), goto _mqtt_set_config_failed); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->mqtt_state.connection.information.password), goto _mqtt_set_config_failed); if (!config->credentials.set_null_client_id) { if (config->credentials.client_id) { - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->connect_info.client_id), goto _mqtt_set_config_failed); - } else if (client->connect_info.client_id == NULL) { - client->connect_info.client_id = platform_create_id_string(); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->mqtt_state.connection.information.client_id), goto _mqtt_set_config_failed); + } else if (client->mqtt_state.connection.information.client_id == NULL) { + client->mqtt_state.connection.information.client_id = platform_create_id_string(); } - ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed); - ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id); + ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.client_id, goto _mqtt_set_config_failed); + ESP_LOGD(TAG, "MQTT client_id=%s", client->mqtt_state.connection.information.client_id); } ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.uri, &client->config->uri), goto _mqtt_set_config_failed); - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->connect_info.will_topic), goto _mqtt_set_config_failed); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->mqtt_state.connection.information.will_topic), goto _mqtt_set_config_failed); if (config->session.last_will.msg_len && config->session.last_will.msg) { - free(client->connect_info.will_message); - client->connect_info.will_message = malloc(config->session.last_will.msg_len); - ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); - memcpy(client->connect_info.will_message, config->session.last_will.msg, config->session.last_will.msg_len); - client->connect_info.will_length = config->session.last_will.msg_len; + free(client->mqtt_state.connection.information.will_message); + client->mqtt_state.connection.information.will_message = malloc(config->session.last_will.msg_len); + ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed); + memcpy(client->mqtt_state.connection.information.will_message, config->session.last_will.msg, config->session.last_will.msg_len); + client->mqtt_state.connection.information.will_length = config->session.last_will.msg_len; } else if (config->session.last_will.msg) { - free(client->connect_info.will_message); - client->connect_info.will_message = strdup(config->session.last_will.msg); - ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); - client->connect_info.will_length = strlen(config->session.last_will.msg); + free(client->mqtt_state.connection.information.will_message); + client->mqtt_state.connection.information.will_message = strdup(config->session.last_will.msg); + ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed); + client->mqtt_state.connection.information.will_length = strlen(config->session.last_will.msg); } if (config->session.last_will.qos) { - client->connect_info.will_qos = config->session.last_will.qos; + client->mqtt_state.connection.information.will_qos = config->session.last_will.qos; } if (config->session.last_will.retain) { - client->connect_info.will_retain = config->session.last_will.retain; + client->mqtt_state.connection.information.will_retain = config->session.last_will.retain; } - if (config->session.disable_clean_session == client->connect_info.clean_session) { - client->connect_info.clean_session = !config->session.disable_clean_session; - if (!client->connect_info.clean_session && config->credentials.set_null_client_id) { + if (config->session.disable_clean_session == client->mqtt_state.connection.information.clean_session) { + client->mqtt_state.connection.information.clean_session = !config->session.disable_clean_session; + if (!client->mqtt_state.connection.information.clean_session && config->credentials.set_null_client_id) { ESP_LOGE(TAG, "Clean Session flag must be true if client has a null id"); } } if (config->session.keepalive) { - client->connect_info.keepalive = config->session.keepalive; + client->mqtt_state.connection.information.keepalive = config->session.keepalive; } - if (client->connect_info.keepalive == 0) { - client->connect_info.keepalive = MQTT_KEEPALIVE_TICK; + if (client->mqtt_state.connection.information.keepalive == 0) { + client->mqtt_state.connection.information.keepalive = MQTT_KEEPALIVE_TICK; } if (config->session.disable_keepalive) { // internal `keepalive` value (in connect_info) is in line with 3.1.2.10 Keep Alive from mqtt spec: // * keepalive=0: Keep alive mechanism disabled (server not to disconnect the client on its inactivity) // * period in seconds to send a Control packet if inactive - client->connect_info.keepalive = 0; + client->mqtt_state.connection.information.keepalive = 0; } if (config->session.protocol_ver) { - client->connect_info.protocol_ver = config->session.protocol_ver; + client->mqtt_state.connection.information.protocol_ver = config->session.protocol_ver; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_UNDEFINED) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_UNDEFINED) { #ifdef MQTT_PROTOCOL_311 - client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1_1; + client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1_1; #else - client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1; + client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1; #endif - } else if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + } else if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifndef MQTT_PROTOCOL_5 ESP_LOGE(TAG, "Please first enable MQTT_PROTOCOL_5 feature in menuconfig"); goto _mqtt_set_config_failed; @@ -485,9 +485,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } else { client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS; } - if (config->network.transport) { - client->config->transport = config->network.transport; - } if (config->broker.verification.alpn_protos) { for (int i = 0; i < client->config->num_alpn_protos; i++) { @@ -592,15 +589,15 @@ void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) } free(client->config->alpn_protos); free(client->config->clientkey_password); - free(client->connect_info.will_topic); - free(client->connect_info.will_message); - free(client->connect_info.client_id); - free(client->connect_info.username); - free(client->connect_info.password); + free(client->mqtt_state.connection.information.will_topic); + free(client->mqtt_state.connection.information.will_message); + free(client->mqtt_state.connection.information.client_id); + free(client->mqtt_state.connection.information.username); + free(client->mqtt_state.connection.information.password); #ifdef MQTT_PROTOCOL_5 esp_mqtt5_client_destory(client); #endif - memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t)); + memset(&client->mqtt_state.connection.information, 0, sizeof(mqtt_connect_info_t)); #ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP if (client->config->event_loop_handle) { esp_event_loop_delete(client->config->event_loop_handle); @@ -620,8 +617,8 @@ static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout) static esp_err_t process_keepalive(esp_mqtt_client_handle_t client) { - if (client->connect_info.keepalive > 0) { - const uint64_t keepalive_ms = client->connect_info.keepalive * 1000; + if (client->mqtt_state.connection.information.keepalive > 0) { + const uint64_t keepalive_ms = client->mqtt_state.connection.information.keepalive * 1000; if (client->wait_for_ping_resp == true ) { if (has_timed_out(client->keepalive_tick, keepalive_ms)) { @@ -648,10 +645,10 @@ static esp_err_t process_keepalive(esp_mqtt_client_handle_t client) static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client) { - int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length; + int wlen = 0, widx = 0, len = client->mqtt_state.connection.outbound_message.length; while (len > 0) { wlen = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data + widx, + (char *)client->mqtt_state.connection.outbound_message.data + widx, len, client->config->network_timeout_ms); if (wlen < 0) { @@ -674,29 +671,29 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m { int read_len, connect_rsp_code = 0; client->wait_for_ping_resp = false; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_connect(&client->mqtt_state.mqtt_connection, - &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info); + mqtt5_msg_connect(&client->mqtt_state.connection, + &client->mqtt_state.connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, - &client->connect_info); + mqtt_msg_connect(&client->mqtt_state.connection, + &client->mqtt_state.connection.information); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Connect message cannot be created"); return ESP_FAIL; } - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); + client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.connection.outbound_message.data, + client->mqtt_state.connection.outbound_message.length); #endif } else { - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.connection.outbound_message.data, + client->mqtt_state.connection.outbound_message.length); } ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X", client->mqtt_state.pending_msg_type, @@ -724,7 +721,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len); return ESP_FAIL; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) { client->send_publish_packet_count = 0; @@ -807,6 +804,24 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co if (!create_client_data(client)) { goto _mqtt_init_failed; } + int buffer_size = config->buffer.size; + if (buffer_size <= 0) { + buffer_size = MQTT_BUFFER_SIZE_BYTE; + } + + // use separate value for output buffer size if configured + int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size; + if (mqtt_connection_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) { + goto _mqtt_init_failed; + } + + client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size); + ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed); + client->mqtt_state.in_buffer_length = buffer_size; + client->outbox = outbox_init(); + ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed); + client->status_bits = xEventGroupCreate(); + ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed); if (esp_mqtt_set_config(client, config) != ESP_OK) { goto _mqtt_init_failed; @@ -826,27 +841,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co client->reconnect_tick = platform_tick_get_ms(); client->refresh_connection_tick = platform_tick_get_ms(); client->wait_for_ping_resp = false; - int buffer_size = config->buffer.size; - if (buffer_size <= 0) { - buffer_size = MQTT_BUFFER_SIZE_BYTE; - } - // use separate value for output buffer size if configured - int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size; - - client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size); - ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed); - client->mqtt_state.in_buffer_length = buffer_size; - client->mqtt_state.out_buffer = (uint8_t *)malloc(out_buffer_size); - ESP_MEM_CHECK(TAG, client->mqtt_state.out_buffer, goto _mqtt_init_failed); - - client->mqtt_state.out_buffer_length = out_buffer_size; - client->outbox = outbox_init(); - ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed); - client->status_bits = xEventGroupCreate(); - ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed); - - mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, - client->mqtt_state.out_buffer_length); #ifdef MQTT_PROTOCOL_5 if (esp_mqtt5_create_default_config(client) != ESP_OK) { goto _mqtt_init_failed; @@ -877,7 +871,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client) vEventGroupDelete(client->status_bits); } free(client->mqtt_state.in_buffer); - free(client->mqtt_state.out_buffer); + mqtt_connection_destroy(&client->mqtt_state.connection); if (client->api_lock) { vSemaphoreDelete(client->api_lock); } @@ -944,9 +938,9 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u if (pass) { pass[0] = 0; //terminal username pass ++; - client->connect_info.password = strdup(pass); + client->mqtt_state.connection.information.password = strdup(pass); } - client->connect_info.username = strdup(user_info); + client->mqtt_state.connection.information.username = strdup(user_info); free(user_info); } @@ -958,7 +952,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->event.msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); #endif @@ -982,16 +976,16 @@ esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mq static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) { client->event.client = client; - client->event.protocol_ver = client->connect_info.protocol_ver; + client->event.protocol_ver = client->mqtt_state.connection.information.protocol_ver; esp_err_t ret = ESP_FAIL; -#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP - esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY); - ret = esp_event_loop_run(client->config->event_loop_handle, 0); + #ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP + esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY); + ret = esp_event_loop_run(client->config->event_loop_handle, 0); #else return ESP_FAIL; #endif - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 esp_mqtt5_client_delete_user_property(client->event.property->user_property); client->event.property->user_property = NULL; @@ -1009,7 +1003,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) size_t msg_data_offset = 0; char *msg_topic = NULL, *msg_data = NULL; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 if (esp_mqtt5_get_publish_data(client, msg_buf, msg_read_len, &msg_topic, &msg_topic_len, &msg_data, &msg_data_len) != ESP_OK) { ESP_LOGE(TAG, "%s: esp_mqtt5_get_publish_data() failed", __func__); @@ -1034,7 +1028,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) } // post data event client->event.retain = mqtt_get_retain(msg_buf); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->event.msg_id = mqtt5_get_id(msg_buf, msg_read_len); #endif @@ -1082,7 +1076,7 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) size_t msg_data_len = client->mqtt_state.in_buffer_read_len; char *msg_data = NULL; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property); #else @@ -1135,16 +1129,16 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_ { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - outbox_message_t msg = { 0 }; - msg.data = client->mqtt_state.outbound_message->data; - msg.len = client->mqtt_state.outbound_message->length; - msg.msg_id = client->mqtt_state.pending_msg_id; - msg.msg_type = client->mqtt_state.pending_msg_type; - msg.msg_qos = client->mqtt_state.pending_publish_qos; - msg.remaining_data = remaining_data; - msg.remaining_len = remaining_len; - //Copy to queue buffer - return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); + outbox_message_t msg = { 0 }; + msg.data = client->mqtt_state.connection.outbound_message.data; + msg.len = client->mqtt_state.connection.outbound_message.length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; + msg.remaining_data = remaining_data; + msg.remaining_len = remaining_len; + //Copy to queue buffer + return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } @@ -1323,23 +1317,23 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_FAIL; } if (msg_qos == 1) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_puback(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_puback(&client->mqtt_state.connection, msg_id); } } else if (msg_qos == 2) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_pubrec(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_pubrec(&client->mqtt_state.connection, msg_id); } } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created"); return ESP_FAIL; } @@ -1355,7 +1349,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; case MQTT_MSG_TYPE_PUBACK: #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_decrement_packet_counter(client); } #endif @@ -1370,15 +1364,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; case MQTT_MSG_TYPE_PUBREC: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREC return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); - client->mqtt_state.outbound_message = mqtt5_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_pubrel(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_pubrel(&client->mqtt_state.connection, msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish response message PUBREL cannot be created"); return ESP_FAIL; } @@ -1388,15 +1382,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; case MQTT_MSG_TYPE_PUBREL: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREL return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); - client->mqtt_state.outbound_message = mqtt5_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_pubcomp(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_pubcomp(&client->mqtt_state.connection, msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created"); return ESP_FAIL; } @@ -1406,7 +1400,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) case MQTT_MSG_TYPE_PUBCOMP: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_decrement_packet_counter(client); } #endif @@ -1438,11 +1432,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item) { // decode queued data - client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id, + client->mqtt_state.connection.outbound_message.data = outbox_item_get_data(item, &client->mqtt_state.connection.outbound_message.length, &client->mqtt_state.pending_msg_id, &client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos); // set duplicate flag for QoS-1 and QoS-2 messages if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) { - mqtt_set_dup(client->mqtt_state.outbound_message->data); + mqtt_set_dup(client->mqtt_state.connection.outbound_message.data); ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id); } @@ -1580,7 +1574,7 @@ static void esp_mqtt_task(void *pv) break; } client->event.event_id = MQTT_EVENT_CONNECTED; - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); } client->state = MQTT_STATE_CONNECTED; @@ -1733,19 +1727,19 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client) static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client) { // Notify the broker we are disconnecting - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_disconnect(&client->mqtt_state.mqtt_connection, &client->mqtt5_config->disconnect_property_info); - if (client->mqtt_state.outbound_message->length) { + mqtt5_msg_disconnect(&client->mqtt_state.connection, &client->mqtt5_config->disconnect_property_info); + if (client->mqtt_state.connection.outbound_message.length) { esp_mqtt5_client_delete_user_property(client->mqtt5_config->disconnect_property_info.user_property); client->mqtt5_config->disconnect_property_info.user_property = NULL; memset(&client->mqtt5_config->disconnect_property_info, 0, sizeof(esp_mqtt5_disconnect_property_config_t)); } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection); + mqtt_msg_disconnect(&client->mqtt_state.connection); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Disconnect message cannot be created"); return ESP_FAIL; } @@ -1793,8 +1787,8 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client) { - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - if (client->mqtt_state.outbound_message->length == 0) { + mqtt_msg_pingreq(&client->mqtt_state.connection); + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Ping message cannot be created"); return ESP_FAIL; } @@ -1820,7 +1814,7 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, MQTT_API_UNLOCK(client); return -1; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 int max_qos = topic_list[0].qos; for (int topic_number = 0; topic_number < size; ++topic_number) { @@ -1833,25 +1827,25 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, MQTT_API_UNLOCK(client); return -1; } - client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection, + mqtt5_msg_subscribe(&client->mqtt_state.connection, topic_list, size, &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); - if (client->mqtt_state.outbound_message->length) { + if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->subscribe_property_info = NULL; } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, + mqtt_msg_subscribe(&client->mqtt_state.connection, topic_list, size, &client->mqtt_state.pending_msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Subscribe message cannot be created"); MQTT_API_UNLOCK(client); return -1; } - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); //move pending msg to outbox (if have) if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); @@ -1888,28 +1882,28 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top ESP_LOGE(TAG, "Client has not connected"); return -1; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_unsubscribe(&client->mqtt_state.mqtt_connection, + mqtt5_msg_unsubscribe(&client->mqtt_state.connection, topic, &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); - if (client->mqtt_state.outbound_message->length) { + if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->unsubscribe_property_info = NULL; } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection, + mqtt_msg_unsubscribe(&client->mqtt_state.connection, topic, &client->mqtt_state.pending_msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { MQTT_API_UNLOCK(client); ESP_LOGE(TAG, "Unubscribe message cannot be created"); return -1; } ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); return -1; @@ -1927,48 +1921,54 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top return client->mqtt_state.pending_msg_id; } -static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, - int len, int qos, int retain, bool store) +static int make_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, + int len, int qos, int retain) { uint16_t pending_msg_id = 0; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_publish(&client->mqtt_state.mqtt_connection, + mqtt5_msg_publish(&client->mqtt_state.connection, topic, data, len, qos, retain, &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); - if (client->mqtt_state.outbound_message->length) { + if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->publish_property_info = NULL; } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, + mqtt_msg_publish(&client->mqtt_state.connection, topic, data, len, qos, retain, &pending_msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish message cannot be created"); return -1; } + return pending_msg_id; +} +static inline int mqtt_client_enqueue_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, + int len, int qos, int retain, bool store) +{ + int pending_msg_id = make_publish(client, topic, data, len, qos, retain); /* We have to set as pending all the qos>0 messages */ //TODO: client->mqtt_state.outbound_message = publish_msg; if (qos > 0 || store) { - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer - if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { + if (client->mqtt_state.connection.outbound_message.fragmented_msg_total_length == 0) { if (!mqtt_enqueue(client, NULL, 0)) { return -1; } } else { - int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; + int first_fragment = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset; if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { return -1; } - client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; + client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0; } } return pending_msg_id; @@ -1990,7 +1990,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, #endif #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) { ESP_LOGI(TAG, "MQTT5 publish check fail"); MQTT_API_UNLOCK(client); @@ -2008,7 +2008,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } - int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false); + int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false); if (pending_msg_id < 0) { MQTT_API_UNLOCK(client); return -1; @@ -2043,27 +2043,19 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, goto cannot_publish; } - int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; - client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0; - client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; + int data_sent = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset; + client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset = 0; + client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0; remaining_len -= data_sent; current_data += data_sent; if (remaining_len > 0) { - mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection; + mqtt_connection_t *connection = &client->mqtt_state.connection; ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len); - if (remaining_len > connection->buffer_length) { - // Continue with sending - memcpy(connection->buffer, current_data, connection->buffer_length); - connection->message.length = connection->buffer_length; - sending = true; - } else { - memcpy(connection->buffer, current_data, remaining_len); - connection->message.length = remaining_len; - sending = true; - } - connection->message.data = connection->buffer; - client->mqtt_state.outbound_message = &connection->message; + int write_len = remaining_len > connection->buffer_length ? connection->buffer_length : remaining_len; + memcpy(connection->buffer, current_data, write_len); + connection->outbound_message.length = write_len; + sending = true; } else { // Message was sent correctly sending = false; @@ -2085,7 +2077,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, cannot_publish: // clear out possible fragmented publish if failed or skipped - client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; + client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0; if (qos == 0) { ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected"); } @@ -2112,7 +2104,7 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, MQTT_API_LOCK(client); #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) { ESP_LOGI(TAG, "esp_mqtt_client_enqueue check fail"); MQTT_API_UNLOCK(client); @@ -2120,7 +2112,7 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, } } #endif - int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store); + int ret = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, store); MQTT_API_UNLOCK(client); if (ret == 0 && store == false) { // messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error From 21a5491d537b80190634262ccb7c8bb3159564a1 Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Mon, 27 Mar 2023 09:11:48 +0200 Subject: [PATCH 2/3] Removes unused outbox functions. --- lib/include/mqtt_outbox.h | 2 -- lib/mqtt_outbox.c | 25 ------------------------- 2 files changed, 27 deletions(-) diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index fbeb5a5..e180fca 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -42,8 +42,6 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); -esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); -esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item); int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout); /** diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index f60a471..1915e59 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -121,19 +121,7 @@ esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) } return ESP_FAIL; } -esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id) -{ - outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - if (item->msg_id == msg_id) { - STAILQ_REMOVE(outbox, item, outbox_item, next); - free(item->buffer); - free(item); - } - } - return ESP_OK; -} esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending) { outbox_item_handle_t item = outbox_get(outbox, msg_id); @@ -162,19 +150,6 @@ esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick return ESP_FAIL; } -esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type) -{ - outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - if (item->msg_type == msg_type) { - STAILQ_REMOVE(outbox, item, outbox_item, next); - free(item->buffer); - free(item); - } - - } - return ESP_OK; -} int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) { int msg_id = -1; From 372ab7b37451e7f13bf8433cb92ebd81b3e3d45b Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Thu, 30 Mar 2023 15:16:14 +0200 Subject: [PATCH 3/3] feat: Introduces outbox limit A memory limit for the outbox can be configured. User will not be able to publish or enqueue if the new message goes beyond the configured limit. --- host_test/CMakeLists.txt | 3 +- host_test/main/CMakeLists.txt | 7 + host_test/main/test_mqtt_client.cpp | 191 ++++++++++++++++------------ include/mqtt_client.h | 11 +- lib/include/mqtt_client_priv.h | 2 + lib/include/mqtt_msg.h | 4 +- lib/include/mqtt_outbox.h | 4 +- lib/mqtt_msg.c | 4 +- lib/mqtt_outbox.c | 56 ++++---- mqtt_client.c | 69 ++++++---- 10 files changed, 210 insertions(+), 141 deletions(-) diff --git a/host_test/CMakeLists.txt b/host_test/CMakeLists.txt index 5b63550..7c3cdb1 100644 --- a/host_test/CMakeLists.txt +++ b/host_test/CMakeLists.txt @@ -11,7 +11,6 @@ list(APPEND EXTRA_COMPONENT_DIRS "$ENV{IDF_PATH}/tools/mocks/lwip/" "$ENV{IDF_PATH}/tools/mocks/esp-tls/" "$ENV{IDF_PATH}/tools/mocks/http_parser/" - "$ENV{IDF_PATH}/tools/mocks/tcp_transport/" - ) + "$ENV{IDF_PATH}/tools/mocks/tcp_transport/") project(host_mqtt_client_test) diff --git a/host_test/main/CMakeLists.txt b/host_test/main/CMakeLists.txt index e50cbb6..682bc22 100644 --- a/host_test/main/CMakeLists.txt +++ b/host_test/main/CMakeLists.txt @@ -2,6 +2,13 @@ idf_component_register(SRCS "test_mqtt_client.cpp" INCLUDE_DIRS "$ENV{IDF_PATH}/tools/catch" REQUIRES cmock mqtt esp_timer esp_hw_support http_parser log) +target_compile_options(${COMPONENT_LIB} PUBLIC -fsanitize=address -fconcepts) +target_link_options(${COMPONENT_LIB} PUBLIC -fsanitize=address) + +idf_component_get_property(mqtt mqtt COMPONENT_LIB) +target_compile_options(${mqtt} PUBLIC -fsanitize=address -fconcepts) +target_link_options(${mqtt} PUBLIC -fsanitize=address) + if(CONFIG_GCOV_ENABLED) target_compile_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage) target_link_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage) diff --git a/host_test/main/test_mqtt_client.cpp b/host_test/main/test_mqtt_client.cpp index 55d0f2b..47dbde5 100644 --- a/host_test/main/test_mqtt_client.cpp +++ b/host_test/main/test_mqtt_client.cpp @@ -3,10 +3,15 @@ * * SPDX-License-Identifier: Apache-2.0 */ +#include +#include +#include +#include #include "esp_transport.h" #define CATCH_CONFIG_MAIN // This tells the catch header to generate a main #include "catch.hpp" +#include "mqtt_client.h" extern "C" { #include "Mockesp_event.h" #include "Mockesp_mac.h" @@ -30,99 +35,121 @@ extern "C" { } } -#include "mqtt_client.h" - -struct ClientInitializedFixture { - esp_mqtt_client_handle_t client; - ClientInitializedFixture() - { - [[maybe_unused]] auto protect = TEST_PROTECT(); - int mtx; - int transport_list; - int transport; - int event_group; - uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55}; - esp_timer_get_time_IgnoreAndReturn(0); - xQueueTakeMutexRecursive_IgnoreAndReturn(true); - xQueueGiveMutexRecursive_IgnoreAndReturn(true); - xQueueCreateMutex_ExpectAnyArgsAndReturn( - reinterpret_cast(&mtx)); - xEventGroupCreate_IgnoreAndReturn(reinterpret_cast(&event_group)); - esp_transport_list_init_IgnoreAndReturn(reinterpret_cast(&transport_list)); - esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast(&transport)); - esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast(&transport)); - esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast(&transport)); - esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK); - esp_transport_list_add_IgnoreAndReturn(ESP_OK); - esp_transport_set_default_port_IgnoreAndReturn(ESP_OK); - http_parser_parse_url_IgnoreAndReturn(0); - http_parser_url_init_ExpectAnyArgs(); - esp_event_loop_create_IgnoreAndReturn(ESP_OK); - esp_read_mac_IgnoreAndReturn(ESP_OK); - esp_read_mac_ReturnThruPtr_mac(mac); - esp_transport_list_destroy_IgnoreAndReturn(ESP_OK); - esp_transport_destroy_IgnoreAndReturn(ESP_OK); - vEventGroupDelete_Ignore(); - vQueueDelete_Ignore(); - - esp_mqtt_client_config_t config{}; - client = esp_mqtt_client_init(&config); - } - ~ClientInitializedFixture() - { - esp_mqtt_client_destroy(client); - } -}; -TEST_CASE_METHOD(ClientInitializedFixture, "Client set uri") +auto random_string(std::size_t n) { - struct http_parser_url ret_uri = { - .field_set = 1, - .port = 0, - .field_data = { { 0, 1} } - }; - SECTION("User set a correct URI") { - http_parser_parse_url_StopIgnore(); - http_parser_parse_url_ExpectAnyArgsAndReturn(0); - http_parser_parse_url_ReturnThruPtr_u(&ret_uri); - auto res = esp_mqtt_client_set_uri(client, " "); - REQUIRE(res == ESP_OK); - } - SECTION("Incorrect URI from user") { - http_parser_parse_url_StopIgnore(); - http_parser_parse_url_ExpectAnyArgsAndReturn(1); - http_parser_parse_url_ReturnThruPtr_u(&ret_uri); - auto res = esp_mqtt_client_set_uri(client, " "); - REQUIRE(res == ESP_FAIL); - } + static constexpr std::string_view char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ123456790"; + std::string str; + std::sample(char_set.begin(), char_set.end(), std::back_inserter(str), n, + std::mt19937 {std::random_device{}()}); + return str; } -TEST_CASE_METHOD(ClientInitializedFixture, "Client Start") + +using unique_mqtt_client = std::unique_ptr < std::remove_pointer_t, decltype([](esp_mqtt_client_handle_t client) { - SECTION("Successful start") { + esp_mqtt_client_destroy(client); +}) >; + +SCENARIO("MQTT Client Operation") +{ + // [[maybe_unused]] auto protect = TEST_PROTECT(); + // Set expectations for the mocked calls. + int mtx = 0; + int transport_list = 0; + int transport = 0; + int event_group = 0; + uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55}; + esp_timer_get_time_IgnoreAndReturn(0); + xQueueTakeMutexRecursive_IgnoreAndReturn(true); + xQueueGiveMutexRecursive_IgnoreAndReturn(true); + xQueueCreateMutex_ExpectAnyArgsAndReturn( + reinterpret_cast(&mtx)); + xEventGroupCreate_IgnoreAndReturn(reinterpret_cast(&event_group)); + esp_transport_list_init_IgnoreAndReturn(reinterpret_cast(&transport_list)); + esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast(&transport)); + esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast(&transport)); + esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast(&transport)); + esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK); + esp_transport_list_add_IgnoreAndReturn(ESP_OK); + esp_transport_set_default_port_IgnoreAndReturn(ESP_OK); + http_parser_url_init_Ignore(); + esp_event_loop_create_IgnoreAndReturn(ESP_OK); + esp_read_mac_IgnoreAndReturn(ESP_OK); + esp_read_mac_ReturnThruPtr_mac(mac); + esp_transport_list_destroy_IgnoreAndReturn(ESP_OK); + vEventGroupDelete_Ignore(); + vQueueDelete_Ignore(); + GIVEN("An a minimal config") { esp_mqtt_client_config_t config{}; config.broker.address.uri = "mqtt://1.1.1.1"; struct http_parser_url ret_uri = { - .field_set = 1 | (1<<1), + .field_set = 1 | (1 << 1), .port = 0, .field_data = { { 0, 4 } /*mqtt*/, { 7, 1 } } // at least *scheme* and *host* }; - http_parser_parse_url_StopIgnore(); http_parser_parse_url_ExpectAnyArgsAndReturn(0); http_parser_parse_url_ReturnThruPtr_u(&ret_uri); xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdTRUE); - auto res = esp_mqtt_set_config(client, &config); - REQUIRE(res == ESP_OK); - res = esp_mqtt_client_start(client); - REQUIRE(res == ESP_OK); - } - SECTION("Failed on initialization") { - xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE); - auto res = esp_mqtt_client_start(nullptr); - REQUIRE(res == ESP_ERR_INVALID_ARG); - } - SECTION("Client already started") {} - SECTION("Failed to start task") { - xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE); - auto res = esp_mqtt_client_start(client); - REQUIRE(res == ESP_FAIL); + SECTION("Client with minimal config") { + auto client = unique_mqtt_client{esp_mqtt_client_init(&config)}; + REQUIRE(client != nullptr); + SECTION("User will set a new uri") { + struct http_parser_url ret_uri = { + .field_set = 1, + .port = 0, + .field_data = { { 0, 1} } + }; + SECTION("User set a correct URI") { + http_parser_parse_url_StopIgnore(); + http_parser_parse_url_ExpectAnyArgsAndReturn(0); + http_parser_parse_url_ReturnThruPtr_u(&ret_uri); + auto res = esp_mqtt_client_set_uri(client.get(), " "); + REQUIRE(res == ESP_OK); + } + SECTION("Incorrect URI from user") { + http_parser_parse_url_StopIgnore(); + http_parser_parse_url_ExpectAnyArgsAndReturn(1); + http_parser_parse_url_ReturnThruPtr_u(&ret_uri); + auto res = esp_mqtt_client_set_uri(client.get(), " "); + REQUIRE(res == ESP_FAIL); + } + } + SECTION("After Start Client Is Cleanly destroyed") { + REQUIRE(esp_mqtt_client_start(client.get()) == ESP_OK); + // Only need to start the client, destroy is called automatically at the end of + // scope + } + } + SECTION("Client with all allocating configuration set") { + auto host = random_string(20); + auto path = random_string(10); + auto username = random_string(10); + auto client_id = random_string(10); + auto password = random_string(10); + auto lw_topic = random_string(10); + auto lw_msg = random_string(10); + + config.broker = {.address = { + .hostname = host.data(), + .path = path.data() + } + }; + config.credentials = { + .username = username.data(), + .client_id = client_id.data(), + .authentication = { + .password = password.data() + } + }; + config.session = { + .last_will { + .topic = lw_topic.data(), + .msg = lw_msg.data() + } + }; + auto client = unique_mqtt_client{esp_mqtt_client_init(&config)}; + REQUIRE(client != nullptr); + + } } } + diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 520a8c9..8d85125 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -352,6 +352,9 @@ typedef struct esp_mqtt_client_config_t { int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by ``buffer_size`` */ } buffer; /*!< Buffer size configuration.*/ + struct outbox_config_t { + uint64_t limit; /*!< Size limit for the outbox in bytes.*/ + } outbox; } esp_mqtt_client_config_t; /** @@ -430,7 +433,6 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client); */ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); - #ifdef __cplusplus #define esp_mqtt_client_subscribe esp_mqtt_client_subscribe_single @@ -449,6 +451,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); * * @return message_id of the subscribe message on success * -1 on failure + * -2 in case of full outbox. */ #define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \ char *: esp_mqtt_client_subscribe_single, \ @@ -473,6 +476,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); * * @return message_id of the subscribe message on success * -1 on failure + * -2 in case of full outbox. */ int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos); @@ -493,6 +497,7 @@ int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, * * @return message_id of the subscribe message on success * -1 on failure + * -2 in case of full outbox. */ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, const esp_mqtt_topic_t *topic_list, int size); @@ -536,7 +541,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, * @param retain retain flag * * @return message_id of the publish message (for QoS 0 message_id will always - * be zero) on success. -1 on failure. + * be zero) on success. -1 on failure, -2 in case of full outbox. */ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); @@ -561,7 +566,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, * @param store if true, all messages are enqueued; otherwise only QoS 1 and * QoS 2 are enqueued * - * @return message_id if queued successfully, -1 otherwise + * @return message_id if queued successfully, -1 on failure, -2 in case of full outbox. */ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, diff --git a/lib/include/mqtt_client_priv.h b/lib/include/mqtt_client_priv.h index 2bf08ca..0b5ee48 100644 --- a/lib/include/mqtt_client_priv.h +++ b/lib/include/mqtt_client_priv.h @@ -7,6 +7,7 @@ #ifndef _MQTT_CLIENT_PRIV_H_ #define _MQTT_CLIENT_PRIV_H_ +#include #include #include #include @@ -88,6 +89,7 @@ typedef struct { bool use_secure_element; void *ds_data; int message_retransmit_timeout; + uint64_t outbox_limit; esp_transport_handle_t transport; } mqtt_config_storage_t; diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index f3ad000..f09590d 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -130,8 +130,8 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length); uint16_t mqtt_get_id(uint8_t *buffer, size_t length); int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length); -esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size); -void mqtt_connection_destroy(mqtt_connection_t *connection); +esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size); +void mqtt_msg_buffer_destroy(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info); mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id); diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index e180fca..241b335 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -14,7 +14,7 @@ extern "C" { struct outbox_item; -typedef struct outbox_list_t *outbox_handle_t; +typedef struct outbox_t *outbox_handle_t; typedef struct outbox_item *outbox_item_handle_t; typedef struct outbox_message *outbox_message_handle_t; typedef long long outbox_tick_t; @@ -54,7 +54,7 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); pending_state_t outbox_item_get_pending(outbox_item_handle_t item); esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick); -int outbox_get_size(outbox_handle_t outbox); +uint64_t outbox_get_size(outbox_handle_t outbox); void outbox_destroy(outbox_handle_t outbox); void outbox_delete_all_items(outbox_handle_t outbox); diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index a7a91e7..781bbea 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -616,7 +616,7 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length) } } -esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size) +esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size) { memset(connection, 0, sizeof(mqtt_connection_t)); connection->buffer = (uint8_t *)calloc(0, buffer_size); @@ -627,7 +627,7 @@ esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size) return ESP_OK; } -void mqtt_connection_destroy(mqtt_connection_t *connection) +void mqtt_msg_buffer_destroy(mqtt_connection_t *connection) { if (connection) { free(connection->buffer); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 1915e59..ef2d889 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -1,4 +1,5 @@ #include "mqtt_outbox.h" +#include #include #include #include "mqtt_config.h" @@ -22,12 +23,19 @@ typedef struct outbox_item { STAILQ_HEAD(outbox_list_t, outbox_item); +struct outbox_t { + uint64_t size; + struct outbox_list_t *list; +}; outbox_handle_t outbox_init(void) { - outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t)); + outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t)); ESP_MEM_CHECK(TAG, outbox, return NULL); - STAILQ_INIT(outbox); + outbox->list = calloc(1, sizeof(struct outbox_list_t)); + ESP_MEM_CHECK(TAG, outbox->list, return NULL); //TODO: Free outbox on failure + outbox->size = 0; + STAILQ_INIT(outbox->list); return outbox; } @@ -50,7 +58,8 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl if (message->remaining_data) { memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len); } - STAILQ_INSERT_TAIL(outbox, item, next); + STAILQ_INSERT_TAIL(outbox->list, item, next); + outbox->size += item->len; ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox)); return item; } @@ -58,7 +67,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) { outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (item->msg_id == msg_id) { return item; } @@ -69,7 +78,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick) { outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (item->pending == pending) { if (tick) { *tick = item->tick; @@ -83,9 +92,10 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete) { outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (item == item_to_delete) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); + outbox->size -= item->len; free(item->buffer); free(item); return ESP_OK; @@ -109,9 +119,10 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) { outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { + STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) { if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); + outbox->size -= item->len; free(item->buffer); free(item); ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox)); @@ -154,10 +165,11 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t { int msg_id = -1; outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { + STAILQ_FOREACH(item, outbox->list, next) { if (current_tick - item->tick > timeout) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); free(item->buffer); + outbox->size -= item->len; msg_id = item->msg_id; free(item); return msg_id; @@ -171,10 +183,11 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou { int deleted_items = 0; outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { + STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) { if (current_tick - item->tick > timeout) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_REMOVE(outbox->list, item, outbox_item, next); free(item->buffer); + outbox->size -= item->len; free(item); deleted_items ++; } @@ -183,23 +196,17 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou return deleted_items; } -int outbox_get_size(outbox_handle_t outbox) +uint64_t outbox_get_size(outbox_handle_t outbox) { - int siz = 0; - outbox_item_handle_t item; - STAILQ_FOREACH(item, outbox, next) { - // Suppressing "use after free" warning as this could happen only if queue is in inconsistent state - // which never happens if STAILQ interface used - siz += item->len; // NOLINT(clang-analyzer-unix.Malloc) - } - return siz; + return outbox->size; } void outbox_delete_all_items(outbox_handle_t outbox) { outbox_item_handle_t item, tmp; - STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - STAILQ_REMOVE(outbox, item, outbox_item, next); + STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) { + STAILQ_REMOVE(outbox->list, item, outbox_item, next); + outbox->size -= item->len; free(item->buffer); free(item); } @@ -207,6 +214,7 @@ void outbox_delete_all_items(outbox_handle_t outbox) void outbox_destroy(outbox_handle_t outbox) { outbox_delete_all_items(outbox); + free(outbox->list); free(outbox); } diff --git a/mqtt_client.c b/mqtt_client.c index 6719a49..1792206 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1,10 +1,12 @@ -#include "mqtt_client.h" -#include "esp_transport.h" -#include "mqtt_client_priv.h" -#include "esp_log.h" #include +#include "esp_err.h" +#include "esp_log.h" #include "esp_heap_caps.h" +#include "esp_transport.h" +#include "mqtt_client.h" +#include "mqtt_client_priv.h" #include "mqtt_msg.h" +#include "mqtt_outbox.h" _Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type"); #ifdef ESP_EVENT_ANY_ID @@ -564,6 +566,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl goto _mqtt_set_config_failed; } } + client->config->outbox_limit = config->outbox.limit; esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict(client->config, config); MQTT_API_UNLOCK(client); @@ -811,7 +814,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co // use separate value for output buffer size if configured int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size; - if (mqtt_connection_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) { + if (mqtt_msg_buffer_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) { goto _mqtt_init_failed; } @@ -871,7 +874,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client) vEventGroupDelete(client->status_bits); } free(client->mqtt_state.in_buffer); - mqtt_connection_destroy(&client->mqtt_state.connection); + mqtt_msg_buffer_destroy(&client->mqtt_state.connection); if (client->api_lock) { vSemaphoreDelete(client->api_lock); } @@ -983,7 +986,7 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY); ret = esp_event_loop_run(client->config->event_loop_handle, 0); #else - return ESP_FAIL; + return ESP_FAIL; #endif if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 @@ -1277,7 +1280,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) // If the message was valid, get the type, quality of service and id of the message msg_type = mqtt_get_type(client->mqtt_state.in_buffer); msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, read_len); #endif @@ -1456,7 +1459,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item } } else if (client->mqtt_state.pending_publish_qos > 0) { #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_increment_packet_counter(client); } #endif @@ -1808,6 +1811,10 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, ESP_LOGE(TAG, "Client was not initialized"); return -1; } + + if (client->config->outbox_limit > 0 && outbox_get_size(client->outbox) > client->config->outbox_limit) { + return -2; + } MQTT_API_LOCK(client); if (client->state != MQTT_STATE_CONNECTED) { ESP_LOGE(TAG, "Client has not connected"); @@ -1828,16 +1835,16 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, return -1; } mqtt5_msg_subscribe(&client->mqtt_state.connection, - topic_list, size, - &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); + topic_list, size, + &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->subscribe_property_info = NULL; } #endif } else { mqtt_msg_subscribe(&client->mqtt_state.connection, - topic_list, size, - &client->mqtt_state.pending_msg_id); + topic_list, size, + &client->mqtt_state.pending_msg_id); } if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Subscribe message cannot be created"); @@ -1885,16 +1892,16 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 mqtt5_msg_unsubscribe(&client->mqtt_state.connection, - topic, - &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); + topic, + &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->unsubscribe_property_info = NULL; } #endif } else { mqtt_msg_unsubscribe(&client->mqtt_state.connection, - topic, - &client->mqtt_state.pending_msg_id); + topic, + &client->mqtt_state.pending_msg_id); } if (client->mqtt_state.connection.outbound_message.length == 0) { MQTT_API_UNLOCK(client); @@ -1928,18 +1935,18 @@ static int make_publish(esp_mqtt_client_handle_t client, const char *topic, cons if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 mqtt5_msg_publish(&client->mqtt_state.connection, - topic, data, len, - qos, retain, - &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); + topic, data, len, + qos, retain, + &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->publish_property_info = NULL; } #endif } else { mqtt_msg_publish(&client->mqtt_state.connection, - topic, data, len, - qos, retain, - &pending_msg_id); + topic, data, len, + qos, retain, + &pending_msg_id); } if (client->mqtt_state.connection.outbound_message.length == 0) { @@ -2008,6 +2015,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } + if (client->config->outbox_limit > 0 && qos > 0) { + if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) { + MQTT_API_UNLOCK(client); + return -2; + } + } + int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false); if (pending_msg_id < 0) { MQTT_API_UNLOCK(client); @@ -2064,7 +2078,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, if (qos > 0) { #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_increment_packet_counter(client); } #endif @@ -2102,6 +2116,13 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } + if (client->config->outbox_limit > 0) { + if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) { + return -2; + } + + } + MQTT_API_LOCK(client); #ifdef MQTT_PROTOCOL_5 if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {