From 122875bf8a959719b753fb0b04f3c9a271d6f978 Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Wed, 17 Aug 2022 13:41:35 -0300 Subject: [PATCH] refactor: Group access to output buffer in mqtt_connection_t - Moves mqtt_connect_info to mqtt_connection_t. - Removes outbound_message in favor of accessing it trough connection. --- lib/include/mqtt_client_priv.h | 6 +- lib/include/mqtt_msg.h | 27 ++- lib/mqtt5_msg.c | 164 ++++++++-------- lib/mqtt_msg.c | 117 +++++++----- mqtt5_client.c | 35 ++-- mqtt_client.c | 336 ++++++++++++++++----------------- 6 files changed, 343 insertions(+), 342 deletions(-) diff --git a/lib/include/mqtt_client_priv.h b/lib/include/mqtt_client_priv.h index bf54f13..2bf08ca 100644 --- a/lib/include/mqtt_client_priv.h +++ b/lib/include/mqtt_client_priv.h @@ -48,13 +48,10 @@ extern "C" { typedef struct mqtt_state { uint8_t *in_buffer; - uint8_t *out_buffer; int in_buffer_length; - int out_buffer_length; size_t message_length; size_t in_buffer_read_len; - mqtt_message_t *outbound_message; - mqtt_connection_t mqtt_connection; + mqtt_connection_t connection; uint16_t pending_msg_id; int pending_msg_type; int pending_publish_qos; @@ -106,7 +103,6 @@ struct esp_mqtt_client { esp_transport_handle_t transport; mqtt_config_storage_t *config; mqtt_state_t mqtt_state; - mqtt_connect_info_t connect_info; mqtt_client_state_t state; uint64_t refresh_connection_tick; int64_t keepalive_tick; diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 60fcf8c..f3ad000 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -68,16 +68,6 @@ typedef struct mqtt_message { size_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */ } mqtt_message_t; -typedef struct mqtt_connection { - mqtt_message_t message; -#if MQTT_MSG_ID_INCREMENTAL - uint16_t last_message_id; /*!< last used id if incremental message id configured */ -#endif - uint8_t *buffer; - size_t buffer_length; - -} mqtt_connection_t; - typedef struct mqtt_connect_info { char *client_id; char *username; @@ -90,9 +80,18 @@ typedef struct mqtt_connect_info { int will_retain; int clean_session; esp_mqtt_protocol_ver_t protocol_ver; - } mqtt_connect_info_t; +typedef struct mqtt_connection { + mqtt_message_t outbound_message; +#if MQTT_MSG_ID_INCREMENTAL + uint16_t last_message_id; /*!< last used id if incremental message id configured */ +#endif + uint8_t *buffer; + size_t buffer_length; + mqtt_connect_info_t information; + +} mqtt_connection_t; static inline int mqtt_get_type(const uint8_t *buffer) { @@ -123,7 +122,6 @@ static inline int mqtt_get_retain(const uint8_t *buffer) return (buffer[0] & 0x01); } -void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length); bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length); size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len); char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length); @@ -132,6 +130,9 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length); uint16_t mqtt_get_id(uint8_t *buffer, size_t length); int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length); +esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size); +void mqtt_connection_destroy(mqtt_connection_t *connection); + mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info); mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id); mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id); @@ -143,8 +144,6 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char * mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection); - - #ifdef __cplusplus } #endif diff --git a/lib/mqtt5_msg.c b/lib/mqtt5_msg.c index 16a479b..b6a483f 100644 --- a/lib/mqtt5_msg.c +++ b/lib/mqtt5_msg.c @@ -69,8 +69,8 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope generate_variable_len(len, &len_bytes, encoded_lens); int offset = len_bytes - 1; - connection->message.length += offset; - if (connection->message.length > connection->buffer_length) { + connection->outbound_message.length += offset; + if (connection->outbound_message.length > connection->buffer_length) { return -1; } @@ -89,33 +89,33 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope static int append_property(mqtt_connection_t *connection, uint8_t property_type, uint8_t len_occupy, const char *data, size_t data_len) { - if ((connection->message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) { + if ((connection->outbound_message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) { return -1; } - size_t origin_message_len = connection->message.length; + size_t origin_message_len = connection->outbound_message.length; if (property_type) { - connection->buffer[connection->message.length ++] = property_type; + connection->buffer[connection->outbound_message.length ++] = property_type; } if (len_occupy == 0) { uint8_t encoded_lens[4] = {0}, len_bytes = 0; generate_variable_len(data_len, &len_bytes, encoded_lens); for (int j = 0; j < len_bytes; j ++) { - connection->buffer[connection->message.length ++] = encoded_lens[j]; + connection->buffer[connection->outbound_message.length ++] = encoded_lens[j]; } } else { for (int i = 1; i <= len_occupy; i ++) { - connection->buffer[connection->message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff; + connection->buffer[connection->outbound_message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff; } } if (data) { - memcpy(connection->buffer + connection->message.length, data, data_len); - connection->message.length += data_len; + memcpy(connection->buffer + connection->outbound_message.length, data, data_len); + connection->outbound_message.length += data_len; } - return connection->message.length - origin_message_len; + return connection->outbound_message.length - origin_message_len; } static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id) @@ -130,36 +130,36 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag #endif } - if (connection->message.length + 2 > connection->buffer_length) { + if (connection->outbound_message.length + 2 > connection->buffer_length) { return 0; } - MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], message_id) + MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], message_id) return message_id; } static int init_message(mqtt_connection_t *connection) { - connection->message.length = MQTT5_MAX_FIXED_HEADER_SIZE; + connection->outbound_message.length = MQTT5_MAX_FIXED_HEADER_SIZE; return MQTT5_MAX_FIXED_HEADER_SIZE; } static mqtt_message_t *fail_message(mqtt_connection_t *connection) { - connection->message.data = connection->buffer; - connection->message.length = 0; - return &connection->message; + connection->outbound_message.data = connection->buffer; + connection->outbound_message.length = 0; + return &connection->outbound_message; } static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain) { - int message_length = connection->message.length - MQTT5_MAX_FIXED_HEADER_SIZE; + int message_length = connection->outbound_message.length - MQTT5_MAX_FIXED_HEADER_SIZE; int total_length = message_length; uint8_t encoded_lens[4] = {0}, len_bytes = 0; // Check if we have fragmented message and update total_len - if (connection->message.fragmented_msg_total_length) { - total_length = connection->message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE; + if (connection->outbound_message.fragmented_msg_total_length) { + total_length = connection->outbound_message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE; } // Encode MQTT message length @@ -171,10 +171,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int } // Save the header bytes - connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) + connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) int offs = MQTT5_MAX_FIXED_HEADER_SIZE - 1 - len_bytes; - connection->message.data = connection->buffer + offs; - connection->message.fragmented_msg_data_offset -= offs; + connection->outbound_message.data = connection->buffer + offs; + connection->outbound_message.fragmented_msg_data_offset -= offs; // type byte connection->buffer[offs ++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); // length bytes @@ -182,7 +182,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int connection->buffer[offs ++] = encoded_lens[j]; } - return &connection->message; + return &connection->outbound_message; } static esp_err_t mqtt5_msg_set_user_property(mqtt5_user_property_handle_t *user_property, char *key, size_t key_len, char *value, size_t value_len) @@ -465,24 +465,24 @@ char *mqtt5_get_puback_data(uint8_t *buffer, size_t *length, mqtt5_user_property mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info, esp_mqtt5_connection_property_storage_t *property, esp_mqtt5_connection_will_property_storage_t *will_property) { init_message(connection); - connection->buffer[connection->message.length ++] = 0; // Variable header length MSB + connection->buffer[connection->outbound_message.length ++] = 0; // Variable header length MSB /* Defaults to protocol version 5 values */ - connection->buffer[connection->message.length ++] = 4; // Variable header length LSB - memcpy(&connection->buffer[connection->message.length], "MQTT", 4); // Protocol name - connection->message.length += 4; - connection->buffer[connection->message.length ++] = 5; // Protocol version + connection->buffer[connection->outbound_message.length ++] = 4; // Variable header length LSB + memcpy(&connection->buffer[connection->outbound_message.length], "MQTT", 4); // Protocol name + connection->outbound_message.length += 4; + connection->buffer[connection->outbound_message.length ++] = 5; // Protocol version - int flags_offset = connection->message.length; - connection->buffer[connection->message.length ++] = 0; // Flags - MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], info->keepalive) // Keep-alive + int flags_offset = connection->outbound_message.length; + connection->buffer[connection->outbound_message.length ++] = 0; // Flags + MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], info->keepalive) // Keep-alive if (info->clean_session) { connection->buffer[flags_offset] |= MQTT5_CONNECT_FLAG_CLEAN_SESSION; } //Add properties - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property->session_expiry_interval) { APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, property->session_expiry_interval), fail_message(connection)); } @@ -508,7 +508,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection)); } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); if (info->client_id != NULL && info->client_id[0] != '\0') { APPEND_CHECK(append_property(connection, 0, 2, info->client_id, strlen(info->client_id)), fail_message(connection)); @@ -518,8 +518,8 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in //Add will properties if (info->will_topic != NULL && info->will_topic[0] != '\0') { - properties_offset = connection->message.length; - connection->message.length ++; + properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (will_property->will_delay_interval) { APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_WILL_DELAY_INTERVAL, 4, NULL, will_property->will_delay_interval), fail_message(connection)); } @@ -545,7 +545,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection)); } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); APPEND_CHECK(append_property(connection, 0, 2, info->will_topic, strlen(info->will_topic)), fail_message(connection)); APPEND_CHECK(append_property(connection, 0, 2, info->will_message, info->will_length), fail_message(connection)); @@ -742,8 +742,8 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top *message_id = 0; } - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property) { if (property->payload_format_indicator) { @@ -788,20 +788,20 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_CONTENT_TYPE, 2, property->content_type, strlen(property->content_type)), fail_message(connection)); } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); - if (connection->message.length + data_length > connection->buffer_length) { + if (connection->outbound_message.length + data_length > connection->buffer_length) { // Not enough size in buffer -> fragment this message - connection->message.fragmented_msg_data_offset = connection->message.length; - memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length); - connection->message.length = connection->buffer_length; - connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset; + connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length; + memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length); + connection->outbound_message.length = connection->buffer_length; + connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset; } else { if (data != NULL) { - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; + memcpy(connection->buffer + connection->outbound_message.length, data, data_length); + connection->outbound_message.length += data_length; } - connection->message.fragmented_msg_total_length = 0; + connection->outbound_message.fragmented_msg_total_length = 0; } return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } @@ -858,8 +858,8 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt return fail_message(connection); } - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property) { if (property->subscribe_id) { @@ -873,7 +873,7 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt } } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); for (int topic_number = 0; topic_number < size; ++topic_number) { if (topic_list[topic_number].filter[0] == '\0') { @@ -897,23 +897,23 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection)); } - if (connection->message.length + 1 > connection->buffer_length) { + if (connection->outbound_message.length + 1 > connection->buffer_length) { return fail_message(connection); } - connection->buffer[connection->message.length] = 0; + connection->buffer[connection->outbound_message.length] = 0; if (property) { if (property->retain_handle > 0 && property->retain_handle < 3) { - connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4; + connection->buffer[connection->outbound_message.length] |= (property->retain_handle & 3) << 4; } if (property->no_local_flag) { - connection->buffer[connection->message.length] |= (property->no_local_flag << 2); + connection->buffer[connection->outbound_message.length] |= (property->no_local_flag << 2); } if (property->retain_as_published_flag) { - connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3); + connection->buffer[connection->outbound_message.length] |= (property->retain_as_published_flag << 3); } } - connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3); - connection->message.length ++; + connection->buffer[connection->outbound_message.length] |= (topic_list[topic_number].qos & 3); + connection->outbound_message.length ++; } return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); } @@ -921,10 +921,10 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info) { init_message(connection); - int reason_offset = connection->message.length; - connection->buffer[connection->message.length ++] = 0; - int properties_offset = connection->message.length; - connection->message.length ++; + int reason_offset = connection->outbound_message.length; + connection->buffer[connection->outbound_message.length ++] = 0; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (disconnect_property_info) { if (disconnect_property_info->session_expiry_interval) { APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, disconnect_property_info->session_expiry_interval), fail_message(connection)); @@ -940,7 +940,7 @@ mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_di connection->buffer[reason_offset] = disconnect_property_info->disconnect_reason; } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); } @@ -956,8 +956,8 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char return fail_message(connection); } - int properties_offset = connection->message.length; - connection->message.length ++; + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; if (property) { if (property->user_property) { mqtt5_user_property_item_t item; @@ -968,7 +968,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char } } - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); if (property && property->is_share_subscribe) { uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name); char *shared_topic = calloc(1, shared_topic_size); @@ -996,10 +996,10 @@ mqtt_message_t *mqtt5_msg_puback(mqtt_connection_t *connection, uint16_t message if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); } @@ -1009,10 +1009,10 @@ mqtt_message_t *mqtt5_msg_pubrec(mqtt_connection_t *connection, uint16_t message if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); } @@ -1022,10 +1022,10 @@ mqtt_message_t *mqtt5_msg_pubrel(mqtt_connection_t *connection, uint16_t message if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); } @@ -1035,9 +1035,9 @@ mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t messag if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } - connection->buffer[connection->message.length ++] = 0; // Regard it is success - int properties_offset = connection->message.length; - connection->message.length ++; - APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); + connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success + int properties_offset = connection->outbound_message.length; + connection->outbound_message.length ++; + APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection)); return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); } diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index 8b78ae3..a7a91e7 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -48,14 +48,14 @@ enum mqtt_connect_flag { static int append_string(mqtt_connection_t *connection, const char *string, int len) { - if (connection->message.length + len + 2 > connection->buffer_length) { + if (connection->outbound_message.length + len + 2 > connection->buffer_length) { return -1; } - connection->buffer[connection->message.length++] = len >> 8; - connection->buffer[connection->message.length++] = len & 0xff; - memcpy(connection->buffer + connection->message.length, string, len); - connection->message.length += len; + connection->buffer[connection->outbound_message.length++] = len >> 8; + connection->buffer[connection->outbound_message.length++] = len & 0xff; + memcpy(connection->buffer + connection->outbound_message.length, string, len); + connection->outbound_message.length += len; return len + 2; } @@ -72,38 +72,38 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag #endif } - if (connection->message.length + 2 > connection->buffer_length) { + if (connection->outbound_message.length + 2 > connection->buffer_length) { return 0; } - connection->buffer[connection->message.length++] = message_id >> 8; - connection->buffer[connection->message.length++] = message_id & 0xff; + connection->buffer[connection->outbound_message.length++] = message_id >> 8; + connection->buffer[connection->outbound_message.length++] = message_id & 0xff; return message_id; } -static int init_message(mqtt_connection_t *connection) +static int set_message_header_size(mqtt_connection_t *connection) { - connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; + connection->outbound_message.length = MQTT_MAX_FIXED_HEADER_SIZE; return MQTT_MAX_FIXED_HEADER_SIZE; } static mqtt_message_t *fail_message(mqtt_connection_t *connection) { - connection->message.data = connection->buffer; - connection->message.length = 0; - return &connection->message; + connection->outbound_message.data = connection->buffer; + connection->outbound_message.length = 0; + return &connection->outbound_message; } static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain) { - int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE; int total_length = message_length; int encoded_length = 0; uint8_t encoded_lens[4] = {0}; // Check if we have fragmented message and update total_len - if (connection->message.fragmented_msg_total_length) { - total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE; + if (connection->outbound_message.fragmented_msg_total_length) { + total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE; } // Encode MQTT message length @@ -124,10 +124,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int } // Save the header bytes - connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) + connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes; - connection->message.data = connection->buffer + offs; - connection->message.fragmented_msg_data_offset -= offs; + connection->outbound_message.data = connection->buffer + offs; + connection->outbound_message.fragmented_msg_data_offset -= offs; // type byte connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); // length bytes @@ -135,14 +135,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int connection->buffer[offs++] = encoded_lens[j]; } - return &connection->message; -} - -void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length) -{ - memset(connection, 0, sizeof(mqtt_connection_t)); - connection->buffer = buffer; - connection->buffer_length = buffer_length; + return &connection->outbound_message; } size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len) @@ -347,7 +340,7 @@ uint16_t mqtt_get_id(uint8_t *buffer, size_t length) mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info) { - init_message(connection); + set_message_header_size(connection); int header_len; if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) { @@ -356,11 +349,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE; } - if (connection->message.length + header_len > connection->buffer_length) { + if (connection->outbound_message.length + header_len > connection->buffer_length) { return fail_message(connection); } - char *variable_header = (char *)(connection->buffer + connection->message.length); - connection->message.length += header_len; + char *variable_header = (char *)(connection->buffer + connection->outbound_message.length); + connection->outbound_message.length += header_len; int header_idx = 0; variable_header[header_idx++] = 0; // Variable header length MSB @@ -445,7 +438,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id) { - init_message(connection); + set_message_header_size(connection); if (topic == NULL || topic[0] == '\0') { return fail_message(connection); @@ -468,16 +461,16 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi } if (data != NULL) { - if (connection->message.length + data_length > connection->buffer_length) { + if (connection->outbound_message.length + data_length > connection->buffer_length) { // Not enough size in buffer -> fragment this message - connection->message.fragmented_msg_data_offset = connection->message.length; - memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length); - connection->message.length = connection->buffer_length; - connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset; + connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length; + memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length); + connection->outbound_message.length = connection->buffer_length; + connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset; } else { - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; - connection->message.fragmented_msg_total_length = 0; + memcpy(connection->buffer + connection->outbound_message.length, data, data_length); + connection->outbound_message.length += data_length; + connection->outbound_message.fragmented_msg_total_length = 0; } } return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); @@ -485,7 +478,7 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -494,7 +487,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -503,7 +496,7 @@ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -512,7 +505,7 @@ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id) { - init_message(connection); + set_message_header_size(connection); if (append_message_id(connection, message_id) == 0) { return fail_message(connection); } @@ -521,7 +514,7 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) { - init_message(connection); + set_message_header_size(connection); if ((*message_id = append_message_id(connection, 0)) == 0) { return fail_message(connection); @@ -536,11 +529,11 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt return fail_message(connection); } - if (connection->message.length + 1 > connection->buffer_length) { + if (connection->outbound_message.length + 1 > connection->buffer_length) { return fail_message(connection); } - connection->buffer[connection->message.length] = topic_list[topic_number].qos; - connection->message.length ++; + connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos; + connection->outbound_message.length ++; } return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); @@ -548,7 +541,7 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id) { - init_message(connection); + set_message_header_size(connection); if (topic == NULL || topic[0] == '\0') { return fail_message(connection); @@ -567,19 +560,19 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char * mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection) { - init_message(connection); + set_message_header_size(connection); return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); } mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection) { - init_message(connection); + set_message_header_size(connection); return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); } mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection) { - init_message(connection); + set_message_header_size(connection); return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); } @@ -622,3 +615,23 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length) return 0; } } + +esp_err_t mqtt_connection_init(mqtt_connection_t *connection, int buffer_size) +{ + memset(connection, 0, sizeof(mqtt_connection_t)); + connection->buffer = (uint8_t *)calloc(0, buffer_size); + if (!connection->buffer) { + return ESP_ERR_NO_MEM; + } + connection->buffer_length = buffer_size; + return ESP_OK; +} + +void mqtt_connection_destroy(mqtt_connection_t *connection) +{ + if (connection) { + free(connection->buffer); + } +} + + diff --git a/mqtt5_client.c b/mqtt5_client.c index 82d4f41..851bb5a 100644 --- a/mqtt5_client.c +++ b/mqtt5_client.c @@ -18,7 +18,7 @@ static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_ void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client) { - bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data); + bool msg_dup = mqtt5_get_dup(client->mqtt_state.connection.outbound_message.data); if (msg_dup == false) { client->send_publish_packet_count ++; ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count); @@ -35,7 +35,7 @@ void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBCOMP return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); size_t msg_data_len = client->mqtt_state.in_buffer_read_len; client->event.data = mqtt5_get_pubcomp_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property); @@ -47,7 +47,7 @@ void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); size_t msg_data_len = client->mqtt_state.in_buffer_read_len; client->event.data = mqtt5_get_puback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property); @@ -59,7 +59,7 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_UNSUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); size_t msg_data_len = client->mqtt_state.in_buffer_read_len; client->event.data = mqtt5_get_unsuback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property); @@ -71,7 +71,7 @@ void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client) void esp_mqtt5_parse_suback(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { ESP_LOGI(TAG, "MQTT_MSG_TYPE_SUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); } } @@ -81,13 +81,14 @@ esp_err_t esp_mqtt5_parse_connack(esp_mqtt5_client_handle_t client, int *connect size_t len = client->mqtt_state.in_buffer_read_len; client->mqtt_state.in_buffer_read_len = 0; uint8_t ack_flag = 0; - if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) { + if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->mqtt_state. + connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) { ESP_LOGE(TAG, "Failed to parse CONNACK packet"); return ESP_FAIL; } if (*connect_rsp_code == MQTT_CONNECTION_ACCEPTED) { ESP_LOGD(TAG, "Connected"); - client->event.session_present = ack_flag & 0x01; + client->event.session_present = ack_flag & 0x01; return ESP_OK; } esp_mqtt5_print_error_code(client, *connect_rsp_code); @@ -116,7 +117,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t * *msg_topic = esp_mqtt5_client_get_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, msg_topic_len); if (!*msg_topic) { ESP_LOGE(TAG, "%s: esp_mqtt5_client_get_topic_alias() failed", __func__); - return ESP_FAIL; + return ESP_FAIL; } } else { if (esp_mqtt5_client_update_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, *msg_topic, *msg_topic_len) != ESP_OK) { @@ -139,7 +140,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t * esp_err_t esp_mqtt5_create_default_config(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { client->event.property = calloc(1, sizeof(esp_mqtt5_event_property_t)); ESP_MEM_CHECK(TAG, client->event.property, return ESP_FAIL) client->mqtt5_config = calloc(1, sizeof(mqtt5_config_storage_t)); @@ -304,7 +305,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q void esp_mqtt5_client_destory(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { if (client->mqtt5_config) { free(client->mqtt5_config->will_property_info.content_type); free(client->mqtt5_config->will_property_info.response_topic); @@ -416,7 +417,7 @@ esp_err_t esp_mqtt5_client_set_publish_property(esp_mqtt5_client_handle_t client MQTT_API_LOCK(client); /* Check protocol version */ - if(client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -445,7 +446,7 @@ esp_err_t esp_mqtt5_client_set_subscribe_property(esp_mqtt5_client_handle_t clie MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -482,7 +483,7 @@ esp_err_t esp_mqtt5_client_set_unsubscribe_property(esp_mqtt5_client_handle_t cl MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -513,7 +514,7 @@ esp_err_t esp_mqtt5_client_set_disconnect_property(esp_mqtt5_client_handle_t cli MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -556,7 +557,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client MQTT_API_LOCK(client); /* Check protocol version */ - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { ESP_LOGE(TAG, "MQTT protocol version is not v5"); MQTT_API_UNLOCK(client); return ESP_FAIL; @@ -572,7 +573,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client return ESP_FAIL; } else { client->mqtt5_config->connect_property_info.maximum_packet_size = connect_property->maximum_packet_size; - } + } } else { client->mqtt5_config->connect_property_info.maximum_packet_size = client->mqtt_state.in_buffer_length; } @@ -757,4 +758,4 @@ void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_pro } } free(user_property); -} \ No newline at end of file +} diff --git a/mqtt_client.c b/mqtt_client.c index 295e806..6719a49 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -4,7 +4,7 @@ #include "esp_log.h" #include #include "esp_heap_caps.h" - +#include "mqtt_msg.h" _Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type"); #ifdef ESP_EVENT_ANY_ID @@ -396,70 +396,70 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl err = ESP_ERR_NO_MEM; ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.hostname, &client->config->host), goto _mqtt_set_config_failed); ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.path, &client->config->path), goto _mqtt_set_config_failed); - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->connect_info.username), goto _mqtt_set_config_failed); - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->connect_info.password), goto _mqtt_set_config_failed); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->mqtt_state.connection.information.username), goto _mqtt_set_config_failed); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->mqtt_state.connection.information.password), goto _mqtt_set_config_failed); if (!config->credentials.set_null_client_id) { if (config->credentials.client_id) { - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->connect_info.client_id), goto _mqtt_set_config_failed); - } else if (client->connect_info.client_id == NULL) { - client->connect_info.client_id = platform_create_id_string(); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->mqtt_state.connection.information.client_id), goto _mqtt_set_config_failed); + } else if (client->mqtt_state.connection.information.client_id == NULL) { + client->mqtt_state.connection.information.client_id = platform_create_id_string(); } - ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed); - ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id); + ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.client_id, goto _mqtt_set_config_failed); + ESP_LOGD(TAG, "MQTT client_id=%s", client->mqtt_state.connection.information.client_id); } ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.uri, &client->config->uri), goto _mqtt_set_config_failed); - ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->connect_info.will_topic), goto _mqtt_set_config_failed); + ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->mqtt_state.connection.information.will_topic), goto _mqtt_set_config_failed); if (config->session.last_will.msg_len && config->session.last_will.msg) { - free(client->connect_info.will_message); - client->connect_info.will_message = malloc(config->session.last_will.msg_len); - ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); - memcpy(client->connect_info.will_message, config->session.last_will.msg, config->session.last_will.msg_len); - client->connect_info.will_length = config->session.last_will.msg_len; + free(client->mqtt_state.connection.information.will_message); + client->mqtt_state.connection.information.will_message = malloc(config->session.last_will.msg_len); + ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed); + memcpy(client->mqtt_state.connection.information.will_message, config->session.last_will.msg, config->session.last_will.msg_len); + client->mqtt_state.connection.information.will_length = config->session.last_will.msg_len; } else if (config->session.last_will.msg) { - free(client->connect_info.will_message); - client->connect_info.will_message = strdup(config->session.last_will.msg); - ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); - client->connect_info.will_length = strlen(config->session.last_will.msg); + free(client->mqtt_state.connection.information.will_message); + client->mqtt_state.connection.information.will_message = strdup(config->session.last_will.msg); + ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed); + client->mqtt_state.connection.information.will_length = strlen(config->session.last_will.msg); } if (config->session.last_will.qos) { - client->connect_info.will_qos = config->session.last_will.qos; + client->mqtt_state.connection.information.will_qos = config->session.last_will.qos; } if (config->session.last_will.retain) { - client->connect_info.will_retain = config->session.last_will.retain; + client->mqtt_state.connection.information.will_retain = config->session.last_will.retain; } - if (config->session.disable_clean_session == client->connect_info.clean_session) { - client->connect_info.clean_session = !config->session.disable_clean_session; - if (!client->connect_info.clean_session && config->credentials.set_null_client_id) { + if (config->session.disable_clean_session == client->mqtt_state.connection.information.clean_session) { + client->mqtt_state.connection.information.clean_session = !config->session.disable_clean_session; + if (!client->mqtt_state.connection.information.clean_session && config->credentials.set_null_client_id) { ESP_LOGE(TAG, "Clean Session flag must be true if client has a null id"); } } if (config->session.keepalive) { - client->connect_info.keepalive = config->session.keepalive; + client->mqtt_state.connection.information.keepalive = config->session.keepalive; } - if (client->connect_info.keepalive == 0) { - client->connect_info.keepalive = MQTT_KEEPALIVE_TICK; + if (client->mqtt_state.connection.information.keepalive == 0) { + client->mqtt_state.connection.information.keepalive = MQTT_KEEPALIVE_TICK; } if (config->session.disable_keepalive) { // internal `keepalive` value (in connect_info) is in line with 3.1.2.10 Keep Alive from mqtt spec: // * keepalive=0: Keep alive mechanism disabled (server not to disconnect the client on its inactivity) // * period in seconds to send a Control packet if inactive - client->connect_info.keepalive = 0; + client->mqtt_state.connection.information.keepalive = 0; } if (config->session.protocol_ver) { - client->connect_info.protocol_ver = config->session.protocol_ver; + client->mqtt_state.connection.information.protocol_ver = config->session.protocol_ver; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_UNDEFINED) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_UNDEFINED) { #ifdef MQTT_PROTOCOL_311 - client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1_1; + client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1_1; #else - client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1; + client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1; #endif - } else if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + } else if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifndef MQTT_PROTOCOL_5 ESP_LOGE(TAG, "Please first enable MQTT_PROTOCOL_5 feature in menuconfig"); goto _mqtt_set_config_failed; @@ -485,9 +485,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } else { client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS; } - if (config->network.transport) { - client->config->transport = config->network.transport; - } if (config->broker.verification.alpn_protos) { for (int i = 0; i < client->config->num_alpn_protos; i++) { @@ -592,15 +589,15 @@ void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) } free(client->config->alpn_protos); free(client->config->clientkey_password); - free(client->connect_info.will_topic); - free(client->connect_info.will_message); - free(client->connect_info.client_id); - free(client->connect_info.username); - free(client->connect_info.password); + free(client->mqtt_state.connection.information.will_topic); + free(client->mqtt_state.connection.information.will_message); + free(client->mqtt_state.connection.information.client_id); + free(client->mqtt_state.connection.information.username); + free(client->mqtt_state.connection.information.password); #ifdef MQTT_PROTOCOL_5 esp_mqtt5_client_destory(client); #endif - memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t)); + memset(&client->mqtt_state.connection.information, 0, sizeof(mqtt_connect_info_t)); #ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP if (client->config->event_loop_handle) { esp_event_loop_delete(client->config->event_loop_handle); @@ -620,8 +617,8 @@ static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout) static esp_err_t process_keepalive(esp_mqtt_client_handle_t client) { - if (client->connect_info.keepalive > 0) { - const uint64_t keepalive_ms = client->connect_info.keepalive * 1000; + if (client->mqtt_state.connection.information.keepalive > 0) { + const uint64_t keepalive_ms = client->mqtt_state.connection.information.keepalive * 1000; if (client->wait_for_ping_resp == true ) { if (has_timed_out(client->keepalive_tick, keepalive_ms)) { @@ -648,10 +645,10 @@ static esp_err_t process_keepalive(esp_mqtt_client_handle_t client) static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client) { - int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length; + int wlen = 0, widx = 0, len = client->mqtt_state.connection.outbound_message.length; while (len > 0) { wlen = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data + widx, + (char *)client->mqtt_state.connection.outbound_message.data + widx, len, client->config->network_timeout_ms); if (wlen < 0) { @@ -674,29 +671,29 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m { int read_len, connect_rsp_code = 0; client->wait_for_ping_resp = false; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_connect(&client->mqtt_state.mqtt_connection, - &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info); + mqtt5_msg_connect(&client->mqtt_state.connection, + &client->mqtt_state.connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, - &client->connect_info); + mqtt_msg_connect(&client->mqtt_state.connection, + &client->mqtt_state.connection.information); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Connect message cannot be created"); return ESP_FAIL; } - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); + client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.connection.outbound_message.data, + client->mqtt_state.connection.outbound_message.length); #endif } else { - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.connection.outbound_message.data, + client->mqtt_state.connection.outbound_message.length); } ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X", client->mqtt_state.pending_msg_type, @@ -724,7 +721,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len); return ESP_FAIL; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) { client->send_publish_packet_count = 0; @@ -807,6 +804,24 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co if (!create_client_data(client)) { goto _mqtt_init_failed; } + int buffer_size = config->buffer.size; + if (buffer_size <= 0) { + buffer_size = MQTT_BUFFER_SIZE_BYTE; + } + + // use separate value for output buffer size if configured + int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size; + if (mqtt_connection_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) { + goto _mqtt_init_failed; + } + + client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size); + ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed); + client->mqtt_state.in_buffer_length = buffer_size; + client->outbox = outbox_init(); + ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed); + client->status_bits = xEventGroupCreate(); + ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed); if (esp_mqtt_set_config(client, config) != ESP_OK) { goto _mqtt_init_failed; @@ -826,27 +841,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co client->reconnect_tick = platform_tick_get_ms(); client->refresh_connection_tick = platform_tick_get_ms(); client->wait_for_ping_resp = false; - int buffer_size = config->buffer.size; - if (buffer_size <= 0) { - buffer_size = MQTT_BUFFER_SIZE_BYTE; - } - // use separate value for output buffer size if configured - int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size; - - client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size); - ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed); - client->mqtt_state.in_buffer_length = buffer_size; - client->mqtt_state.out_buffer = (uint8_t *)malloc(out_buffer_size); - ESP_MEM_CHECK(TAG, client->mqtt_state.out_buffer, goto _mqtt_init_failed); - - client->mqtt_state.out_buffer_length = out_buffer_size; - client->outbox = outbox_init(); - ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed); - client->status_bits = xEventGroupCreate(); - ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed); - - mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, - client->mqtt_state.out_buffer_length); #ifdef MQTT_PROTOCOL_5 if (esp_mqtt5_create_default_config(client) != ESP_OK) { goto _mqtt_init_failed; @@ -877,7 +871,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client) vEventGroupDelete(client->status_bits); } free(client->mqtt_state.in_buffer); - free(client->mqtt_state.out_buffer); + mqtt_connection_destroy(&client->mqtt_state.connection); if (client->api_lock) { vSemaphoreDelete(client->api_lock); } @@ -944,9 +938,9 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u if (pass) { pass[0] = 0; //terminal username pass ++; - client->connect_info.password = strdup(pass); + client->mqtt_state.connection.information.password = strdup(pass); } - client->connect_info.username = strdup(user_info); + client->mqtt_state.connection.information.username = strdup(user_info); free(user_info); } @@ -958,7 +952,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->event.msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); #endif @@ -982,16 +976,16 @@ esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mq static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) { client->event.client = client; - client->event.protocol_ver = client->connect_info.protocol_ver; + client->event.protocol_ver = client->mqtt_state.connection.information.protocol_ver; esp_err_t ret = ESP_FAIL; -#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP - esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY); - ret = esp_event_loop_run(client->config->event_loop_handle, 0); + #ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP + esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY); + ret = esp_event_loop_run(client->config->event_loop_handle, 0); #else return ESP_FAIL; #endif - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 esp_mqtt5_client_delete_user_property(client->event.property->user_property); client->event.property->user_property = NULL; @@ -1009,7 +1003,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) size_t msg_data_offset = 0; char *msg_topic = NULL, *msg_data = NULL; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 if (esp_mqtt5_get_publish_data(client, msg_buf, msg_read_len, &msg_topic, &msg_topic_len, &msg_data, &msg_data_len) != ESP_OK) { ESP_LOGE(TAG, "%s: esp_mqtt5_get_publish_data() failed", __func__); @@ -1034,7 +1028,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) } // post data event client->event.retain = mqtt_get_retain(msg_buf); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 client->event.msg_id = mqtt5_get_id(msg_buf, msg_read_len); #endif @@ -1082,7 +1076,7 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) size_t msg_data_len = client->mqtt_state.in_buffer_read_len; char *msg_data = NULL; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property); #else @@ -1135,16 +1129,16 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_ { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); - outbox_message_t msg = { 0 }; - msg.data = client->mqtt_state.outbound_message->data; - msg.len = client->mqtt_state.outbound_message->length; - msg.msg_id = client->mqtt_state.pending_msg_id; - msg.msg_type = client->mqtt_state.pending_msg_type; - msg.msg_qos = client->mqtt_state.pending_publish_qos; - msg.remaining_data = remaining_data; - msg.remaining_len = remaining_len; - //Copy to queue buffer - return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); + outbox_message_t msg = { 0 }; + msg.data = client->mqtt_state.connection.outbound_message.data; + msg.len = client->mqtt_state.connection.outbound_message.length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; + msg.remaining_data = remaining_data; + msg.remaining_len = remaining_len; + //Copy to queue buffer + return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } @@ -1323,23 +1317,23 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_FAIL; } if (msg_qos == 1) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_puback(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_puback(&client->mqtt_state.connection, msg_id); } } else if (msg_qos == 2) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_pubrec(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_pubrec(&client->mqtt_state.connection, msg_id); } } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created"); return ESP_FAIL; } @@ -1355,7 +1349,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; case MQTT_MSG_TYPE_PUBACK: #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_decrement_packet_counter(client); } #endif @@ -1370,15 +1364,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; case MQTT_MSG_TYPE_PUBREC: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREC return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); - client->mqtt_state.outbound_message = mqtt5_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_pubrel(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_pubrel(&client->mqtt_state.connection, msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish response message PUBREL cannot be created"); return ESP_FAIL; } @@ -1388,15 +1382,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; case MQTT_MSG_TYPE_PUBREL: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREL return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len)); - client->mqtt_state.outbound_message = mqtt5_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + mqtt5_msg_pubcomp(&client->mqtt_state.connection, msg_id); #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_msg_pubcomp(&client->mqtt_state.connection, msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created"); return ESP_FAIL; } @@ -1406,7 +1400,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) case MQTT_MSG_TYPE_PUBCOMP: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { esp_mqtt5_decrement_packet_counter(client); } #endif @@ -1438,11 +1432,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item) { // decode queued data - client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id, + client->mqtt_state.connection.outbound_message.data = outbox_item_get_data(item, &client->mqtt_state.connection.outbound_message.length, &client->mqtt_state.pending_msg_id, &client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos); // set duplicate flag for QoS-1 and QoS-2 messages if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) { - mqtt_set_dup(client->mqtt_state.outbound_message->data); + mqtt_set_dup(client->mqtt_state.connection.outbound_message.data); ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id); } @@ -1580,7 +1574,7 @@ static void esp_mqtt_task(void *pv) break; } client->event.event_id = MQTT_EVENT_CONNECTED; - if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) { client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); } client->state = MQTT_STATE_CONNECTED; @@ -1733,19 +1727,19 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client) static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client) { // Notify the broker we are disconnecting - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_disconnect(&client->mqtt_state.mqtt_connection, &client->mqtt5_config->disconnect_property_info); - if (client->mqtt_state.outbound_message->length) { + mqtt5_msg_disconnect(&client->mqtt_state.connection, &client->mqtt5_config->disconnect_property_info); + if (client->mqtt_state.connection.outbound_message.length) { esp_mqtt5_client_delete_user_property(client->mqtt5_config->disconnect_property_info.user_property); client->mqtt5_config->disconnect_property_info.user_property = NULL; memset(&client->mqtt5_config->disconnect_property_info, 0, sizeof(esp_mqtt5_disconnect_property_config_t)); } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection); + mqtt_msg_disconnect(&client->mqtt_state.connection); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Disconnect message cannot be created"); return ESP_FAIL; } @@ -1793,8 +1787,8 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client) { - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - if (client->mqtt_state.outbound_message->length == 0) { + mqtt_msg_pingreq(&client->mqtt_state.connection); + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Ping message cannot be created"); return ESP_FAIL; } @@ -1820,7 +1814,7 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, MQTT_API_UNLOCK(client); return -1; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 int max_qos = topic_list[0].qos; for (int topic_number = 0; topic_number < size; ++topic_number) { @@ -1833,25 +1827,25 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, MQTT_API_UNLOCK(client); return -1; } - client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection, + mqtt5_msg_subscribe(&client->mqtt_state.connection, topic_list, size, &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); - if (client->mqtt_state.outbound_message->length) { + if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->subscribe_property_info = NULL; } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, + mqtt_msg_subscribe(&client->mqtt_state.connection, topic_list, size, &client->mqtt_state.pending_msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Subscribe message cannot be created"); MQTT_API_UNLOCK(client); return -1; } - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); //move pending msg to outbox (if have) if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); @@ -1888,28 +1882,28 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top ESP_LOGE(TAG, "Client has not connected"); return -1; } - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_unsubscribe(&client->mqtt_state.mqtt_connection, + mqtt5_msg_unsubscribe(&client->mqtt_state.connection, topic, &client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info); - if (client->mqtt_state.outbound_message->length) { + if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->unsubscribe_property_info = NULL; } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection, + mqtt_msg_unsubscribe(&client->mqtt_state.connection, topic, &client->mqtt_state.pending_msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { MQTT_API_UNLOCK(client); ESP_LOGE(TAG, "Unubscribe message cannot be created"); return -1; } ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id); - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); if (!mqtt_enqueue(client, NULL, 0)) { MQTT_API_UNLOCK(client); return -1; @@ -1927,48 +1921,54 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top return client->mqtt_state.pending_msg_id; } -static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data, - int len, int qos, int retain, bool store) +static int make_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, + int len, int qos, int retain) { uint16_t pending_msg_id = 0; - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - client->mqtt_state.outbound_message = mqtt5_msg_publish(&client->mqtt_state.mqtt_connection, + mqtt5_msg_publish(&client->mqtt_state.connection, topic, data, len, qos, retain, &pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info); - if (client->mqtt_state.outbound_message->length) { + if (client->mqtt_state.connection.outbound_message.length) { client->mqtt5_config->publish_property_info = NULL; } #endif } else { - client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, + mqtt_msg_publish(&client->mqtt_state.connection, topic, data, len, qos, retain, &pending_msg_id); } - if (client->mqtt_state.outbound_message->length == 0) { + if (client->mqtt_state.connection.outbound_message.length == 0) { ESP_LOGE(TAG, "Publish message cannot be created"); return -1; } + return pending_msg_id; +} +static inline int mqtt_client_enqueue_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, + int len, int qos, int retain, bool store) +{ + int pending_msg_id = make_publish(client, topic, data, len, qos, retain); /* We have to set as pending all the qos>0 messages */ //TODO: client->mqtt_state.outbound_message = publish_msg; if (qos > 0 || store) { - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer - if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { + if (client->mqtt_state.connection.outbound_message.fragmented_msg_total_length == 0) { if (!mqtt_enqueue(client, NULL, 0)) { return -1; } } else { - int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; + int first_fragment = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset; if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) { return -1; } - client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; + client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0; } } return pending_msg_id; @@ -1990,7 +1990,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, #endif #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) { ESP_LOGI(TAG, "MQTT5 publish check fail"); MQTT_API_UNLOCK(client); @@ -2008,7 +2008,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } - int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false); + int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false); if (pending_msg_id < 0) { MQTT_API_UNLOCK(client); return -1; @@ -2043,27 +2043,19 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, goto cannot_publish; } - int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; - client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0; - client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; + int data_sent = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset; + client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset = 0; + client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0; remaining_len -= data_sent; current_data += data_sent; if (remaining_len > 0) { - mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection; + mqtt_connection_t *connection = &client->mqtt_state.connection; ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len); - if (remaining_len > connection->buffer_length) { - // Continue with sending - memcpy(connection->buffer, current_data, connection->buffer_length); - connection->message.length = connection->buffer_length; - sending = true; - } else { - memcpy(connection->buffer, current_data, remaining_len); - connection->message.length = remaining_len; - sending = true; - } - connection->message.data = connection->buffer; - client->mqtt_state.outbound_message = &connection->message; + int write_len = remaining_len > connection->buffer_length ? connection->buffer_length : remaining_len; + memcpy(connection->buffer, current_data, write_len); + connection->outbound_message.length = write_len; + sending = true; } else { // Message was sent correctly sending = false; @@ -2085,7 +2077,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, cannot_publish: // clear out possible fragmented publish if failed or skipped - client->mqtt_state.outbound_message->fragmented_msg_total_length = 0; + client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0; if (qos == 0) { ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected"); } @@ -2112,7 +2104,7 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, MQTT_API_LOCK(client); #ifdef MQTT_PROTOCOL_5 - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) { ESP_LOGI(TAG, "esp_mqtt_client_enqueue check fail"); MQTT_API_UNLOCK(client); @@ -2120,7 +2112,7 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, } } #endif - int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store); + int ret = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, store); MQTT_API_UNLOCK(client); if (ret == 0 && store == false) { // messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error