Merge branch 'feature/configurable_max_outbox' into 'master'

Add outbox size control feature

See merge request espressif/esp-mqtt!141
This commit is contained in:
Rocha Euripedes
2023-06-14 01:02:34 +08:00
12 changed files with 547 additions and 504 deletions

View File

@ -11,7 +11,6 @@ list(APPEND EXTRA_COMPONENT_DIRS
"$ENV{IDF_PATH}/tools/mocks/lwip/"
"$ENV{IDF_PATH}/tools/mocks/esp-tls/"
"$ENV{IDF_PATH}/tools/mocks/http_parser/"
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/"
)
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/")
project(host_mqtt_client_test)

View File

@ -2,6 +2,13 @@ idf_component_register(SRCS "test_mqtt_client.cpp"
INCLUDE_DIRS "$ENV{IDF_PATH}/tools/catch"
REQUIRES cmock mqtt esp_timer esp_hw_support http_parser log)
target_compile_options(${COMPONENT_LIB} PUBLIC -fsanitize=address -fconcepts)
target_link_options(${COMPONENT_LIB} PUBLIC -fsanitize=address)
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
target_compile_options(${mqtt} PUBLIC -fsanitize=address -fconcepts)
target_link_options(${mqtt} PUBLIC -fsanitize=address)
if(CONFIG_GCOV_ENABLED)
target_compile_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
target_link_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)

View File

@ -3,10 +3,15 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <memory>
#include <random>
#include <string_view>
#include <type_traits>
#include "esp_transport.h"
#define CATCH_CONFIG_MAIN // This tells the catch header to generate a main
#include "catch.hpp"
#include "mqtt_client.h"
extern "C" {
#include "Mockesp_event.h"
#include "Mockesp_mac.h"
@ -30,99 +35,121 @@ extern "C" {
}
}
#include "mqtt_client.h"
struct ClientInitializedFixture {
esp_mqtt_client_handle_t client;
ClientInitializedFixture()
{
[[maybe_unused]] auto protect = TEST_PROTECT();
int mtx;
int transport_list;
int transport;
int event_group;
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
esp_timer_get_time_IgnoreAndReturn(0);
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
xQueueCreateMutex_ExpectAnyArgsAndReturn(
reinterpret_cast<QueueHandle_t>(&mtx));
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
http_parser_parse_url_IgnoreAndReturn(0);
http_parser_url_init_ExpectAnyArgs();
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
esp_read_mac_IgnoreAndReturn(ESP_OK);
esp_read_mac_ReturnThruPtr_mac(mac);
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
esp_transport_destroy_IgnoreAndReturn(ESP_OK);
vEventGroupDelete_Ignore();
vQueueDelete_Ignore();
esp_mqtt_client_config_t config{};
client = esp_mqtt_client_init(&config);
}
~ClientInitializedFixture()
{
esp_mqtt_client_destroy(client);
}
};
TEST_CASE_METHOD(ClientInitializedFixture, "Client set uri")
auto random_string(std::size_t n)
{
struct http_parser_url ret_uri = {
.field_set = 1,
.port = 0,
.field_data = { { 0, 1} }
};
SECTION("User set a correct URI") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client, " ");
REQUIRE(res == ESP_OK);
}
SECTION("Incorrect URI from user") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client, " ");
REQUIRE(res == ESP_FAIL);
}
static constexpr std::string_view char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ123456790";
std::string str;
std::sample(char_set.begin(), char_set.end(), std::back_inserter(str), n,
std::mt19937 {std::random_device{}()});
return str;
}
TEST_CASE_METHOD(ClientInitializedFixture, "Client Start")
using unique_mqtt_client = std::unique_ptr < std::remove_pointer_t<esp_mqtt_client_handle_t>, decltype([](esp_mqtt_client_handle_t client)
{
SECTION("Successful start") {
esp_mqtt_client_destroy(client);
}) >;
SCENARIO("MQTT Client Operation")
{
// [[maybe_unused]] auto protect = TEST_PROTECT();
// Set expectations for the mocked calls.
int mtx = 0;
int transport_list = 0;
int transport = 0;
int event_group = 0;
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
esp_timer_get_time_IgnoreAndReturn(0);
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
xQueueCreateMutex_ExpectAnyArgsAndReturn(
reinterpret_cast<QueueHandle_t>(&mtx));
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
http_parser_url_init_Ignore();
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
esp_read_mac_IgnoreAndReturn(ESP_OK);
esp_read_mac_ReturnThruPtr_mac(mac);
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
vEventGroupDelete_Ignore();
vQueueDelete_Ignore();
GIVEN("An a minimal config") {
esp_mqtt_client_config_t config{};
config.broker.address.uri = "mqtt://1.1.1.1";
struct http_parser_url ret_uri = {
.field_set = 1 | (1<<1),
.field_set = 1 | (1 << 1),
.port = 0,
.field_data = { { 0, 4 } /*mqtt*/, { 7, 1 } } // at least *scheme* and *host*
};
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdTRUE);
auto res = esp_mqtt_set_config(client, &config);
REQUIRE(res == ESP_OK);
res = esp_mqtt_client_start(client);
REQUIRE(res == ESP_OK);
}
SECTION("Failed on initialization") {
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
auto res = esp_mqtt_client_start(nullptr);
REQUIRE(res == ESP_ERR_INVALID_ARG);
}
SECTION("Client already started") {}
SECTION("Failed to start task") {
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
auto res = esp_mqtt_client_start(client);
REQUIRE(res == ESP_FAIL);
SECTION("Client with minimal config") {
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
REQUIRE(client != nullptr);
SECTION("User will set a new uri") {
struct http_parser_url ret_uri = {
.field_set = 1,
.port = 0,
.field_data = { { 0, 1} }
};
SECTION("User set a correct URI") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client.get(), " ");
REQUIRE(res == ESP_OK);
}
SECTION("Incorrect URI from user") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client.get(), " ");
REQUIRE(res == ESP_FAIL);
}
}
SECTION("After Start Client Is Cleanly destroyed") {
REQUIRE(esp_mqtt_client_start(client.get()) == ESP_OK);
// Only need to start the client, destroy is called automatically at the end of
// scope
}
}
SECTION("Client with all allocating configuration set") {
auto host = random_string(20);
auto path = random_string(10);
auto username = random_string(10);
auto client_id = random_string(10);
auto password = random_string(10);
auto lw_topic = random_string(10);
auto lw_msg = random_string(10);
config.broker = {.address = {
.hostname = host.data(),
.path = path.data()
}
};
config.credentials = {
.username = username.data(),
.client_id = client_id.data(),
.authentication = {
.password = password.data()
}
};
config.session = {
.last_will {
.topic = lw_topic.data(),
.msg = lw_msg.data()
}
};
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
REQUIRE(client != nullptr);
}
}
}

