forked from espressif/esp-mqtt
Compare commits
67 Commits
revert-258
...
fix_host_t
Author | SHA1 | Date | |
---|---|---|---|
b43d93c82d | |||
aa6f889fb4 | |||
1edd167dcb | |||
c06f1540fe | |||
37478a9c00 | |||
fd21b27a83 | |||
e7b9aa5e6a | |||
726e5f2fce | |||
14f5fa079f | |||
5e3abd4b8c | |||
37cb056c5f | |||
74481cbbf4 | |||
c33a0c8e44 | |||
5c17fc4a1a | |||
657a2aea77 | |||
891380bdf5 | |||
fa88da5282 | |||
5a35782272 | |||
acdb66d5c6 | |||
d3e3c4e7ad | |||
371f594cce | |||
ddde502782 | |||
7894dd0ace | |||
ea0df31e00 | |||
abd8b6cadc | |||
67800569de | |||
e6afdb4025 | |||
c0b40b1293 | |||
dc93367bd5 | |||
00ee059bf8 | |||
05b347643f | |||
f35aaa1577 | |||
4b28192d00 | |||
c355e0b5ae | |||
301bd9e028 | |||
cc41d1b852 | |||
1ca73479cb | |||
90b4a4538e | |||
dc775bb52e | |||
fe32d8f224 | |||
cb1e6cf218 | |||
cd81773bd1 | |||
395aa141c8 | |||
8d98103013 | |||
a3b04f2d0a | |||
585e3ba2e0 | |||
36eec6f625 | |||
effd1e6705 | |||
6c849c62ef | |||
ee3ea29d52 | |||
4050df4caf | |||
aee82c7ba8 | |||
5896e259ad | |||
363fbf7dab | |||
5d491a45ce | |||
63cfec799c | |||
372ab7b374 | |||
21a5491d53 | |||
122875bf8a | |||
ed628098a1 | |||
6438676b66 | |||
a492935951 | |||
a89af4bf8d | |||
6195762d28 | |||
a5c1b441dc | |||
ffd7d4df6c | |||
4c2ed1676f |
2
.github/workflows/mqtt__host-tests.yml
vendored
2
.github/workflows/mqtt__host-tests.yml
vendored
@ -22,7 +22,7 @@ jobs:
|
||||
- name: Build and Test
|
||||
shell: bash
|
||||
run: |
|
||||
apt-get update && apt-get install -y gcc-8 g++-8 python3-pip rsync
|
||||
apt-get update && apt-get install -y gcc g++ python3-pip rsync
|
||||
${IDF_PATH}/install.sh
|
||||
. ${IDF_PATH}/export.sh
|
||||
echo "IDF_PATH=${IDF_PATH}" >> $GITHUB_ENV
|
||||
|
@ -1,5 +1,6 @@
|
||||
stages:
|
||||
- build
|
||||
- test
|
||||
- deploy
|
||||
|
||||
|
||||
@ -48,6 +49,16 @@ build_idf_latest:
|
||||
extends: .build_template
|
||||
image: espressif/idf:latest
|
||||
|
||||
build_and_host_test:
|
||||
stage: build
|
||||
image: espressif/idf:latest
|
||||
script:
|
||||
# Replace the IDF's default esp-mqtt with this version
|
||||
- rm -rf $IDF_PATH/components/mqtt/esp-mqtt && cp -r $MQTT_PATH $IDF_PATH/components/mqtt/
|
||||
- cd $IDF_PATH/components/mqtt/esp-mqtt/host_test
|
||||
- idf.py build
|
||||
- build/host_mqtt_client_test.elf
|
||||
|
||||
build_and_test_qemu:
|
||||
stage: build
|
||||
image: ${CI_DOCKER_REGISTRY}/qemu-v5.1:1-20220802
|
||||
@ -80,6 +91,17 @@ build_and_test_qemu:
|
||||
- export MQTT_PUBLISH_MSG_len_3=20 MQTT_PUBLISH_MSG_repeat_3=20
|
||||
- python Runner.py $TEST_PATH -c $MQTT_PATH/ci/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
|
||||
|
||||
check_remotes_sync:
|
||||
stage: test
|
||||
except:
|
||||
- master
|
||||
- idf
|
||||
script:
|
||||
- *add_gh_key_remote
|
||||
- git fetch --depth=1 origin master
|
||||
- git fetch --depth=1 github master
|
||||
- test "$(git rev-parse origin/master)" == "$(git rev-parse github/master)"
|
||||
|
||||
push_master_to_github:
|
||||
stage: deploy
|
||||
image: ${CI_DOCKER_REGISTRY}/esp32-ci-env
|
||||
|
@ -12,4 +12,3 @@ idf_component_register(SRCS "${srcs}"
|
||||
PRIV_REQUIRES esp_timer http_parser esp_hw_support heap
|
||||
KCONFIG ${CMAKE_CURRENT_LIST_DIR}/Kconfig
|
||||
)
|
||||
target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")
|
||||
|
@ -11,7 +11,6 @@ list(APPEND EXTRA_COMPONENT_DIRS
|
||||
"$ENV{IDF_PATH}/tools/mocks/lwip/"
|
||||
"$ENV{IDF_PATH}/tools/mocks/esp-tls/"
|
||||
"$ENV{IDF_PATH}/tools/mocks/http_parser/"
|
||||
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/"
|
||||
)
|
||||
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/")
|
||||
|
||||
project(host_mqtt_client_test)
|
||||
|
@ -2,10 +2,18 @@ idf_component_register(SRCS "test_mqtt_client.cpp"
|
||||
INCLUDE_DIRS "$ENV{IDF_PATH}/tools/catch"
|
||||
REQUIRES cmock mqtt esp_timer esp_hw_support http_parser log)
|
||||
|
||||
target_compile_options(${COMPONENT_LIB} PUBLIC -fsanitize=address -fconcepts)
|
||||
target_link_options(${COMPONENT_LIB} PUBLIC -fsanitize=address)
|
||||
|
||||
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
|
||||
target_compile_definitions(${mqtt} PRIVATE SOC_WIFI_SUPPORTED=1)
|
||||
target_compile_options(${mqtt} PUBLIC -fsanitize=address -fconcepts)
|
||||
target_link_options(${mqtt} PUBLIC -fsanitize=address)
|
||||
|
||||
if(CONFIG_GCOV_ENABLED)
|
||||
target_compile_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
|
||||
target_link_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
|
||||
|
||||
|
||||
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
|
||||
target_compile_options(${mqtt} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
|
||||
target_link_options(${mqtt} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
|
||||
|
@ -3,9 +3,16 @@
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
#include <memory>
|
||||
#include <net/if.h>
|
||||
#include <random>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
#include "esp_transport.h"
|
||||
#define CATCH_CONFIG_MAIN // This tells the catch header to generate a main
|
||||
#include "catch.hpp"
|
||||
|
||||
#include "mqtt_client.h"
|
||||
extern "C" {
|
||||
#include "Mockesp_event.h"
|
||||
#include "Mockesp_mac.h"
|
||||
@ -17,6 +24,10 @@ extern "C" {
|
||||
#include "Mockhttp_parser.h"
|
||||
#include "Mockqueue.h"
|
||||
#include "Mocktask.h"
|
||||
#if __has_include ("Mockidf_additions.h")
|
||||
/* Some functions were moved from "task.h" to "idf_additions.h" */
|
||||
#include "Mockidf_additions.h"
|
||||
#endif
|
||||
#include "Mockesp_timer.h"
|
||||
|
||||
/*
|
||||
@ -29,98 +40,130 @@ extern "C" {
|
||||
}
|
||||
}
|
||||
|
||||
#include "mqtt_client.h"
|
||||
|
||||
struct ClientInitializedFixture {
|
||||
esp_mqtt_client_handle_t client;
|
||||
ClientInitializedFixture()
|
||||
{
|
||||
[[maybe_unused]] auto protect = TEST_PROTECT();
|
||||
int mtx;
|
||||
int transport_list;
|
||||
int transport;
|
||||
int event_group;
|
||||
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
|
||||
esp_timer_get_time_IgnoreAndReturn(0);
|
||||
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
|
||||
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
|
||||
xQueueCreateMutex_ExpectAnyArgsAndReturn(
|
||||
reinterpret_cast<QueueHandle_t>(&mtx));
|
||||
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
|
||||
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
|
||||
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
|
||||
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
|
||||
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
|
||||
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
|
||||
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
|
||||
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
|
||||
http_parser_parse_url_IgnoreAndReturn(0);
|
||||
http_parser_url_init_ExpectAnyArgs();
|
||||
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
|
||||
esp_read_mac_IgnoreAndReturn(ESP_OK);
|
||||
esp_read_mac_ReturnThruPtr_mac(mac);
|
||||
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
|
||||
vEventGroupDelete_Ignore();
|
||||
vQueueDelete_Ignore();
|
||||
|
||||
esp_mqtt_client_config_t config{};
|
||||
client = esp_mqtt_client_init(&config);
|
||||
}
|
||||
~ClientInitializedFixture()
|
||||
{
|
||||
esp_mqtt_client_destroy(client);
|
||||
}
|
||||
};
|
||||
TEST_CASE_METHOD(ClientInitializedFixture, "Client set uri")
|
||||
auto random_string(std::size_t n)
|
||||
{
|
||||
struct http_parser_url ret_uri = {
|
||||
.field_set = 1,
|
||||
.port = 0,
|
||||
.field_data = { { 0, 1} }
|
||||
};
|
||||
SECTION("User set a correct URI") {
|
||||
http_parser_parse_url_StopIgnore();
|
||||
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
|
||||
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
|
||||
auto res = esp_mqtt_client_set_uri(client, " ");
|
||||
REQUIRE(res == ESP_OK);
|
||||
}
|
||||
SECTION("Incorrect URI from user") {
|
||||
http_parser_parse_url_StopIgnore();
|
||||
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
|
||||
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
|
||||
auto res = esp_mqtt_client_set_uri(client, " ");
|
||||
REQUIRE(res == ESP_FAIL);
|
||||
}
|
||||
static constexpr std::string_view char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ123456790";
|
||||
std::string str;
|
||||
std::sample(char_set.begin(), char_set.end(), std::back_inserter(str), n,
|
||||
std::mt19937 {std::random_device{}()});
|
||||
return str;
|
||||
}
|
||||
TEST_CASE_METHOD(ClientInitializedFixture, "Client Start")
|
||||
|
||||
using unique_mqtt_client = std::unique_ptr < std::remove_pointer_t<esp_mqtt_client_handle_t>, decltype([](esp_mqtt_client_handle_t client)
|
||||
{
|
||||
SECTION("Successful start") {
|
||||
esp_mqtt_client_destroy(client);
|
||||
}) >;
|
||||
|
||||
SCENARIO("MQTT Client Operation")
|
||||
{
|
||||
// Set expectations for the mocked calls.
|
||||
int mtx = 0;
|
||||
int transport_list = 0;
|
||||
int transport = 0;
|
||||
int event_group = 0;
|
||||
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
|
||||
esp_timer_get_time_IgnoreAndReturn(0);
|
||||
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
|
||||
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
|
||||
xQueueCreateMutex_ExpectAnyArgsAndReturn(
|
||||
reinterpret_cast<QueueHandle_t>(&mtx));
|
||||
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
|
||||
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
|
||||
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
|
||||
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
|
||||
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
|
||||
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
|
||||
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
|
||||
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
|
||||
http_parser_url_init_Ignore();
|
||||
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
|
||||
esp_read_mac_IgnoreAndReturn(ESP_OK);
|
||||
esp_read_mac_ReturnThruPtr_mac(mac);
|
||||
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
|
||||
esp_transport_destroy_IgnoreAndReturn(ESP_OK);
|
||||
vEventGroupDelete_Ignore();
|
||||
vQueueDelete_Ignore();
|
||||
GIVEN("An a minimal config") {
|
||||
esp_mqtt_client_config_t config{};
|
||||
config.broker.address.uri = "mqtt://1.1.1.1";
|
||||
struct http_parser_url ret_uri = {
|
||||
.field_set = 1 | (1<<1),
|
||||
.field_set = 1 | (1 << 1),
|
||||
.port = 0,
|
||||
.field_data = { { 0, 4 } /*mqtt*/, { 7, 1 } } // at least *scheme* and *host*
|
||||
};
|
||||
http_parser_parse_url_StopIgnore();
|
||||
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
|
||||
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
|
||||
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdTRUE);
|
||||
auto res = esp_mqtt_set_config(client, &config);
|
||||
REQUIRE(res == ESP_OK);
|
||||
res = esp_mqtt_client_start(client);
|
||||
REQUIRE(res == ESP_OK);
|
||||
}
|
||||
SECTION("Failed on initialization") {
|
||||
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
|
||||
auto res = esp_mqtt_client_start(nullptr);
|
||||
REQUIRE(res == ESP_ERR_INVALID_ARG);
|
||||
}
|
||||
SECTION("Client already started") {}
|
||||
SECTION("Failed to start task") {
|
||||
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
|
||||
auto res = esp_mqtt_client_start(client);
|
||||
REQUIRE(res == ESP_FAIL);
|
||||
SECTION("Client with minimal config") {
|
||||
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
|
||||
REQUIRE(client != nullptr);
|
||||
SECTION("User will set a new uri") {
|
||||
struct http_parser_url ret_uri = {
|
||||
.field_set = 1,
|
||||
.port = 0,
|
||||
.field_data = { { 0, 1} }
|
||||
};
|
||||
SECTION("User set a correct URI") {
|
||||
http_parser_parse_url_StopIgnore();
|
||||
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
|
||||
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
|
||||
auto res = esp_mqtt_client_set_uri(client.get(), " ");
|
||||
REQUIRE(res == ESP_OK);
|
||||
}
|
||||
SECTION("Incorrect URI from user") {
|
||||
http_parser_parse_url_StopIgnore();
|
||||
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
|
||||
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
|
||||
auto res = esp_mqtt_client_set_uri(client.get(), " ");
|
||||
REQUIRE(res == ESP_FAIL);
|
||||
}
|
||||
}
|
||||
SECTION("User set interface to use"){
|
||||
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
|
||||
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
|
||||
struct ifreq if_name = {.ifr_ifrn = {"custom"}};
|
||||
config.network.if_name = &if_name;
|
||||
SECTION("Client is not started"){
|
||||
REQUIRE(esp_mqtt_set_config(client.get(), &config)== ESP_OK);
|
||||
}
|
||||
}
|
||||
SECTION("After Start Client Is Cleanly destroyed") {
|
||||
REQUIRE(esp_mqtt_client_start(client.get()) == ESP_OK);
|
||||
// Only need to start the client, destroy is called automatically at the end of
|
||||
// scope
|
||||
}
|
||||
}
|
||||
SECTION("Client with all allocating configuration set") {
|
||||
auto host = random_string(20);
|
||||
auto path = random_string(10);
|
||||
auto username = random_string(10);
|
||||
auto client_id = random_string(10);
|
||||
auto password = random_string(10);
|
||||
auto lw_topic = random_string(10);
|
||||
auto lw_msg = random_string(10);
|
||||
|
||||
config.broker = {.address = {
|
||||
.hostname = host.data(),
|
||||
.path = path.data()
|
||||
}
|
||||
};
|
||||
config.credentials = {
|
||||
.username = username.data(),
|
||||
.client_id = client_id.data(),
|
||||
.authentication = {
|
||||
.password = password.data()
|
||||
}
|
||||
};
|
||||
config.session = {
|
||||
.last_will {
|
||||
.topic = lw_topic.data(),
|
||||
.msg = lw_msg.data()
|
||||
}
|
||||
};
|
||||
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
|
||||
REQUIRE(client != nullptr);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,41 +19,46 @@ typedef struct esp_mqtt_client *esp_mqtt5_client_handle_t;
|
||||
* MQTT5 protocol error reason code, more details refer to MQTT5 protocol document section 2.4
|
||||
*/
|
||||
enum mqtt5_error_reason_code {
|
||||
MQTT5_UNSPECIFIED_ERROR = 0x80,
|
||||
MQTT5_MALFORMED_PACKET = 0x81,
|
||||
MQTT5_PROTOCOL_ERROR = 0x82,
|
||||
MQTT5_IMPLEMENT_SPECIFIC_ERROR = 0x83,
|
||||
MQTT5_UNSUPPORTED_PROTOCOL_VER = 0x84,
|
||||
MQTT5_INVAILD_CLIENT_ID = 0x85,
|
||||
MQTT5_BAD_USERNAME_OR_PWD = 0x86,
|
||||
MQTT5_NOT_AUTHORIZED = 0x87,
|
||||
MQTT5_SERVER_UNAVAILABLE = 0x88,
|
||||
MQTT5_SERVER_BUSY = 0x89,
|
||||
MQTT5_BANNED = 0x8A,
|
||||
MQTT5_SERVER_SHUTTING_DOWN = 0x8B,
|
||||
MQTT5_BAD_AUTH_METHOD = 0x8C,
|
||||
MQTT5_KEEP_ALIVE_TIMEOUT = 0x8D,
|
||||
MQTT5_SESSION_TAKEN_OVER = 0x8E,
|
||||
MQTT5_TOPIC_FILTER_INVAILD = 0x8F,
|
||||
MQTT5_TOPIC_NAME_INVAILD = 0x90,
|
||||
MQTT5_PACKET_IDENTIFIER_IN_USE = 0x91,
|
||||
MQTT5_PACKET_IDENTIFIER_NOT_FOUND = 0x92,
|
||||
MQTT5_RECEIVE_MAXIMUM_EXCEEDED = 0x93,
|
||||
MQTT5_TOPIC_ALIAS_INVAILD = 0x94,
|
||||
MQTT5_PACKET_TOO_LARGE = 0x95,
|
||||
MQTT5_MESSAGE_RATE_TOO_HIGH = 0x96,
|
||||
MQTT5_QUOTA_EXCEEDED = 0x97,
|
||||
MQTT5_ADMINISTRATIVE_ACTION = 0x98,
|
||||
MQTT5_PAYLOAD_FORMAT_INVAILD = 0x99,
|
||||
MQTT5_RETAIN_NOT_SUPPORT = 0x9A,
|
||||
MQTT5_QOS_NOT_SUPPORT = 0x9B,
|
||||
MQTT5_USE_ANOTHER_SERVER = 0x9C,
|
||||
MQTT5_SERVER_MOVED = 0x9D,
|
||||
MQTT5_SHARED_SUBSCR_NOT_SUPPORTED = 0x9E,
|
||||
MQTT5_CONNECTION_RATE_EXCEEDED = 0x9F,
|
||||
MQTT5_MAXIMUM_CONNECT_TIME = 0xA0,
|
||||
MQTT5_SUBSCRIBE_IDENTIFIER_NOT_SUPPORT = 0xA1,
|
||||
MQTT5_WILDCARD_SUBSCRIBE_NOT_SUPPORT = 0xA2,
|
||||
MQTT5_UNSPECIFIED_ERROR = 0x80,
|
||||
MQTT5_MALFORMED_PACKET = 0x81,
|
||||
MQTT5_PROTOCOL_ERROR = 0x82,
|
||||
MQTT5_IMPLEMENT_SPECIFIC_ERROR = 0x83,
|
||||
MQTT5_UNSUPPORTED_PROTOCOL_VER = 0x84,
|
||||
MQTT5_INVAILD_CLIENT_ID __attribute__((deprecated)) = 0x85,
|
||||
MQTT5_INVALID_CLIENT_ID = 0x85,
|
||||
MQTT5_BAD_USERNAME_OR_PWD = 0x86,
|
||||
MQTT5_NOT_AUTHORIZED = 0x87,
|
||||
MQTT5_SERVER_UNAVAILABLE = 0x88,
|
||||
MQTT5_SERVER_BUSY = 0x89,
|
||||
MQTT5_BANNED = 0x8A,
|
||||
MQTT5_SERVER_SHUTTING_DOWN = 0x8B,
|
||||
MQTT5_BAD_AUTH_METHOD = 0x8C,
|
||||
MQTT5_KEEP_ALIVE_TIMEOUT = 0x8D,
|
||||
MQTT5_SESSION_TAKEN_OVER = 0x8E,
|
||||
MQTT5_TOPIC_FILTER_INVAILD __attribute__((deprecated)) = 0x8F,
|
||||
MQTT5_TOPIC_FILTER_INVALID = 0x8F,
|
||||
MQTT5_TOPIC_NAME_INVAILD __attribute__((deprecated)) = 0x90,
|
||||
MQTT5_TOPIC_NAME_INVALID = 0x90,
|
||||
MQTT5_PACKET_IDENTIFIER_IN_USE = 0x91,
|
||||
MQTT5_PACKET_IDENTIFIER_NOT_FOUND = 0x92,
|
||||
MQTT5_RECEIVE_MAXIMUM_EXCEEDED = 0x93,
|
||||
MQTT5_TOPIC_ALIAS_INVAILD __attribute__((deprecated)) = 0x94,
|
||||
MQTT5_TOPIC_ALIAS_INVALID = 0x94,
|
||||
MQTT5_PACKET_TOO_LARGE = 0x95,
|
||||
MQTT5_MESSAGE_RATE_TOO_HIGH = 0x96,
|
||||
MQTT5_QUOTA_EXCEEDED = 0x97,
|
||||
MQTT5_ADMINISTRATIVE_ACTION = 0x98,
|
||||
MQTT5_PAYLOAD_FORMAT_INVAILD __attribute__((deprecated)) = 0x99,
|
||||
MQTT5_PAYLOAD_FORMAT_INVALID = 0x99,
|
||||
MQTT5_RETAIN_NOT_SUPPORT = 0x9A,
|
||||
MQTT5_QOS_NOT_SUPPORT = 0x9B,
|
||||
MQTT5_USE_ANOTHER_SERVER = 0x9C,
|
||||
MQTT5_SERVER_MOVED = 0x9D,
|
||||
MQTT5_SHARED_SUBSCR_NOT_SUPPORTED = 0x9E,
|
||||
MQTT5_CONNECTION_RATE_EXCEEDED = 0x9F,
|
||||
MQTT5_MAXIMUM_CONNECT_TIME = 0xA0,
|
||||
MQTT5_SUBSCRIBE_IDENTIFIER_NOT_SUPPORT = 0xA1,
|
||||
MQTT5_WILDCARD_SUBSCRIBE_NOT_SUPPORT = 0xA2,
|
||||
};
|
||||
|
||||
/**
|
||||
@ -65,7 +70,7 @@ typedef struct mqtt5_user_property_list_t *mqtt5_user_property_handle_t;
|
||||
* MQTT5 protocol connect properties and will properties configuration, more details refer to MQTT5 protocol document section 3.1.2.11 and 3.3.2.3
|
||||
*/
|
||||
typedef struct {
|
||||
uint32_t session_expiry_interval; /*!< The interval time of session expiry */
|
||||
uint32_t session_expiry_interval; /*!< The interval time of session expiry */
|
||||
uint32_t maximum_packet_size; /*!< The maximum packet size that we can receive */
|
||||
uint16_t receive_maximum; /*!< The maximum pakcket count that we process concurrently */
|
||||
uint16_t topic_alias_maximum; /*!< The maximum topic alias that we support */
|
||||
@ -270,7 +275,7 @@ uint8_t esp_mqtt5_client_get_user_property_count(mqtt5_user_property_handle_t us
|
||||
* @brief Free the user property list
|
||||
*
|
||||
* @param user_property user_property handle
|
||||
*
|
||||
*
|
||||
* This API will free the memory in user property list and free user_property itself
|
||||
*/
|
||||
void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_property);
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <string.h>
|
||||
#include "esp_err.h"
|
||||
#include "esp_event.h"
|
||||
#include "esp_transport.h"
|
||||
#ifdef CONFIG_MQTT_PROTOCOL_5
|
||||
#include "mqtt5_client.h"
|
||||
#endif
|
||||
@ -213,8 +214,6 @@ typedef struct esp_mqtt_event_t {
|
||||
|
||||
typedef esp_mqtt_event_t *esp_mqtt_event_handle_t;
|
||||
|
||||
typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
|
||||
|
||||
/**
|
||||
* *MQTT* client configuration structure
|
||||
*
|
||||
@ -250,16 +249,21 @@ typedef struct esp_mqtt_client_config_t {
|
||||
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
|
||||
documentation for details. */
|
||||
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
|
||||
the usage of certificate bundles. */
|
||||
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
|
||||
the usage of certificate bundles. Client only attach the bundle, the clean up must be done by the user. */
|
||||
const char *certificate; /*!< Certificate data, default is NULL. It's not copied nor freed by the client, user needs to clean up.*/
|
||||
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
|
||||
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
|
||||
authentication (as alternative to certificate verification).
|
||||
PSK is enabled only if there are no other ways to
|
||||
verify broker.*/
|
||||
verify broker. It's not copied nor freed by the client, user needs to clean up.*/
|
||||
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
|
||||
security of TLS and makes the *MQTT* client susceptible to MITM attacks */
|
||||
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
|
||||
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN.*/
|
||||
const char *common_name; /*!< Pointer to the string containing server certificate common name.
|
||||
If non-NULL, server certificate CN must match this name,
|
||||
If NULL, server certificate CN must match hostname.
|
||||
This is ignored if skip_cert_common_name_check=true.
|
||||
It's not copied nor freed by the client, user needs to clean up.*/
|
||||
} verification; /*!< Security verification of the broker */
|
||||
} broker; /*!< Broker address and security verification */
|
||||
/**
|
||||
@ -283,17 +287,17 @@ typedef struct esp_mqtt_client_config_t {
|
||||
struct authentication_t {
|
||||
const char *password; /*!< *MQTT* password */
|
||||
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
|
||||
authentication is not needed. Must be provided with `key`.*/
|
||||
authentication is not needed. Must be provided with `key`. It's not copied nor freed by the client, user needs to clean up.*/
|
||||
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
|
||||
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
|
||||
is not needed. If it is not NULL, also `certificate` has to be provided.*/
|
||||
is not needed. If it is not NULL, also `certificate` has to be provided. It's not copied nor freed by the client, user needs to clean up.*/
|
||||
size_t key_len; /*!< Length of the buffer pointed to by key.*/
|
||||
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
|
||||
`key_password_len` must be correctly set. */
|
||||
`key_password_len` must be correctly set.*/
|
||||
int key_password_len; /*!< Length of the password pointed to by `key_password` */
|
||||
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
|
||||
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
|
||||
available in some Espressif devices. */
|
||||
available in some Espressif devices. It's not copied nor freed by the client, user needs to clean up.*/
|
||||
} authentication; /*!< Client authentication */
|
||||
} credentials; /*!< User credentials for broker */
|
||||
/**
|
||||
@ -311,7 +315,10 @@ typedef struct esp_mqtt_client_config_t {
|
||||
int retain; /*!< LWT retained message flag */
|
||||
} last_will; /*!< Last will configuration */
|
||||
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
|
||||
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
|
||||
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds
|
||||
When configuring this value, keep in mind that the client attempts
|
||||
to communicate with the broker at half the interval that is actually set.
|
||||
This conservative approach allows for more attempts before the broker's timeout occurs */
|
||||
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
|
||||
by default. Note: setting the config value `keepalive` to `0` doesn't disable
|
||||
keepalive feature, but uses a default keepalive period */
|
||||
@ -329,6 +336,8 @@ typedef struct esp_mqtt_client_config_t {
|
||||
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
|
||||
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
|
||||
`disable_auto_reconnect=true` to disable */
|
||||
esp_transport_handle_t transport; /*!< Custom transport handle to use. Warning: The transport should be valid during the client lifetime and is destroyed when esp_mqtt_client_destroy is called. */
|
||||
struct ifreq * if_name; /*!< The name of interface for data to go through. Use the default interface without setting */
|
||||
} network; /*!< Network configuration */
|
||||
/**
|
||||
* Client task configuration
|
||||
@ -347,6 +356,13 @@ typedef struct esp_mqtt_client_config_t {
|
||||
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
|
||||
``buffer_size`` */
|
||||
} buffer; /*!< Buffer size configuration.*/
|
||||
|
||||
/**
|
||||
* Client outbox configuration options.
|
||||
*/
|
||||
struct outbox_config_t {
|
||||
uint64_t limit; /*!< Size limit for the outbox in bytes.*/
|
||||
} outbox; /*!< Outbox configuration. */
|
||||
} esp_mqtt_client_config_t;
|
||||
|
||||
/**
|
||||
@ -425,7 +441,6 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
|
||||
*/
|
||||
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
||||
#define esp_mqtt_client_subscribe esp_mqtt_client_subscribe_single
|
||||
@ -444,9 +459,11 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
|
||||
*
|
||||
* @return message_id of the subscribe message on success
|
||||
* -1 on failure
|
||||
* -2 in case of full outbox.
|
||||
*/
|
||||
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
|
||||
char *: esp_mqtt_client_subscribe_single, \
|
||||
const char *: esp_mqtt_client_subscribe_single, \
|
||||
esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \
|
||||
)(client_handle, topic_type, qos_or_size)
|
||||
|
||||
@ -468,6 +485,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
|
||||
*
|
||||
* @return message_id of the subscribe message on success
|
||||
* -1 on failure
|
||||
* -2 in case of full outbox.
|
||||
*/
|
||||
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
|
||||
const char *topic, int qos);
|
||||
@ -488,6 +506,7 @@ int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
|
||||
*
|
||||
* @return message_id of the subscribe message on success
|
||||
* -1 on failure
|
||||
* -2 in case of full outbox.
|
||||
*/
|
||||
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
|
||||
const esp_mqtt_topic_t *topic_list, int size);
|
||||
@ -531,7 +550,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client,
|
||||
* @param retain retain flag
|
||||
*
|
||||
* @return message_id of the publish message (for QoS 0 message_id will always
|
||||
* be zero) on success. -1 on failure.
|
||||
* be zero) on success. -1 on failure, -2 in case of full outbox.
|
||||
*/
|
||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
const char *data, int len, int qos, int retain);
|
||||
@ -556,7 +575,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
* @param store if true, all messages are enqueued; otherwise only QoS 1 and
|
||||
* QoS 2 are enqueued
|
||||
*
|
||||
* @return message_id if queued successfully, -1 otherwise
|
||||
* @return message_id if queued successfully, -1 on failure, -2 in case of full outbox.
|
||||
*/
|
||||
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
|
||||
const char *data, int len, int qos, int retain,
|
||||
@ -579,6 +598,9 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client);
|
||||
* @brief Set configuration structure, typically used when updating the config
|
||||
* (i.e. on "before_connect" event
|
||||
*
|
||||
* Notes:
|
||||
* - When calling this function make sure to have all the intendend configurations
|
||||
* set, otherwise default values are set.
|
||||
* @param client *MQTT* client handle
|
||||
*
|
||||
* @param config *MQTT* configuration structure
|
||||
@ -603,9 +625,9 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client,
|
||||
* ESP_OK on success
|
||||
*/
|
||||
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client,
|
||||
esp_mqtt_event_id_t event,
|
||||
esp_event_handler_t event_handler,
|
||||
void *event_handler_arg);
|
||||
esp_mqtt_event_id_t event,
|
||||
esp_event_handler_t event_handler,
|
||||
void *event_handler_arg);
|
||||
|
||||
/**
|
||||
* @brief Unregisters mqtt event
|
||||
|
@ -64,5 +64,11 @@
|
||||
#define MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
|
||||
#endif
|
||||
|
||||
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
|
||||
// Features supported in 5.1.0
|
||||
#define MQTT_SUPPORTED_FEATURE_CRT_CMN_NAME
|
||||
#endif
|
||||
|
||||
|
||||
#endif /* ESP_IDF_VERSION */
|
||||
#endif // _MQTT_SUPPORTED_FEATURES_H_
|
||||
|
@ -7,6 +7,7 @@
|
||||
#ifndef _MQTT_CLIENT_PRIV_H_
|
||||
#define _MQTT_CLIENT_PRIV_H_
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdatomic.h>
|
||||
@ -38,6 +39,14 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#if CONFIG_NEWLIB_NANO_FORMAT
|
||||
#define NEWLIB_NANO_COMPAT_FORMAT PRIu32
|
||||
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) (uint32_t)size_t_var
|
||||
#else
|
||||
#define NEWLIB_NANO_COMPAT_FORMAT "zu"
|
||||
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) size_t_var
|
||||
#endif
|
||||
|
||||
#ifdef MQTT_DISABLE_API_LOCKS
|
||||
# define MQTT_API_LOCK(c)
|
||||
# define MQTT_API_UNLOCK(c)
|
||||
@ -48,20 +57,16 @@ extern "C" {
|
||||
|
||||
typedef struct mqtt_state {
|
||||
uint8_t *in_buffer;
|
||||
uint8_t *out_buffer;
|
||||
int in_buffer_length;
|
||||
int out_buffer_length;
|
||||
size_t message_length;
|
||||
size_t in_buffer_read_len;
|
||||
mqtt_message_t *outbound_message;
|
||||
mqtt_connection_t mqtt_connection;
|
||||
mqtt_connection_t connection;
|
||||
uint16_t pending_msg_id;
|
||||
int pending_msg_type;
|
||||
int pending_publish_qos;
|
||||
} mqtt_state_t;
|
||||
|
||||
typedef struct {
|
||||
mqtt_event_callback_t event_handle;
|
||||
esp_event_loop_handle_t event_loop_handle;
|
||||
int task_stack;
|
||||
int task_prio;
|
||||
@ -88,9 +93,13 @@ typedef struct {
|
||||
size_t clientkey_bytes;
|
||||
const struct psk_key_hint *psk_hint_key;
|
||||
bool skip_cert_common_name_check;
|
||||
const char *common_name;
|
||||
bool use_secure_element;
|
||||
void *ds_data;
|
||||
int message_retransmit_timeout;
|
||||
uint64_t outbox_limit;
|
||||
esp_transport_handle_t transport;
|
||||
struct ifreq * if_name;
|
||||
} mqtt_config_storage_t;
|
||||
|
||||
typedef enum {
|
||||
@ -105,8 +114,7 @@ struct esp_mqtt_client {
|
||||
esp_transport_handle_t transport;
|
||||
mqtt_config_storage_t *config;
|
||||
mqtt_state_t mqtt_state;
|
||||
mqtt_connect_info_t connect_info;
|
||||
mqtt_client_state_t state;
|
||||
_Atomic mqtt_client_state_t state;
|
||||
uint64_t refresh_connection_tick;
|
||||
int64_t keepalive_tick;
|
||||
uint64_t reconnect_tick;
|
||||
|
@ -100,6 +100,7 @@
|
||||
#define MQTT_ENABLE_SSL CONFIG_MQTT_TRANSPORT_SSL
|
||||
#define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET
|
||||
#define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE
|
||||
#define MQTT_DEFAULT_RETRANSMIT_TIMEOUT_MS 1000
|
||||
|
||||
#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE
|
||||
#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE
|
||||
|
@ -68,16 +68,6 @@ typedef struct mqtt_message {
|
||||
size_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
|
||||
} mqtt_message_t;
|
||||
|
||||
typedef struct mqtt_connection {
|
||||
mqtt_message_t message;
|
||||
#if MQTT_MSG_ID_INCREMENTAL
|
||||
uint16_t last_message_id; /*!< last used id if incremental message id configured */
|
||||
#endif
|
||||
uint8_t *buffer;
|
||||
size_t buffer_length;
|
||||
|
||||
} mqtt_connection_t;
|
||||
|
||||
typedef struct mqtt_connect_info {
|
||||
char *client_id;
|
||||
char *username;
|
||||
@ -90,9 +80,18 @@ typedef struct mqtt_connect_info {
|
||||
int will_retain;
|
||||
int clean_session;
|
||||
esp_mqtt_protocol_ver_t protocol_ver;
|
||||
|
||||
} mqtt_connect_info_t;
|
||||
|
||||
typedef struct mqtt_connection {
|
||||
mqtt_message_t outbound_message;
|
||||
#if MQTT_MSG_ID_INCREMENTAL
|
||||
uint16_t last_message_id; /*!< last used id if incremental message id configured */
|
||||
#endif
|
||||
uint8_t *buffer;
|
||||
size_t buffer_length;
|
||||
mqtt_connect_info_t information;
|
||||
|
||||
} mqtt_connection_t;
|
||||
|
||||
static inline int mqtt_get_type(const uint8_t *buffer)
|
||||
{
|
||||
@ -123,7 +122,6 @@ static inline int mqtt_get_retain(const uint8_t *buffer)
|
||||
return (buffer[0] & 0x01);
|
||||
}
|
||||
|
||||
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length);
|
||||
bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length);
|
||||
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len);
|
||||
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length);
|
||||
@ -132,6 +130,9 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length);
|
||||
uint16_t mqtt_get_id(uint8_t *buffer, size_t length);
|
||||
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length);
|
||||
|
||||
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size);
|
||||
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection);
|
||||
|
||||
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info);
|
||||
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id);
|
||||
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id);
|
||||
@ -143,8 +144,6 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *
|
||||
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
|
||||
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
|
||||
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
@ -14,7 +14,7 @@ extern "C" {
|
||||
|
||||
struct outbox_item;
|
||||
|
||||
typedef struct outbox_list_t *outbox_handle_t;
|
||||
typedef struct outbox_t *outbox_handle_t;
|
||||
typedef struct outbox_item *outbox_item_handle_t;
|
||||
typedef struct outbox_message *outbox_message_handle_t;
|
||||
typedef long long outbox_tick_t;
|
||||
@ -42,8 +42,6 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
|
||||
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
|
||||
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
|
||||
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
|
||||
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
|
||||
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
|
||||
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
|
||||
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
|
||||
/**
|
||||
@ -56,7 +54,7 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t
|
||||
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
|
||||
pending_state_t outbox_item_get_pending(outbox_item_handle_t item);
|
||||
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
|
||||
int outbox_get_size(outbox_handle_t outbox);
|
||||
uint64_t outbox_get_size(outbox_handle_t outbox);
|
||||
void outbox_destroy(outbox_handle_t outbox);
|
||||
void outbox_delete_all_items(outbox_handle_t outbox);
|
||||
|
||||
|
172
lib/mqtt5_msg.c
172
lib/mqtt5_msg.c
@ -69,8 +69,8 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
|
||||
generate_variable_len(len, &len_bytes, encoded_lens);
|
||||
int offset = len_bytes - 1;
|
||||
|
||||
connection->message.length += offset;
|
||||
if (connection->message.length > connection->buffer_length) {
|
||||
connection->outbound_message.length += offset;
|
||||
if (connection->outbound_message.length > connection->buffer_length) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -89,33 +89,33 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
|
||||
|
||||
static int append_property(mqtt_connection_t *connection, uint8_t property_type, uint8_t len_occupy, const char *data, size_t data_len)
|
||||
{
|
||||
if ((connection->message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
|
||||
if ((connection->outbound_message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t origin_message_len = connection->message.length;
|
||||
size_t origin_message_len = connection->outbound_message.length;
|
||||
if (property_type) {
|
||||
connection->buffer[connection->message.length ++] = property_type;
|
||||
connection->buffer[connection->outbound_message.length ++] = property_type;
|
||||
}
|
||||
|
||||
if (len_occupy == 0) {
|
||||
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
|
||||
generate_variable_len(data_len, &len_bytes, encoded_lens);
|
||||
for (int j = 0; j < len_bytes; j ++) {
|
||||
connection->buffer[connection->message.length ++] = encoded_lens[j];
|
||||
connection->buffer[connection->outbound_message.length ++] = encoded_lens[j];
|
||||
}
|
||||
} else {
|
||||
for (int i = 1; i <= len_occupy; i ++) {
|
||||
connection->buffer[connection->message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
|
||||
connection->buffer[connection->outbound_message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
|
||||
}
|
||||
}
|
||||
|
||||
if (data) {
|
||||
memcpy(connection->buffer + connection->message.length, data, data_len);
|
||||
connection->message.length += data_len;
|
||||
memcpy(connection->buffer + connection->outbound_message.length, data, data_len);
|
||||
connection->outbound_message.length += data_len;
|
||||
}
|
||||
|
||||
return connection->message.length - origin_message_len;
|
||||
return connection->outbound_message.length - origin_message_len;
|
||||
}
|
||||
|
||||
static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id)
|
||||
@ -130,36 +130,36 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
|
||||
#endif
|
||||
}
|
||||
|
||||
if (connection->message.length + 2 > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + 2 > connection->buffer_length) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], message_id)
|
||||
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], message_id)
|
||||
|
||||
return message_id;
|
||||
}
|
||||
|
||||
static int init_message(mqtt_connection_t *connection)
|
||||
{
|
||||
connection->message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
|
||||
connection->outbound_message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
|
||||
return MQTT5_MAX_FIXED_HEADER_SIZE;
|
||||
}
|
||||
|
||||
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
|
||||
{
|
||||
connection->message.data = connection->buffer;
|
||||
connection->message.length = 0;
|
||||
return &connection->message;
|
||||
connection->outbound_message.data = connection->buffer;
|
||||
connection->outbound_message.length = 0;
|
||||
return &connection->outbound_message;
|
||||
}
|
||||
|
||||
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
|
||||
{
|
||||
int message_length = connection->message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
|
||||
int message_length = connection->outbound_message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
|
||||
int total_length = message_length;
|
||||
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
|
||||
// Check if we have fragmented message and update total_len
|
||||
if (connection->message.fragmented_msg_total_length) {
|
||||
total_length = connection->message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
|
||||
if (connection->outbound_message.fragmented_msg_total_length) {
|
||||
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
|
||||
}
|
||||
|
||||
// Encode MQTT message length
|
||||
@ -171,10 +171,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
|
||||
}
|
||||
|
||||
// Save the header bytes
|
||||
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
|
||||
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
|
||||
int offs = MQTT5_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
|
||||
connection->message.data = connection->buffer + offs;
|
||||
connection->message.fragmented_msg_data_offset -= offs;
|
||||
connection->outbound_message.data = connection->buffer + offs;
|
||||
connection->outbound_message.fragmented_msg_data_offset -= offs;
|
||||
// type byte
|
||||
connection->buffer[offs ++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
|
||||
// length bytes
|
||||
@ -182,7 +182,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
|
||||
connection->buffer[offs ++] = encoded_lens[j];
|
||||
}
|
||||
|
||||
return &connection->message;
|
||||
return &connection->outbound_message;
|
||||
}
|
||||
|
||||
static esp_err_t mqtt5_msg_set_user_property(mqtt5_user_property_handle_t *user_property, char *key, size_t key_len, char *value, size_t value_len)
|
||||
@ -339,7 +339,7 @@ char *mqtt5_get_publish_property_payload(uint8_t *buffer, size_t buffer_length,
|
||||
continue;
|
||||
case MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL:
|
||||
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(resp_property->message_expiry_interval, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
|
||||
ESP_LOGD(TAG, "MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL %d", resp_property->message_expiry_interval);
|
||||
ESP_LOGD(TAG, "MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL %"PRIu32, resp_property->message_expiry_interval);
|
||||
continue;
|
||||
case MQTT5_PROPERTY_TOPIC_ALIAS:
|
||||
MQTT5_CONVERT_ONE_BYTE_TO_TWO(resp_property->topic_alias, property[property_offset ++], property[property_offset ++])
|
||||
@ -465,24 +465,24 @@ char *mqtt5_get_puback_data(uint8_t *buffer, size_t *length, mqtt5_user_property
|
||||
mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info, esp_mqtt5_connection_property_storage_t *property, esp_mqtt5_connection_will_property_storage_t *will_property)
|
||||
{
|
||||
init_message(connection);
|
||||
connection->buffer[connection->message.length ++] = 0; // Variable header length MSB
|
||||
connection->buffer[connection->outbound_message.length ++] = 0; // Variable header length MSB
|
||||
/* Defaults to protocol version 5 values */
|
||||
connection->buffer[connection->message.length ++] = 4; // Variable header length LSB
|
||||
memcpy(&connection->buffer[connection->message.length], "MQTT", 4); // Protocol name
|
||||
connection->message.length += 4;
|
||||
connection->buffer[connection->message.length ++] = 5; // Protocol version
|
||||
connection->buffer[connection->outbound_message.length ++] = 4; // Variable header length LSB
|
||||
memcpy(&connection->buffer[connection->outbound_message.length], "MQTT", 4); // Protocol name
|
||||
connection->outbound_message.length += 4;
|
||||
connection->buffer[connection->outbound_message.length ++] = 5; // Protocol version
|
||||
|
||||
int flags_offset = connection->message.length;
|
||||
connection->buffer[connection->message.length ++] = 0; // Flags
|
||||
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], info->keepalive) // Keep-alive
|
||||
int flags_offset = connection->outbound_message.length;
|
||||
connection->buffer[connection->outbound_message.length ++] = 0; // Flags
|
||||
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], info->keepalive) // Keep-alive
|
||||
|
||||
if (info->clean_session) {
|
||||
connection->buffer[flags_offset] |= MQTT5_CONNECT_FLAG_CLEAN_SESSION;
|
||||
}
|
||||
|
||||
//Add properties
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
if (property->session_expiry_interval) {
|
||||
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, property->session_expiry_interval), fail_message(connection));
|
||||
}
|
||||
@ -508,7 +508,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
|
||||
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
|
||||
}
|
||||
}
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
|
||||
if (info->client_id != NULL && info->client_id[0] != '\0') {
|
||||
APPEND_CHECK(append_property(connection, 0, 2, info->client_id, strlen(info->client_id)), fail_message(connection));
|
||||
@ -518,8 +518,8 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
|
||||
|
||||
//Add will properties
|
||||
if (info->will_topic != NULL && info->will_topic[0] != '\0') {
|
||||
properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
if (will_property->will_delay_interval) {
|
||||
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_WILL_DELAY_INTERVAL, 4, NULL, will_property->will_delay_interval), fail_message(connection));
|
||||
}
|
||||
@ -545,7 +545,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
|
||||
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
|
||||
}
|
||||
}
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
|
||||
APPEND_CHECK(append_property(connection, 0, 2, info->will_topic, strlen(info->will_topic)), fail_message(connection));
|
||||
APPEND_CHECK(append_property(connection, 0, 2, info->will_message, info->will_length), fail_message(connection));
|
||||
@ -603,7 +603,7 @@ esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, m
|
||||
switch (property_id) {
|
||||
case MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL:
|
||||
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(connection_property->session_expiry_interval, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
|
||||
ESP_LOGD(TAG, "MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL %d", connection_property->session_expiry_interval);
|
||||
ESP_LOGD(TAG, "MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL %"PRIu32, connection_property->session_expiry_interval);
|
||||
continue;
|
||||
case MQTT5_PROPERTY_RECEIVE_MAXIMUM:
|
||||
MQTT5_CONVERT_ONE_BYTE_TO_TWO(resp_property->receive_maximum, property[property_offset ++], property[property_offset ++])
|
||||
@ -619,7 +619,7 @@ esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, m
|
||||
continue;
|
||||
case MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE:
|
||||
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(resp_property->maximum_packet_size, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
|
||||
ESP_LOGD(TAG, "MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE %d", resp_property->maximum_packet_size);
|
||||
ESP_LOGD(TAG, "MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE %"PRIu32, resp_property->maximum_packet_size);
|
||||
continue;
|
||||
case MQTT5_PROPERTY_ASSIGNED_CLIENT_IDENTIFIER:
|
||||
MQTT5_CONVERT_ONE_BYTE_TO_TWO(len, property[property_offset ++], property[property_offset ++])
|
||||
@ -742,8 +742,8 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
|
||||
*message_id = 0;
|
||||
}
|
||||
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
|
||||
if (property) {
|
||||
if (property->payload_format_indicator) {
|
||||
@ -761,7 +761,7 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
|
||||
char *response_topic = calloc(1, response_topic_size);
|
||||
if (!response_topic) {
|
||||
ESP_LOGE(TAG, "Failed to calloc %d memory", response_topic_size);
|
||||
fail_message(connection);
|
||||
return fail_message(connection);
|
||||
}
|
||||
snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info);
|
||||
if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) {
|
||||
@ -788,20 +788,20 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
|
||||
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_CONTENT_TYPE, 2, property->content_type, strlen(property->content_type)), fail_message(connection));
|
||||
}
|
||||
}
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
|
||||
if (connection->message.length + data_length > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + data_length > connection->buffer_length) {
|
||||
// Not enough size in buffer -> fragment this message
|
||||
connection->message.fragmented_msg_data_offset = connection->message.length;
|
||||
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
|
||||
connection->message.length = connection->buffer_length;
|
||||
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
|
||||
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
|
||||
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
|
||||
connection->outbound_message.length = connection->buffer_length;
|
||||
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
|
||||
} else {
|
||||
if (data != NULL) {
|
||||
memcpy(connection->buffer + connection->message.length, data, data_length);
|
||||
connection->message.length += data_length;
|
||||
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
|
||||
connection->outbound_message.length += data_length;
|
||||
}
|
||||
connection->message.fragmented_msg_total_length = 0;
|
||||
connection->outbound_message.fragmented_msg_total_length = 0;
|
||||
}
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
|
||||
}
|
||||
@ -858,8 +858,8 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
|
||||
if (property) {
|
||||
if (property->subscribe_id) {
|
||||
@ -873,7 +873,7 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
|
||||
}
|
||||
}
|
||||
}
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
|
||||
for (int topic_number = 0; topic_number < size; ++topic_number) {
|
||||
if (topic_list[topic_number].filter[0] == '\0') {
|
||||
@ -897,23 +897,23 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
|
||||
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length] = 0;
|
||||
connection->buffer[connection->outbound_message.length] = 0;
|
||||
if (property) {
|
||||
if (property->retain_handle > 0 && property->retain_handle < 3) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
|
||||
connection->buffer[connection->outbound_message.length] |= (property->retain_handle & 3) << 4;
|
||||
}
|
||||
if (property->no_local_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
|
||||
connection->buffer[connection->outbound_message.length] |= (property->no_local_flag << 2);
|
||||
}
|
||||
if (property->retain_as_published_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
|
||||
connection->buffer[connection->outbound_message.length] |= (property->retain_as_published_flag << 3);
|
||||
}
|
||||
}
|
||||
connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3);
|
||||
connection->message.length ++;
|
||||
connection->buffer[connection->outbound_message.length] |= (topic_list[topic_number].qos & 3);
|
||||
connection->outbound_message.length ++;
|
||||
}
|
||||
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
|
||||
}
|
||||
@ -921,10 +921,10 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
|
||||
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info)
|
||||
{
|
||||
init_message(connection);
|
||||
int reason_offset = connection->message.length;
|
||||
connection->buffer[connection->message.length ++] = 0;
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
int reason_offset = connection->outbound_message.length;
|
||||
connection->buffer[connection->outbound_message.length ++] = 0;
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
if (disconnect_property_info) {
|
||||
if (disconnect_property_info->session_expiry_interval) {
|
||||
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, disconnect_property_info->session_expiry_interval), fail_message(connection));
|
||||
@ -940,7 +940,7 @@ mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_di
|
||||
connection->buffer[reason_offset] = disconnect_property_info->disconnect_reason;
|
||||
}
|
||||
}
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
|
||||
}
|
||||
|
||||
@ -956,8 +956,8 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
if (property) {
|
||||
if (property->user_property) {
|
||||
mqtt5_user_property_item_t item;
|
||||
@ -968,7 +968,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
|
||||
}
|
||||
}
|
||||
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
if (property && property->is_share_subscribe) {
|
||||
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
|
||||
char *shared_topic = calloc(1, shared_topic_size);
|
||||
@ -996,10 +996,10 @@ mqtt_message_t *mqtt5_msg_puback(mqtt_connection_t *connection, uint16_t message
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
|
||||
}
|
||||
|
||||
@ -1009,10 +1009,10 @@ mqtt_message_t *mqtt5_msg_pubrec(mqtt_connection_t *connection, uint16_t message
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
|
||||
}
|
||||
|
||||
@ -1022,10 +1022,10 @@ mqtt_message_t *mqtt5_msg_pubrel(mqtt_connection_t *connection, uint16_t message
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
|
||||
}
|
||||
|
||||
@ -1035,9 +1035,9 @@ mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t messag
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->message.length;
|
||||
connection->message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
|
||||
int properties_offset = connection->outbound_message.length;
|
||||
connection->outbound_message.length ++;
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
|
||||
}
|
||||
|
117
lib/mqtt_msg.c
117
lib/mqtt_msg.c
@ -48,14 +48,14 @@ enum mqtt_connect_flag {
|
||||
|
||||
static int append_string(mqtt_connection_t *connection, const char *string, int len)
|
||||
{
|
||||
if (connection->message.length + len + 2 > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + len + 2 > connection->buffer_length) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
connection->buffer[connection->message.length++] = len >> 8;
|
||||
connection->buffer[connection->message.length++] = len & 0xff;
|
||||
memcpy(connection->buffer + connection->message.length, string, len);
|
||||
connection->message.length += len;
|
||||
connection->buffer[connection->outbound_message.length++] = len >> 8;
|
||||
connection->buffer[connection->outbound_message.length++] = len & 0xff;
|
||||
memcpy(connection->buffer + connection->outbound_message.length, string, len);
|
||||
connection->outbound_message.length += len;
|
||||
|
||||
return len + 2;
|
||||
}
|
||||
@ -72,38 +72,38 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
|
||||
#endif
|
||||
}
|
||||
|
||||
if (connection->message.length + 2 > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + 2 > connection->buffer_length) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
connection->buffer[connection->message.length++] = message_id >> 8;
|
||||
connection->buffer[connection->message.length++] = message_id & 0xff;
|
||||
connection->buffer[connection->outbound_message.length++] = message_id >> 8;
|
||||
connection->buffer[connection->outbound_message.length++] = message_id & 0xff;
|
||||
|
||||
return message_id;
|
||||
}
|
||||
|
||||
static int init_message(mqtt_connection_t *connection)
|
||||
static int set_message_header_size(mqtt_connection_t *connection)
|
||||
{
|
||||
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
connection->outbound_message.length = MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
return MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
}
|
||||
|
||||
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
|
||||
{
|
||||
connection->message.data = connection->buffer;
|
||||
connection->message.length = 0;
|
||||
return &connection->message;
|
||||
connection->outbound_message.data = connection->buffer;
|
||||
connection->outbound_message.length = 0;
|
||||
return &connection->outbound_message;
|
||||
}
|
||||
|
||||
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
|
||||
{
|
||||
int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
int total_length = message_length;
|
||||
int encoded_length = 0;
|
||||
uint8_t encoded_lens[4] = {0};
|
||||
// Check if we have fragmented message and update total_len
|
||||
if (connection->message.fragmented_msg_total_length) {
|
||||
total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
if (connection->outbound_message.fragmented_msg_total_length) {
|
||||
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
|
||||
}
|
||||
|
||||
// Encode MQTT message length
|
||||
@ -124,10 +124,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
|
||||
}
|
||||
|
||||
// Save the header bytes
|
||||
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
|
||||
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
|
||||
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
|
||||
connection->message.data = connection->buffer + offs;
|
||||
connection->message.fragmented_msg_data_offset -= offs;
|
||||
connection->outbound_message.data = connection->buffer + offs;
|
||||
connection->outbound_message.fragmented_msg_data_offset -= offs;
|
||||
// type byte
|
||||
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
|
||||
// length bytes
|
||||
@ -135,14 +135,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
|
||||
connection->buffer[offs++] = encoded_lens[j];
|
||||
}
|
||||
|
||||
return &connection->message;
|
||||
}
|
||||
|
||||
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length)
|
||||
{
|
||||
memset(connection, 0, sizeof(mqtt_connection_t));
|
||||
connection->buffer = buffer;
|
||||
connection->buffer_length = buffer_length;
|
||||
return &connection->outbound_message;
|
||||
}
|
||||
|
||||
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len)
|
||||
@ -347,7 +340,7 @@ uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
|
||||
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
|
||||
{
|
||||
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
|
||||
int header_len;
|
||||
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
|
||||
@ -356,11 +349,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
|
||||
header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE;
|
||||
}
|
||||
|
||||
if (connection->message.length + header_len > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + header_len > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
char *variable_header = (char *)(connection->buffer + connection->message.length);
|
||||
connection->message.length += header_len;
|
||||
char *variable_header = (char *)(connection->buffer + connection->outbound_message.length);
|
||||
connection->outbound_message.length += header_len;
|
||||
|
||||
int header_idx = 0;
|
||||
variable_header[header_idx++] = 0; // Variable header length MSB
|
||||
@ -445,7 +438,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
|
||||
|
||||
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
|
||||
if (topic == NULL || topic[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
@ -468,16 +461,16 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
|
||||
}
|
||||
|
||||
if (data != NULL) {
|
||||
if (connection->message.length + data_length > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + data_length > connection->buffer_length) {
|
||||
// Not enough size in buffer -> fragment this message
|
||||
connection->message.fragmented_msg_data_offset = connection->message.length;
|
||||
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
|
||||
connection->message.length = connection->buffer_length;
|
||||
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
|
||||
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
|
||||
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
|
||||
connection->outbound_message.length = connection->buffer_length;
|
||||
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
|
||||
} else {
|
||||
memcpy(connection->buffer + connection->message.length, data, data_length);
|
||||
connection->message.length += data_length;
|
||||
connection->message.fragmented_msg_total_length = 0;
|
||||
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
|
||||
connection->outbound_message.length += data_length;
|
||||
connection->outbound_message.fragmented_msg_total_length = 0;
|
||||
}
|
||||
}
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
|
||||
@ -485,7 +478,7 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
|
||||
|
||||
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
@ -494,7 +487,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
|
||||
|
||||
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
@ -503,7 +496,7 @@ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_
|
||||
|
||||
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
@ -512,7 +505,7 @@ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_
|
||||
|
||||
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
if (append_message_id(connection, message_id) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
@ -521,7 +514,7 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message
|
||||
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
|
||||
if ((*message_id = append_message_id(connection, 0)) == 0) {
|
||||
return fail_message(connection);
|
||||
@ -536,11 +529,11 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
if (connection->outbound_message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length] = topic_list[topic_number].qos;
|
||||
connection->message.length ++;
|
||||
connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos;
|
||||
connection->outbound_message.length ++;
|
||||
}
|
||||
|
||||
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
|
||||
@ -548,7 +541,7 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt
|
||||
|
||||
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
|
||||
if (topic == NULL || topic[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
@ -567,19 +560,19 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *
|
||||
|
||||
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection)
|
||||
{
|
||||
init_message(connection);
|
||||
set_message_header_size(connection);
|
||||
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
|
||||
}
|
||||
|
||||
@ -622,3 +615,23 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length)
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size)
|
||||
{
|
||||
memset(&connection->outbound_message, 0, sizeof(mqtt_message_t));
|
||||
connection->buffer = (uint8_t *)calloc(buffer_size, sizeof(uint8_t));
|
||||
if (!connection->buffer) {
|
||||
return ESP_ERR_NO_MEM;
|
||||
}
|
||||
connection->buffer_length = buffer_size;
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection)
|
||||
{
|
||||
if (connection) {
|
||||
free(connection->buffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include "mqtt_outbox.h"
|
||||
#include <stdint.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "mqtt_config.h"
|
||||
@ -22,18 +23,25 @@ typedef struct outbox_item {
|
||||
|
||||
STAILQ_HEAD(outbox_list_t, outbox_item);
|
||||
|
||||
struct outbox_t {
|
||||
_Atomic uint64_t size;
|
||||
struct outbox_list_t *list;
|
||||
};
|
||||
|
||||
outbox_handle_t outbox_init(void)
|
||||
{
|
||||
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t));
|
||||
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t));
|
||||
ESP_MEM_CHECK(TAG, outbox, return NULL);
|
||||
STAILQ_INIT(outbox);
|
||||
outbox->list = calloc(1, sizeof(struct outbox_list_t));
|
||||
ESP_MEM_CHECK(TAG, outbox->list, {free(outbox); return NULL;});
|
||||
outbox->size = 0;
|
||||
STAILQ_INIT(outbox->list);
|
||||
return outbox;
|
||||
}
|
||||
|
||||
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
|
||||
{
|
||||
outbox_item_handle_t item = heap_caps_calloc(1, sizeof(outbox_item_t), MQTT_OUTBOX_MEMORY);
|
||||
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
|
||||
ESP_MEM_CHECK(TAG, item, return NULL);
|
||||
item->msg_id = message->msg_id;
|
||||
item->msg_type = message->msg_type;
|
||||
@ -41,7 +49,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
|
||||
item->tick = tick;
|
||||
item->len = message->len + message->remaining_len;
|
||||
item->pending = QUEUED;
|
||||
item->buffer = malloc(message->len + message->remaining_len);
|
||||
item->buffer = heap_caps_malloc(message->len + message->remaining_len, MQTT_OUTBOX_MEMORY);
|
||||
ESP_MEM_CHECK(TAG, item->buffer, {
|
||||
free(item);
|
||||
return NULL;
|
||||
@ -50,15 +58,16 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
|
||||
if (message->remaining_data) {
|
||||
memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len);
|
||||
}
|
||||
STAILQ_INSERT_TAIL(outbox, item, next);
|
||||
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
|
||||
STAILQ_INSERT_TAIL(outbox->list, item, next);
|
||||
outbox->size += item->len;
|
||||
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%"PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
|
||||
return item;
|
||||
}
|
||||
|
||||
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
|
||||
{
|
||||
outbox_item_handle_t item;
|
||||
STAILQ_FOREACH(item, outbox, next) {
|
||||
STAILQ_FOREACH(item, outbox->list, next) {
|
||||
if (item->msg_id == msg_id) {
|
||||
return item;
|
||||
}
|
||||
@ -69,7 +78,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
|
||||
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
|
||||
{
|
||||
outbox_item_handle_t item;
|
||||
STAILQ_FOREACH(item, outbox, next) {
|
||||
STAILQ_FOREACH(item, outbox->list, next) {
|
||||
if (item->pending == pending) {
|
||||
if (tick) {
|
||||
*tick = item->tick;
|
||||
@ -83,9 +92,10 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
|
||||
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
|
||||
{
|
||||
outbox_item_handle_t item;
|
||||
STAILQ_FOREACH(item, outbox, next) {
|
||||
STAILQ_FOREACH(item, outbox->list, next) {
|
||||
if (item == item_to_delete) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
|
||||
outbox->size -= item->len;
|
||||
free(item->buffer);
|
||||
free(item);
|
||||
return ESP_OK;
|
||||
@ -109,31 +119,20 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t
|
||||
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
|
||||
{
|
||||
outbox_item_handle_t item, tmp;
|
||||
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
|
||||
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
|
||||
if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
|
||||
outbox->size -= item->len;
|
||||
free(item->buffer);
|
||||
free(item);
|
||||
ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox));
|
||||
ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%"PRIu64, msg_id, msg_type, outbox_get_size(outbox));
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
}
|
||||
return ESP_FAIL;
|
||||
}
|
||||
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
|
||||
{
|
||||
outbox_item_handle_t item, tmp;
|
||||
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
|
||||
if (item->msg_id == msg_id) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
free(item->buffer);
|
||||
free(item);
|
||||
}
|
||||
|
||||
}
|
||||
return ESP_OK;
|
||||
}
|
||||
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
|
||||
{
|
||||
outbox_item_handle_t item = outbox_get(outbox, msg_id);
|
||||
@ -162,27 +161,15 @@ esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
|
||||
{
|
||||
outbox_item_handle_t item, tmp;
|
||||
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
|
||||
if (item->msg_type == msg_type) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
free(item->buffer);
|
||||
free(item);
|
||||
}
|
||||
|
||||
}
|
||||
return ESP_OK;
|
||||
}
|
||||
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
|
||||
{
|
||||
int msg_id = -1;
|
||||
outbox_item_handle_t item;
|
||||
STAILQ_FOREACH(item, outbox, next) {
|
||||
STAILQ_FOREACH(item, outbox->list, next) {
|
||||
if (current_tick - item->tick > timeout) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
|
||||
free(item->buffer);
|
||||
outbox->size -= item->len;
|
||||
msg_id = item->msg_id;
|
||||
free(item);
|
||||
return msg_id;
|
||||
@ -196,10 +183,11 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
|
||||
{
|
||||
int deleted_items = 0;
|
||||
outbox_item_handle_t item, tmp;
|
||||
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
|
||||
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
|
||||
if (current_tick - item->tick > timeout) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
|
||||
free(item->buffer);
|
||||
outbox->size -= item->len;
|
||||
free(item);
|
||||
deleted_items ++;
|
||||
}
|
||||
@ -208,23 +196,17 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
|
||||
return deleted_items;
|
||||
}
|
||||
|
||||
int outbox_get_size(outbox_handle_t outbox)
|
||||
uint64_t outbox_get_size(outbox_handle_t outbox)
|
||||
{
|
||||
int siz = 0;
|
||||
outbox_item_handle_t item;
|
||||
STAILQ_FOREACH(item, outbox, next) {
|
||||
// Suppressing "use after free" warning as this could happen only if queue is in inconsistent state
|
||||
// which never happens if STAILQ interface used
|
||||
siz += item->len; // NOLINT(clang-analyzer-unix.Malloc)
|
||||
}
|
||||
return siz;
|
||||
return outbox->size;
|
||||
}
|
||||
|
||||
void outbox_delete_all_items(outbox_handle_t outbox)
|
||||
{
|
||||
outbox_item_handle_t item, tmp;
|
||||
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
|
||||
STAILQ_REMOVE(outbox, item, outbox_item, next);
|
||||
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
|
||||
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
|
||||
outbox->size -= item->len;
|
||||
free(item->buffer);
|
||||
free(item);
|
||||
}
|
||||
@ -232,6 +214,7 @@ void outbox_delete_all_items(outbox_handle_t outbox)
|
||||
void outbox_destroy(outbox_handle_t outbox)
|
||||
{
|
||||
outbox_delete_all_items(outbox);
|
||||
free(outbox->list);
|
||||
free(outbox);
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
#ifdef ESP_PLATFORM
|
||||
#include "esp_log.h"
|
||||
#include "esp_mac.h"
|
||||
#include "soc/soc_caps.h"
|
||||
#include "esp_timer.h"
|
||||
#include "esp_random.h"
|
||||
#include <stdlib.h>
|
||||
@ -12,13 +13,25 @@ static const char *TAG = "platform";
|
||||
|
||||
#define MAX_ID_STRING (32)
|
||||
|
||||
#if defined SOC_WIFI_SUPPORTED
|
||||
#define MAC_TYPE ESP_MAC_WIFI_STA
|
||||
#elif defined SOC_EMAC_SUPPORTED
|
||||
#define MAC_TYPE ESP_MAC_ETH
|
||||
#elif defined SOC_IEEE802154_SUPPORTED
|
||||
#define MAC_TYPE ESP_MAC_IEEE802154
|
||||
#endif
|
||||
char *platform_create_id_string(void)
|
||||
{
|
||||
uint8_t mac[6];
|
||||
char *id_string = calloc(1, MAX_ID_STRING);
|
||||
ESP_MEM_CHECK(TAG, id_string, return NULL);
|
||||
esp_read_mac(mac, ESP_MAC_WIFI_STA);
|
||||
#ifndef MAC_TYPE
|
||||
ESP_LOGW(TAG, "Soc doesn't provide MAC, client could be disconnected in case of device with same name in the broker.");
|
||||
sprintf(id_string, "esp_mqtt_client_id");
|
||||
#else
|
||||
uint8_t mac[6];
|
||||
esp_read_mac(mac, MAC_TYPE);
|
||||
sprintf(id_string, "ESP32_%02x%02X%02X", mac[3], mac[4], mac[5]);
|
||||
#endif
|
||||
return id_string;
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@ static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_
|
||||
|
||||
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data);
|
||||
bool msg_dup = mqtt5_get_dup(client->mqtt_state.connection.outbound_message.data);
|
||||
if (msg_dup == false) {
|
||||
client->send_publish_packet_count ++;
|
||||
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
|
||||
@ -35,7 +35,7 @@ void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
|
||||
|
||||
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBCOMP return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
|
||||
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
|
||||
client->event.data = mqtt5_get_pubcomp_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
|
||||
@ -47,7 +47,7 @@ void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
|
||||
|
||||
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
|
||||
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
|
||||
client->event.data = mqtt5_get_puback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
|
||||
@ -59,7 +59,7 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
|
||||
|
||||
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGI(TAG, "MQTT_MSG_TYPE_UNSUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
|
||||
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
|
||||
client->event.data = mqtt5_get_unsuback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
|
||||
@ -71,7 +71,7 @@ void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
|
||||
|
||||
void esp_mqtt5_parse_suback(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGI(TAG, "MQTT_MSG_TYPE_SUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
|
||||
}
|
||||
}
|
||||
@ -81,13 +81,14 @@ esp_err_t esp_mqtt5_parse_connack(esp_mqtt5_client_handle_t client, int *connect
|
||||
size_t len = client->mqtt_state.in_buffer_read_len;
|
||||
client->mqtt_state.in_buffer_read_len = 0;
|
||||
uint8_t ack_flag = 0;
|
||||
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
|
||||
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->mqtt_state.
|
||||
connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to parse CONNACK packet");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (*connect_rsp_code == MQTT_CONNECTION_ACCEPTED) {
|
||||
ESP_LOGD(TAG, "Connected");
|
||||
client->event.session_present = ack_flag & 0x01;
|
||||
client->event.session_present = ack_flag & 0x01;
|
||||
return ESP_OK;
|
||||
}
|
||||
esp_mqtt5_print_error_code(client, *connect_rsp_code);
|
||||
@ -116,7 +117,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
|
||||
*msg_topic = esp_mqtt5_client_get_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, msg_topic_len);
|
||||
if (!*msg_topic) {
|
||||
ESP_LOGE(TAG, "%s: esp_mqtt5_client_get_topic_alias() failed", __func__);
|
||||
return ESP_FAIL;
|
||||
return ESP_FAIL;
|
||||
}
|
||||
} else {
|
||||
if (esp_mqtt5_client_update_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, *msg_topic, *msg_topic_len) != ESP_OK) {
|
||||
@ -139,7 +140,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
|
||||
|
||||
esp_err_t esp_mqtt5_create_default_config(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
client->event.property = calloc(1, sizeof(esp_mqtt5_event_property_t));
|
||||
ESP_MEM_CHECK(TAG, client->event.property, return ESP_FAIL)
|
||||
client->mqtt5_config = calloc(1, sizeof(mqtt5_config_storage_t));
|
||||
@ -172,7 +173,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
|
||||
case MQTT5_UNSUPPORTED_PROTOCOL_VER:
|
||||
ESP_LOGW(TAG, "Unsupported Protocol Version");
|
||||
break;
|
||||
case MQTT5_INVAILD_CLIENT_ID:
|
||||
case MQTT5_INVALID_CLIENT_ID:
|
||||
ESP_LOGW(TAG, "Client Identifier not valid");
|
||||
break;
|
||||
case MQTT5_BAD_USERNAME_OR_PWD:
|
||||
@ -202,10 +203,10 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
|
||||
case MQTT5_SESSION_TAKEN_OVER:
|
||||
ESP_LOGW(TAG, "Session taken over");
|
||||
break;
|
||||
case MQTT5_TOPIC_FILTER_INVAILD:
|
||||
case MQTT5_TOPIC_FILTER_INVALID:
|
||||
ESP_LOGW(TAG, "Topic Filter invalid");
|
||||
break;
|
||||
case MQTT5_TOPIC_NAME_INVAILD:
|
||||
case MQTT5_TOPIC_NAME_INVALID:
|
||||
ESP_LOGW(TAG, "Topic Name invalid");
|
||||
break;
|
||||
case MQTT5_PACKET_IDENTIFIER_IN_USE:
|
||||
@ -217,7 +218,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
|
||||
case MQTT5_RECEIVE_MAXIMUM_EXCEEDED:
|
||||
ESP_LOGW(TAG, "Receive Maximum exceeded");
|
||||
break;
|
||||
case MQTT5_TOPIC_ALIAS_INVAILD:
|
||||
case MQTT5_TOPIC_ALIAS_INVALID:
|
||||
ESP_LOGW(TAG, "Topic Alias invalid");
|
||||
break;
|
||||
case MQTT5_PACKET_TOO_LARGE:
|
||||
@ -232,7 +233,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
|
||||
case MQTT5_ADMINISTRATIVE_ACTION:
|
||||
ESP_LOGW(TAG, "Administrative action");
|
||||
break;
|
||||
case MQTT5_PAYLOAD_FORMAT_INVAILD:
|
||||
case MQTT5_PAYLOAD_FORMAT_INVALID:
|
||||
ESP_LOGW(TAG, "Payload format invalid");
|
||||
break;
|
||||
case MQTT5_RETAIN_NOT_SUPPORT:
|
||||
@ -304,7 +305,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
|
||||
|
||||
void esp_mqtt5_client_destory(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt5_config) {
|
||||
free(client->mqtt5_config->will_property_info.content_type);
|
||||
free(client->mqtt5_config->will_property_info.response_topic);
|
||||
@ -416,7 +417,7 @@ esp_err_t esp_mqtt5_client_set_publish_property(esp_mqtt5_client_handle_t client
|
||||
MQTT_API_LOCK(client);
|
||||
|
||||
/* Check protocol version */
|
||||
if(client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGE(TAG, "MQTT protocol version is not v5");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return ESP_FAIL;
|
||||
@ -445,7 +446,7 @@ esp_err_t esp_mqtt5_client_set_subscribe_property(esp_mqtt5_client_handle_t clie
|
||||
MQTT_API_LOCK(client);
|
||||
|
||||
/* Check protocol version */
|
||||
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGE(TAG, "MQTT protocol version is not v5");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return ESP_FAIL;
|
||||
@ -482,7 +483,7 @@ esp_err_t esp_mqtt5_client_set_unsubscribe_property(esp_mqtt5_client_handle_t cl
|
||||
MQTT_API_LOCK(client);
|
||||
|
||||
/* Check protocol version */
|
||||
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGE(TAG, "MQTT protocol version is not v5");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return ESP_FAIL;
|
||||
@ -513,7 +514,7 @@ esp_err_t esp_mqtt5_client_set_disconnect_property(esp_mqtt5_client_handle_t cli
|
||||
MQTT_API_LOCK(client);
|
||||
|
||||
/* Check protocol version */
|
||||
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGE(TAG, "MQTT protocol version is not v5");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return ESP_FAIL;
|
||||
@ -556,7 +557,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
|
||||
MQTT_API_LOCK(client);
|
||||
|
||||
/* Check protocol version */
|
||||
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
|
||||
ESP_LOGE(TAG, "MQTT protocol version is not v5");
|
||||
MQTT_API_UNLOCK(client);
|
||||
return ESP_FAIL;
|
||||
@ -572,7 +573,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
|
||||
return ESP_FAIL;
|
||||
} else {
|
||||
client->mqtt5_config->connect_property_info.maximum_packet_size = connect_property->maximum_packet_size;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
client->mqtt5_config->connect_property_info.maximum_packet_size = client->mqtt_state.in_buffer_length;
|
||||
}
|
||||
@ -757,4 +758,4 @@ void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_pro
|
||||
}
|
||||
}
|
||||
free(user_property);
|
||||
}
|
||||
}
|
||||
|
634
mqtt_client.c
634
mqtt_client.c
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user