View File

@ -352,6 +352,9 @@ typedef struct esp_mqtt_client_config_t {
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
``buffer_size`` */
} buffer; /*!< Buffer size configuration.*/
struct outbox_config_t {
uint64_t limit; /*!< Size limit for the outbox in bytes.*/
} outbox;
} esp_mqtt_client_config_t;
/**
@ -430,7 +433,6 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
*/
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
#ifdef __cplusplus
#define esp_mqtt_client_subscribe esp_mqtt_client_subscribe_single
@ -449,6 +451,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
char *: esp_mqtt_client_subscribe_single, \
@ -473,6 +476,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
const char *topic, int qos);
@ -493,6 +497,7 @@ int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size);
@ -536,7 +541,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client,
* @param retain retain flag
*
* @return message_id of the publish message (for QoS 0 message_id will always
* be zero) on success. -1 on failure.
* be zero) on success. -1 on failure, -2 in case of full outbox.
*/
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
const char *data, int len, int qos, int retain);
@ -561,7 +566,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
* @param store if true, all messages are enqueued; otherwise only QoS 1 and
* QoS 2 are enqueued
*
* @return message_id if queued successfully, -1 otherwise
* @return message_id if queued successfully, -1 on failure, -2 in case of full outbox.
*/
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
const char *data, int len, int qos, int retain,

View File

@ -7,6 +7,7 @@
#ifndef _MQTT_CLIENT_PRIV_H_
#define _MQTT_CLIENT_PRIV_H_
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
@ -48,13 +49,10 @@ extern "C" {
typedef struct mqtt_state {
uint8_t *in_buffer;
uint8_t *out_buffer;
int in_buffer_length;
int out_buffer_length;
size_t message_length;
size_t in_buffer_read_len;
mqtt_message_t *outbound_message;
mqtt_connection_t mqtt_connection;
mqtt_connection_t connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
@ -91,6 +89,7 @@ typedef struct {
bool use_secure_element;
void *ds_data;
int message_retransmit_timeout;
uint64_t outbox_limit;
esp_transport_handle_t transport;
} mqtt_config_storage_t;
@ -106,7 +105,6 @@ struct esp_mqtt_client {
esp_transport_handle_t transport;
mqtt_config_storage_t *config;
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
mqtt_client_state_t state;
uint64_t refresh_connection_tick;
int64_t keepalive_tick;

View File

@ -68,16 +68,6 @@ typedef struct mqtt_message {
size_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
} mqtt_message_t;
typedef struct mqtt_connection {
mqtt_message_t message;
#if MQTT_MSG_ID_INCREMENTAL
uint16_t last_message_id; /*!< last used id if incremental message id configured */
#endif
uint8_t *buffer;
size_t buffer_length;
} mqtt_connection_t;
typedef struct mqtt_connect_info {
char *client_id;
char *username;
@ -90,9 +80,18 @@ typedef struct mqtt_connect_info {
int will_retain;
int clean_session;
esp_mqtt_protocol_ver_t protocol_ver;
} mqtt_connect_info_t;
typedef struct mqtt_connection {
mqtt_message_t outbound_message;
#if MQTT_MSG_ID_INCREMENTAL
uint16_t last_message_id; /*!< last used id if incremental message id configured */
#endif
uint8_t *buffer;
size_t buffer_length;
mqtt_connect_info_t information;
} mqtt_connection_t;
static inline int mqtt_get_type(const uint8_t *buffer)
{
@ -123,7 +122,6 @@ static inline int mqtt_get_retain(const uint8_t *buffer)
return (buffer[0] & 0x01);
}
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length);
bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length);
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len);
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length);
@ -132,6 +130,9 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length);
uint16_t mqtt_get_id(uint8_t *buffer, size_t length);
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length);
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size);
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info);
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id);
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id);
@ -143,8 +144,6 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection);
#ifdef __cplusplus
}
#endif

View File

@ -14,7 +14,7 @@ extern "C" {
struct outbox_item;
typedef struct outbox_list_t *outbox_handle_t;
typedef struct outbox_t *outbox_handle_t;
typedef struct outbox_item *outbox_item_handle_t;
typedef struct outbox_message *outbox_message_handle_t;
typedef long long outbox_tick_t;
@ -42,8 +42,6 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
/**
@ -56,7 +54,7 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
pending_state_t outbox_item_get_pending(outbox_item_handle_t item);
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
int outbox_get_size(outbox_handle_t outbox);
uint64_t outbox_get_size(outbox_handle_t outbox);
void outbox_destroy(outbox_handle_t outbox);
void outbox_delete_all_items(outbox_handle_t outbox);

View File

@ -69,8 +69,8 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
generate_variable_len(len, &len_bytes, encoded_lens);
int offset = len_bytes - 1;
connection->message.length += offset;
if (connection->message.length > connection->buffer_length) {
connection->outbound_message.length += offset;
if (connection->outbound_message.length > connection->buffer_length) {
return -1;
}
@ -89,33 +89,33 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
static int append_property(mqtt_connection_t *connection, uint8_t property_type, uint8_t len_occupy, const char *data, size_t data_len)
{
if ((connection->message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
if ((connection->outbound_message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
return -1;
}
size_t origin_message_len = connection->message.length;
size_t origin_message_len = connection->outbound_message.length;
if (property_type) {
connection->buffer[connection->message.length ++] = property_type;
connection->buffer[connection->outbound_message.length ++] = property_type;
}
if (len_occupy == 0) {
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
generate_variable_len(data_len, &len_bytes, encoded_lens);
for (int j = 0; j < len_bytes; j ++) {
connection->buffer[connection->message.length ++] = encoded_lens[j];
connection->buffer[connection->outbound_message.length ++] = encoded_lens[j];
}
} else {
for (int i = 1; i <= len_occupy; i ++) {
connection->buffer[connection->message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
connection->buffer[connection->outbound_message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
}
}
if (data) {
memcpy(connection->buffer + connection->message.length, data, data_len);
connection->message.length += data_len;
memcpy(connection->buffer + connection->outbound_message.length, data, data_len);
connection->outbound_message.length += data_len;
}
return connection->message.length - origin_message_len;
return connection->outbound_message.length - origin_message_len;
}
static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id)
@ -130,36 +130,36 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
#endif
}
if (connection->message.length + 2 > connection->buffer_length) {
if (connection->outbound_message.length + 2 > connection->buffer_length) {
return 0;
}
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], message_id)
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], message_id)
return message_id;
}
static int init_message(mqtt_connection_t *connection)
{
connection->message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
connection->outbound_message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
return MQTT5_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = 0;
return &connection->outbound_message;
}
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
int message_length = connection->message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
int message_length = connection->outbound_message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
if (connection->outbound_message.fragmented_msg_total_length) {
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
}
// Encode MQTT message length
@ -171,10 +171,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT5_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
connection->outbound_message.data = connection->buffer + offs;
connection->outbound_message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs ++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
@ -182,7 +182,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
connection->buffer[offs ++] = encoded_lens[j];
}
return &connection->message;
return &connection->outbound_message;
}
static esp_err_t mqtt5_msg_set_user_property(mqtt5_user_property_handle_t *user_property, char *key, size_t key_len, char *value, size_t value_len)
@ -465,24 +465,24 @@ char *mqtt5_get_puback_data(uint8_t *buffer, size_t *length, mqtt5_user_property
mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info, esp_mqtt5_connection_property_storage_t *property, esp_mqtt5_connection_will_property_storage_t *will_property)
{
init_message(connection);
connection->buffer[connection->message.length ++] = 0; // Variable header length MSB
connection->buffer[connection->outbound_message.length ++] = 0; // Variable header length MSB
/* Defaults to protocol version 5 values */
connection->buffer[connection->message.length ++] = 4; // Variable header length LSB
memcpy(&connection->buffer[connection->message.length], "MQTT", 4); // Protocol name
connection->message.length += 4;
connection->buffer[connection->message.length ++] = 5; // Protocol version
connection->buffer[connection->outbound_message.length ++] = 4; // Variable header length LSB
memcpy(&connection->buffer[connection->outbound_message.length], "MQTT", 4); // Protocol name
connection->outbound_message.length += 4;
connection->buffer[connection->outbound_message.length ++] = 5; // Protocol version
int flags_offset = connection->message.length;
connection->buffer[connection->message.length ++] = 0; // Flags
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], info->keepalive) // Keep-alive
int flags_offset = connection->outbound_message.length;
connection->buffer[connection->outbound_message.length ++] = 0; // Flags
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], info->keepalive) // Keep-alive
if (info->clean_session) {
connection->buffer[flags_offset] |= MQTT5_CONNECT_FLAG_CLEAN_SESSION;
}
//Add properties
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property->session_expiry_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, property->session_expiry_interval), fail_message(connection));
}
@ -508,7 +508,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (info->client_id != NULL && info->client_id[0] != '\0') {
APPEND_CHECK(append_property(connection, 0, 2, info->client_id, strlen(info->client_id)), fail_message(connection));
@ -518,8 +518,8 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
//Add will properties
if (info->will_topic != NULL && info->will_topic[0] != '\0') {
properties_offset = connection->message.length;
connection->message.length ++;
properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (will_property->will_delay_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_WILL_DELAY_INTERVAL, 4, NULL, will_property->will_delay_interval), fail_message(connection));
}
@ -545,7 +545,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(append_property(connection, 0, 2, info->will_topic, strlen(info->will_topic)), fail_message(connection));
APPEND_CHECK(append_property(connection, 0, 2, info->will_message, info->will_length), fail_message(connection));
@ -742,8 +742,8 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
*message_id = 0;
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->payload_format_indicator) {
@ -788,20 +788,20 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_CONTENT_TYPE, 2, property->content_type, strlen(property->content_type)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (connection->message.length + data_length > connection->buffer_length) {
if (connection->outbound_message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
connection->outbound_message.length = connection->buffer_length;
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
} else {
if (data != NULL) {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
connection->outbound_message.length += data_length;
}
connection->message.fragmented_msg_total_length = 0;
connection->outbound_message.fragmented_msg_total_length = 0;
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
@ -858,8 +858,8 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
return fail_message(connection);
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->subscribe_id) {
@ -873,7 +873,7 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
}
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].filter[0] == '\0') {
@ -897,23 +897,23 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
}
if (connection->message.length + 1 > connection->buffer_length) {
if (connection->outbound_message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->message.length] = 0;
connection->buffer[connection->outbound_message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
connection->buffer[connection->outbound_message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
connection->buffer[connection->outbound_message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
connection->buffer[connection->outbound_message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3);
connection->message.length ++;
connection->buffer[connection->outbound_message.length] |= (topic_list[topic_number].qos & 3);
connection->outbound_message.length ++;
}
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
@ -921,10 +921,10 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info)
{
init_message(connection);
int reason_offset = connection->message.length;
connection->buffer[connection->message.length ++] = 0;
int properties_offset = connection->message.length;
connection->message.length ++;
int reason_offset = connection->outbound_message.length;
connection->buffer[connection->outbound_message.length ++] = 0;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (disconnect_property_info) {
if (disconnect_property_info->session_expiry_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, disconnect_property_info->session_expiry_interval), fail_message(connection));
@ -940,7 +940,7 @@ mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_di
connection->buffer[reason_offset] = disconnect_property_info->disconnect_reason;
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
@ -956,8 +956,8 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
return fail_message(connection);
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->user_property) {
mqtt5_user_property_item_t item;
@ -968,7 +968,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
char *shared_topic = calloc(1, shared_topic_size);
@ -996,10 +996,10 @@ mqtt_message_t *mqtt5_msg_puback(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}
@ -1009,10 +1009,10 @@ mqtt_message_t *mqtt5_msg_pubrec(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}
@ -1022,10 +1022,10 @@ mqtt_message_t *mqtt5_msg_pubrel(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}
@ -1035,9 +1035,9 @@ mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t messag
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}

View File

@ -48,14 +48,14 @@ enum mqtt_connect_flag {
static int append_string(mqtt_connection_t *connection, const char *string, int len)
{
if (connection->message.length + len + 2 > connection->buffer_length) {
if (connection->outbound_message.length + len + 2 > connection->buffer_length) {
return -1;
}
connection->buffer[connection->message.length++] = len >> 8;
connection->buffer[connection->message.length++] = len & 0xff;
memcpy(connection->buffer + connection->message.length, string, len);
connection->message.length += len;
connection->buffer[connection->outbound_message.length++] = len >> 8;
connection->buffer[connection->outbound_message.length++] = len & 0xff;
memcpy(connection->buffer + connection->outbound_message.length, string, len);
connection->outbound_message.length += len;
return len + 2;
}
@ -72,38 +72,38 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
#endif
}
if (connection->message.length + 2 > connection->buffer_length) {
if (connection->outbound_message.length + 2 > connection->buffer_length) {
return 0;
}
connection->buffer[connection->message.length++] = message_id >> 8;
connection->buffer[connection->message.length++] = message_id & 0xff;
connection->buffer[connection->outbound_message.length++] = message_id >> 8;
connection->buffer[connection->outbound_message.length++] = message_id & 0xff;
return message_id;
}
static int init_message(mqtt_connection_t *connection)
static int set_message_header_size(mqtt_connection_t *connection)
{
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
connection->outbound_message.length = MQTT_MAX_FIXED_HEADER_SIZE;
return MQTT_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = 0;
return &connection->outbound_message;
}
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
int encoded_length = 0;
uint8_t encoded_lens[4] = {0};
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
if (connection->outbound_message.fragmented_msg_total_length) {
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
}
// Encode MQTT message length
@ -124,10 +124,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
connection->outbound_message.data = connection->buffer + offs;
connection->outbound_message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
@ -135,14 +135,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
connection->buffer[offs++] = encoded_lens[j];
}
return &connection->message;
}
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length)
{
memset(connection, 0, sizeof(mqtt_connection_t));
connection->buffer = buffer;
connection->buffer_length = buffer_length;
return &connection->outbound_message;
}
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len)
@ -347,7 +340,7 @@ uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
{
init_message(connection);
set_message_header_size(connection);
int header_len;
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
@ -356,11 +349,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE;
}
if (connection->message.length + header_len > connection->buffer_length) {
if (connection->outbound_message.length + header_len > connection->buffer_length) {
return fail_message(connection);
}
char *variable_header = (char *)(connection->buffer + connection->message.length);
connection->message.length += header_len;
char *variable_header = (char *)(connection->buffer + connection->outbound_message.length);
connection->outbound_message.length += header_len;
int header_idx = 0;
variable_header[header_idx++] = 0; // Variable header length MSB
@ -445,7 +438,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
@ -468,16 +461,16 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
}
if (data != NULL) {
if (connection->message.length + data_length > connection->buffer_length) {
if (connection->outbound_message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
connection->outbound_message.length = connection->buffer_length;
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
} else {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
connection->message.fragmented_msg_total_length = 0;
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
connection->outbound_message.length += data_length;
connection->outbound_message.fragmented_msg_total_length = 0;
}
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
@ -485,7 +478,7 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -494,7 +487,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -503,7 +496,7 @@ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -512,7 +505,7 @@ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -521,7 +514,7 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
@ -536,11 +529,11 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt
return fail_message(connection);
}
if (connection->message.length + 1 > connection->buffer_length) {
if (connection->outbound_message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->message.length] = topic_list[topic_number].qos;
connection->message.length ++;
connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos;
connection->outbound_message.length ++;
}
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
@ -548,7 +541,7 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
@ -567,19 +560,19 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
@ -622,3 +615,23 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length)
return 0;
}
}
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size)
{
memset(connection, 0, sizeof(mqtt_connection_t));
connection->buffer = (uint8_t *)calloc(0, buffer_size);
if (!connection->buffer) {
return ESP_ERR_NO_MEM;
}
connection->buffer_length = buffer_size;
return ESP_OK;
}
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection)
{
if (connection) {
free(connection->buffer);
}
}

View File

@ -1,4 +1,5 @@
#include "mqtt_outbox.h"
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "mqtt_config.h"
@ -22,12 +23,19 @@ typedef struct outbox_item {
STAILQ_HEAD(outbox_list_t, outbox_item);
struct outbox_t {
uint64_t size;
struct outbox_list_t *list;
};
outbox_handle_t outbox_init(void)
{
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t));
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t));
ESP_MEM_CHECK(TAG, outbox, return NULL);
STAILQ_INIT(outbox);
outbox->list = calloc(1, sizeof(struct outbox_list_t));
ESP_MEM_CHECK(TAG, outbox->list, return NULL); //TODO: Free outbox on failure
outbox->size = 0;
STAILQ_INIT(outbox->list);
return outbox;
}
@ -50,7 +58,8 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
if (message->remaining_data) {
memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len);
}
STAILQ_INSERT_TAIL(outbox, item, next);
STAILQ_INSERT_TAIL(outbox->list, item, next);
outbox->size += item->len;
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
return item;
}
@ -58,7 +67,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item->msg_id == msg_id) {
return item;
}
@ -69,7 +78,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item->pending == pending) {
if (tick) {
*tick = item->tick;
@ -83,9 +92,10 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item == item_to_delete) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
return ESP_OK;
@ -109,9 +119,10 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox));
@ -121,19 +132,7 @@ esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
}
return ESP_FAIL;
}
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_id == msg_id) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
}
}
return ESP_OK;
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
@ -162,27 +161,15 @@ esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick
return ESP_FAIL;
}
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_type == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
}
}
return ESP_OK;
}
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
int msg_id = -1;
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
msg_id = item->msg_id;
free(item);
return msg_id;
@ -196,10 +183,11 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
{
int deleted_items = 0;
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
free(item);
deleted_items ++;
}
@ -208,23 +196,17 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
return deleted_items;
}
int outbox_get_size(outbox_handle_t outbox)
uint64_t outbox_get_size(outbox_handle_t outbox)
{
int siz = 0;
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
// Suppressing "use after free" warning as this could happen only if queue is in inconsistent state
// which never happens if STAILQ interface used
siz += item->len; // NOLINT(clang-analyzer-unix.Malloc)
}
return siz;
return outbox->size;
}
void outbox_delete_all_items(outbox_handle_t outbox)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
}
@ -232,6 +214,7 @@ void outbox_delete_all_items(outbox_handle_t outbox)
void outbox_destroy(outbox_handle_t outbox)
{
outbox_delete_all_items(outbox);
free(outbox->list);
free(outbox);
}

View File

@ -18,7 +18,7 @@ static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
{
bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data);
bool msg_dup = mqtt5_get_dup(client->mqtt_state.connection.outbound_message.data);
if (msg_dup == false) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
@ -35,7 +35,7 @@ void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBCOMP return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_pubcomp_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -47,7 +47,7 @@ void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_puback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -59,7 +59,7 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_UNSUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_unsuback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -71,7 +71,7 @@ void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_suback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_SUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
}
}
@ -81,13 +81,14 @@ esp_err_t esp_mqtt5_parse_connack(esp_mqtt5_client_handle_t client, int *connect
size_t len = client->mqtt_state.in_buffer_read_len;
client->mqtt_state.in_buffer_read_len = 0;
uint8_t ack_flag = 0;
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->mqtt_state.
connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
ESP_LOGE(TAG, "Failed to parse CONNACK packet");
return ESP_FAIL;
}
if (*connect_rsp_code == MQTT_CONNECTION_ACCEPTED) {
ESP_LOGD(TAG, "Connected");
client->event.session_present = ack_flag & 0x01;
client->event.session_present = ack_flag & 0x01;
return ESP_OK;
}
esp_mqtt5_print_error_code(client, *connect_rsp_code);
@ -116,7 +117,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
*msg_topic = esp_mqtt5_client_get_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, msg_topic_len);
if (!*msg_topic) {
ESP_LOGE(TAG, "%s: esp_mqtt5_client_get_topic_alias() failed", __func__);
return ESP_FAIL;
return ESP_FAIL;
}
} else {
if (esp_mqtt5_client_update_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, *msg_topic, *msg_topic_len) != ESP_OK) {
@ -139,7 +140,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
esp_err_t esp_mqtt5_create_default_config(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
client->event.property = calloc(1, sizeof(esp_mqtt5_event_property_t));
ESP_MEM_CHECK(TAG, client->event.property, return ESP_FAIL)
client->mqtt5_config = calloc(1, sizeof(mqtt5_config_storage_t));
@ -304,7 +305,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
void esp_mqtt5_client_destory(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt5_config) {
free(client->mqtt5_config->will_property_info.content_type);
free(client->mqtt5_config->will_property_info.response_topic);
@ -416,7 +417,7 @@ esp_err_t esp_mqtt5_client_set_publish_property(esp_mqtt5_client_handle_t client
MQTT_API_LOCK(client);
/* Check protocol version */
if(client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -445,7 +446,7 @@ esp_err_t esp_mqtt5_client_set_subscribe_property(esp_mqtt5_client_handle_t clie
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -482,7 +483,7 @@ esp_err_t esp_mqtt5_client_set_unsubscribe_property(esp_mqtt5_client_handle_t cl
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -513,7 +514,7 @@ esp_err_t esp_mqtt5_client_set_disconnect_property(esp_mqtt5_client_handle_t cli
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -556,7 +557,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -572,7 +573,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
return ESP_FAIL;
} else {
client->mqtt5_config->connect_property_info.maximum_packet_size = connect_property->maximum_packet_size;
}
}
} else {
client->mqtt5_config->connect_property_info.maximum_packet_size = client->mqtt_state.in_buffer_length;
}
@ -757,4 +758,4 @@ void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_pro
}
}
free(user_property);
}
}

View File

@ -1,10 +1,12 @@
#include "mqtt_client.h"
#include "esp_transport.h"
#include "mqtt_client_priv.h"
#include "esp_log.h"
#include <stdint.h>
#include "esp_err.h"
#include "esp_log.h"
#include "esp_heap_caps.h"
#include "esp_transport.h"
#include "mqtt_client.h"
#include "mqtt_client_priv.h"
#include "mqtt_msg.h"
#include "mqtt_outbox.h"
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
#ifdef ESP_EVENT_ANY_ID
@ -396,70 +398,70 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
err = ESP_ERR_NO_MEM;
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.hostname, &client->config->host), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.path, &client->config->path), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->connect_info.username), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->connect_info.password), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->mqtt_state.connection.information.username), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->mqtt_state.connection.information.password), goto _mqtt_set_config_failed);
if (!config->credentials.set_null_client_id) {
if (config->credentials.client_id) {
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->connect_info.client_id), goto _mqtt_set_config_failed);
} else if (client->connect_info.client_id == NULL) {
client->connect_info.client_id = platform_create_id_string();
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->mqtt_state.connection.information.client_id), goto _mqtt_set_config_failed);
} else if (client->mqtt_state.connection.information.client_id == NULL) {
client->mqtt_state.connection.information.client_id = platform_create_id_string();
}
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id);
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.client_id, goto _mqtt_set_config_failed);
ESP_LOGD(TAG, "MQTT client_id=%s", client->mqtt_state.connection.information.client_id);
}
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.uri, &client->config->uri), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->connect_info.will_topic), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->mqtt_state.connection.information.will_topic), goto _mqtt_set_config_failed);
if (config->session.last_will.msg_len && config->session.last_will.msg) {
free(client->connect_info.will_message);
client->connect_info.will_message = malloc(config->session.last_will.msg_len);
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
memcpy(client->connect_info.will_message, config->session.last_will.msg, config->session.last_will.msg_len);
client->connect_info.will_length = config->session.last_will.msg_len;
free(client->mqtt_state.connection.information.will_message);
client->mqtt_state.connection.information.will_message = malloc(config->session.last_will.msg_len);
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
memcpy(client->mqtt_state.connection.information.will_message, config->session.last_will.msg, config->session.last_will.msg_len);
client->mqtt_state.connection.information.will_length = config->session.last_will.msg_len;
} else if (config->session.last_will.msg) {
free(client->connect_info.will_message);
client->connect_info.will_message = strdup(config->session.last_will.msg);
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
client->connect_info.will_length = strlen(config->session.last_will.msg);
free(client->mqtt_state.connection.information.will_message);
client->mqtt_state.connection.information.will_message = strdup(config->session.last_will.msg);
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
client->mqtt_state.connection.information.will_length = strlen(config->session.last_will.msg);
}
if (config->session.last_will.qos) {
client->connect_info.will_qos = config->session.last_will.qos;
client->mqtt_state.connection.information.will_qos = config->session.last_will.qos;
}
if (config->session.last_will.retain) {
client->connect_info.will_retain = config->session.last_will.retain;
client->mqtt_state.connection.information.will_retain = config->session.last_will.retain;
}
if (config->session.disable_clean_session == client->connect_info.clean_session) {
client->connect_info.clean_session = !config->session.disable_clean_session;
if (!client->connect_info.clean_session && config->credentials.set_null_client_id) {
if (config->session.disable_clean_session == client->mqtt_state.connection.information.clean_session) {
client->mqtt_state.connection.information.clean_session = !config->session.disable_clean_session;
if (!client->mqtt_state.connection.information.clean_session && config->credentials.set_null_client_id) {
ESP_LOGE(TAG, "Clean Session flag must be true if client has a null id");
}
}
if (config->session.keepalive) {
client->connect_info.keepalive = config->session.keepalive;
client->mqtt_state.connection.information.keepalive = config->session.keepalive;
}
if (client->connect_info.keepalive == 0) {
client->connect_info.keepalive = MQTT_KEEPALIVE_TICK;
if (client->mqtt_state.connection.information.keepalive == 0) {
client->mqtt_state.connection.information.keepalive = MQTT_KEEPALIVE_TICK;
}
if (config->session.disable_keepalive) {
// internal `keepalive` value (in connect_info) is in line with 3.1.2.10 Keep Alive from mqtt spec:
// * keepalive=0: Keep alive mechanism disabled (server not to disconnect the client on its inactivity)
// * period in seconds to send a Control packet if inactive
client->connect_info.keepalive = 0;
client->mqtt_state.connection.information.keepalive = 0;
}
if (config->session.protocol_ver) {
client->connect_info.protocol_ver = config->session.protocol_ver;
client->mqtt_state.connection.information.protocol_ver = config->session.protocol_ver;
}
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_UNDEFINED) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_UNDEFINED) {
#ifdef MQTT_PROTOCOL_311
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
#else
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1;
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1;
#endif
} else if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
} else if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifndef MQTT_PROTOCOL_5
ESP_LOGE(TAG, "Please first enable MQTT_PROTOCOL_5 feature in menuconfig");
goto _mqtt_set_config_failed;
@ -485,9 +487,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
} else {
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
}
if (config->network.transport) {
client->config->transport = config->network.transport;
}
if (config->broker.verification.alpn_protos) {
for (int i = 0; i < client->config->num_alpn_protos; i++) {
@ -567,6 +566,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
goto _mqtt_set_config_failed;
}
}
client->config->outbox_limit = config->outbox.limit;
esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict(client->config, config);
MQTT_API_UNLOCK(client);
@ -592,15 +592,15 @@ void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
}
free(client->config->alpn_protos);
free(client->config->clientkey_password);
free(client->connect_info.will_topic);
free(client->connect_info.will_message);
free(client->connect_info.client_id);
free(client->connect_info.username);
free(client->connect_info.password);
free(client->mqtt_state.connection.information.will_topic);
free(client->mqtt_state.connection.information.will_message);
free(client->mqtt_state.connection.information.client_id);
free(client->mqtt_state.connection.information.username);
free(client->mqtt_state.connection.information.password);
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_destory(client);
#endif
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
memset(&client->mqtt_state.connection.information, 0, sizeof(mqtt_connect_info_t));
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if (client->config->event_loop_handle) {
esp_event_loop_delete(client->config->event_loop_handle);
@ -620,8 +620,8 @@ static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout)
static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
{
if (client->connect_info.keepalive > 0) {
const uint64_t keepalive_ms = client->connect_info.keepalive * 1000;
if (client->mqtt_state.connection.information.keepalive > 0) {
const uint64_t keepalive_ms = client->mqtt_state.connection.information.keepalive * 1000;
if (client->wait_for_ping_resp == true ) {
if (has_timed_out(client->keepalive_tick, keepalive_ms)) {
@ -648,10 +648,10 @@ static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
{
int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length;
int wlen = 0, widx = 0, len = client->mqtt_state.connection.outbound_message.length;
while (len > 0) {
wlen = esp_transport_write(client->transport,
(char *)client->mqtt_state.outbound_message->data + widx,
(char *)client->mqtt_state.connection.outbound_message.data + widx,
len,
client->config->network_timeout_ms);
if (wlen < 0) {
@ -674,29 +674,29 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
{
int read_len, connect_rsp_code = 0;
client->wait_for_ping_resp = false;
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.outbound_message = mqtt5_msg_connect(&client->mqtt_state.mqtt_connection,
&client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);
mqtt5_msg_connect(&client->mqtt_state.connection,
&client->mqtt_state.connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
&client->connect_info);
mqtt_msg_connect(&client->mqtt_state.connection,
&client->mqtt_state.connection.information);
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Connect message cannot be created");
return ESP_FAIL;
}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.connection.outbound_message.data,
client->mqtt_state.connection.outbound_message.length);
#endif
} else {
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.connection.outbound_message.data,
client->mqtt_state.connection.outbound_message.length);
}
ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X",
client->mqtt_state.pending_msg_type,
@ -724,7 +724,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len);
return ESP_FAIL;
}
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
client->send_publish_packet_count = 0;
@ -807,6 +807,24 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
if (!create_client_data(client)) {
goto _mqtt_init_failed;
}
int buffer_size = config->buffer.size;
if (buffer_size <= 0) {
buffer_size = MQTT_BUFFER_SIZE_BYTE;
}
// use separate value for output buffer size if configured
int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size;
if (mqtt_msg_buffer_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) {
goto _mqtt_init_failed;
}
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed);
client->mqtt_state.in_buffer_length = buffer_size;
client->outbox = outbox_init();
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
if (esp_mqtt_set_config(client, config) != ESP_OK) {
goto _mqtt_init_failed;
@ -826,27 +844,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
client->reconnect_tick = platform_tick_get_ms();
client->refresh_connection_tick = platform_tick_get_ms();
client->wait_for_ping_resp = false;
int buffer_size = config->buffer.size;
if (buffer_size <= 0) {
buffer_size = MQTT_BUFFER_SIZE_BYTE;
}
// use separate value for output buffer size if configured
int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size;
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed);
client->mqtt_state.in_buffer_length = buffer_size;
client->mqtt_state.out_buffer = (uint8_t *)malloc(out_buffer_size);
ESP_MEM_CHECK(TAG, client->mqtt_state.out_buffer, goto _mqtt_init_failed);
client->mqtt_state.out_buffer_length = out_buffer_size;
client->outbox = outbox_init();
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer,
client->mqtt_state.out_buffer_length);
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_create_default_config(client) != ESP_OK) {
goto _mqtt_init_failed;
@ -877,7 +874,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
vEventGroupDelete(client->status_bits);
}
free(client->mqtt_state.in_buffer);
free(client->mqtt_state.out_buffer);
mqtt_msg_buffer_destroy(&client->mqtt_state.connection);
if (client->api_lock) {
vSemaphoreDelete(client->api_lock);
}
@ -944,9 +941,9 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
if (pass) {
pass[0] = 0; //terminal username
pass ++;
client->connect_info.password = strdup(pass);
client->mqtt_state.connection.information.password = strdup(pass);
}
client->connect_info.username = strdup(user_info);
client->mqtt_state.connection.information.username = strdup(user_info);
free(user_info);
}
@ -958,7 +955,7 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->event.msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
#endif
@ -982,16 +979,16 @@ esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mq
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
client->event.client = client;
client->event.protocol_ver = client->connect_info.protocol_ver;
client->event.protocol_ver = client->mqtt_state.connection.information.protocol_ver;
esp_err_t ret = ESP_FAIL;
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
ret = esp_event_loop_run(client->config->event_loop_handle, 0);
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
ret = esp_event_loop_run(client->config->event_loop_handle, 0);
#else
return ESP_FAIL;
return ESP_FAIL;
#endif
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_delete_user_property(client->event.property->user_property);
client->event.property->user_property = NULL;
@ -1009,7 +1006,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
size_t msg_data_offset = 0;
char *msg_topic = NULL, *msg_data = NULL;
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_get_publish_data(client, msg_buf, msg_read_len, &msg_topic, &msg_topic_len, &msg_data, &msg_data_len) != ESP_OK) {
ESP_LOGE(TAG, "%s: esp_mqtt5_get_publish_data() failed", __func__);
@ -1034,7 +1031,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
}
// post data event
client->event.retain = mqtt_get_retain(msg_buf);
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->event.msg_id = mqtt5_get_id(msg_buf, msg_read_len);
#endif
@ -1082,7 +1079,7 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
char *msg_data = NULL;
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property);
#else
@ -1135,16 +1132,16 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.connection.outbound_message.data;
msg.len = client->mqtt_state.connection.outbound_message.length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
@ -1283,7 +1280,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, read_len);
#endif
@ -1323,23 +1320,23 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
if (msg_qos == 1) {
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.outbound_message = mqtt5_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
mqtt5_msg_puback(&client->mqtt_state.connection, msg_id);
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_msg_puback(&client->mqtt_state.connection, msg_id);
}
} else if (msg_qos == 2) {
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.outbound_message = mqtt5_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
mqtt5_msg_pubrec(&client->mqtt_state.connection, msg_id);
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_msg_pubrec(&client->mqtt_state.connection, msg_id);
}
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created");
return ESP_FAIL;
}
@ -1355,7 +1352,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
break;
case MQTT_MSG_TYPE_PUBACK:
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
@ -1370,15 +1367,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREC return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
client->mqtt_state.outbound_message = mqtt5_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
mqtt5_msg_pubrel(&client->mqtt_state.connection, msg_id);
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_msg_pubrel(&client->mqtt_state.connection, msg_id);
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBREL cannot be created");
return ESP_FAIL;
}
@ -1388,15 +1385,15 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREL return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
client->mqtt_state.outbound_message = mqtt5_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt5_msg_pubcomp(&client->mqtt_state.connection, msg_id);
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_msg_pubcomp(&client->mqtt_state.connection, msg_id);
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created");
return ESP_FAIL;
}
@ -1406,7 +1403,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
@ -1438,11 +1435,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
// decode queued data
client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id,
client->mqtt_state.connection.outbound_message.data = outbox_item_get_data(item, &client->mqtt_state.connection.outbound_message.length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
// set duplicate flag for QoS-1 and QoS-2 messages
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) {
mqtt_set_dup(client->mqtt_state.outbound_message->data);
mqtt_set_dup(client->mqtt_state.connection.outbound_message.data);
ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id);
}
@ -1462,7 +1459,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
}
} else if (client->mqtt_state.pending_publish_qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
@ -1580,7 +1577,7 @@ static void esp_mqtt_task(void *pv)
break;
}
client->event.event_id = MQTT_EVENT_CONNECTED;
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
}
client->state = MQTT_STATE_CONNECTED;
@ -1733,19 +1730,19 @@ esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
{
// Notify the broker we are disconnecting
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.outbound_message = mqtt5_msg_disconnect(&client->mqtt_state.mqtt_connection, &client->mqtt5_config->disconnect_property_info);
if (client->mqtt_state.outbound_message->length) {
mqtt5_msg_disconnect(&client->mqtt_state.connection, &client->mqtt5_config->disconnect_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
esp_mqtt5_client_delete_user_property(client->mqtt5_config->disconnect_property_info.user_property);
client->mqtt5_config->disconnect_property_info.user_property = NULL;
memset(&client->mqtt5_config->disconnect_property_info, 0, sizeof(esp_mqtt5_disconnect_property_config_t));
}
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
mqtt_msg_disconnect(&client->mqtt_state.connection);
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Disconnect message cannot be created");
return ESP_FAIL;
}
@ -1793,8 +1790,8 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
{
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
if (client->mqtt_state.outbound_message->length == 0) {
mqtt_msg_pingreq(&client->mqtt_state.connection);
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Ping message cannot be created");
return ESP_FAIL;
}
@ -1814,13 +1811,17 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
if (client->config->outbox_limit > 0 && outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
MQTT_API_UNLOCK(client);
return -1;
}
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
int max_qos = topic_list[0].qos;
for (int topic_number = 0; topic_number < size; ++topic_number) {
@ -1833,25 +1834,25 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
MQTT_API_UNLOCK(client);
return -1;
}
client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
if (client->mqtt_state.outbound_message->length) {
mqtt5_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->subscribe_property_info = NULL;
}
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic_list, size,
&client->mqtt_state.pending_msg_id);
mqtt_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id);
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Subscribe message cannot be created");
MQTT_API_UNLOCK(client);
return -1;
}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
//move pending msg to outbox (if have)
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
@ -1888,28 +1889,28 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.outbound_message = mqtt5_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
if (client->mqtt_state.outbound_message->length) {
mqtt5_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->unsubscribe_property_info = NULL;
}
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id);
mqtt_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id);
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Unubscribe message cannot be created");
return -1;
}
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
return -1;
@ -1927,48 +1928,54 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
return client->mqtt_state.pending_msg_id;
}
static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain, bool store)
static int make_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain)
{
uint16_t pending_msg_id = 0;
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.outbound_message = mqtt5_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len,
qos, retain,
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
if (client->mqtt_state.outbound_message->length) {
mqtt5_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->publish_property_info = NULL;
}
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len,
qos, retain,
&pending_msg_id);
mqtt_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id);
}
if (client->mqtt_state.outbound_message->length == 0) {
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish message cannot be created");
return -1;
}
return pending_msg_id;
}
static inline int mqtt_client_enqueue_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain, bool store)
{
int pending_msg_id = make_publish(client, topic, data, len, qos, retain);
/* We have to set as pending all the qos>0 messages */
//TODO: client->mqtt_state.outbound_message = publish_msg;
if (qos > 0 || store) {
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
if (client->mqtt_state.connection.outbound_message.fragmented_msg_total_length == 0) {
if (!mqtt_enqueue(client, NULL, 0)) {
return -1;
}
} else {
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
int first_fragment = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
return -1;
}
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
}
}
return pending_msg_id;
@ -1990,7 +1997,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
#endif
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 publish check fail");
MQTT_API_UNLOCK(client);
@ -2008,7 +2015,14 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
len = strlen(data);
}
int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false);
if (client->config->outbox_limit > 0 && qos > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
MQTT_API_UNLOCK(client);
return -2;
}
}
int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false);
if (pending_msg_id < 0) {
MQTT_API_UNLOCK(client);
return -1;
@ -2043,27 +2057,19 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
goto cannot_publish;
}
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0;
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
int data_sent = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset = 0;
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
remaining_len -= data_sent;
current_data += data_sent;
if (remaining_len > 0) {
mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection;
mqtt_connection_t *connection = &client->mqtt_state.connection;
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
if (remaining_len > connection->buffer_length) {
// Continue with sending
memcpy(connection->buffer, current_data, connection->buffer_length);
connection->message.length = connection->buffer_length;
sending = true;
} else {
memcpy(connection->buffer, current_data, remaining_len);
connection->message.length = remaining_len;
sending = true;
}
connection->message.data = connection->buffer;
client->mqtt_state.outbound_message = &connection->message;
int write_len = remaining_len > connection->buffer_length ? connection->buffer_length : remaining_len;
memcpy(connection->buffer, current_data, write_len);
connection->outbound_message.length = write_len;
sending = true;
} else {
// Message was sent correctly
sending = false;
@ -2072,7 +2078,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
if (qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
@ -2085,7 +2091,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
cannot_publish:
// clear out possible fragmented publish if failed or skipped
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
if (qos == 0) {
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
}
@ -2110,9 +2116,16 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
len = strlen(data);
}
if (client->config->outbox_limit > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}
}
MQTT_API_LOCK(client);
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
ESP_LOGI(TAG, "esp_mqtt_client_enqueue check fail");
MQTT_API_UNLOCK(client);
@ -2120,7 +2133,7 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
}
}
#endif
int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store);
int ret = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, store);
MQTT_API_UNLOCK(client);
if (ret == 0 && store == false) {
// messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error