Compare commits

...

67 Commits

Author SHA1 Message Date
b43d93c82d fix: Fix host test for github ci. 2024-04-17 14:35:34 +02:00
aa6f889fb4 Merge branch 'fix/error_codes' into 'master'
fix: regard reason codes greater than 0x80 as failures.

See merge request espressif/esp-mqtt!205
2024-04-05 17:09:50 +08:00
1edd167dcb Merge branch 'fix/calloc_failure' into 'master'
PR: Return on allocation failure

See merge request espressif/esp-mqtt!204
2024-04-03 15:29:51 +08:00
c06f1540fe set last_retransmit to now when first connected
when published before connected, the msg will be always retransmitted immediately even it is  send right now
2024-04-02 10:48:45 +02:00
37478a9c00 add return to faile_message, avoid segment fault
should not use response_topic when calloc failed
2024-04-02 10:48:45 +02:00
fd21b27a83 Merge branch 'fix/unused_variable_warning' into 'master'
Minor warning of unused variable

See merge request espressif/esp-mqtt!203
2024-04-02 14:41:47 +08:00
e7b9aa5e6a fix: regard reason codes greater than 0x80 as failures. 2024-04-01 10:37:42 +08:00
726e5f2fce fix: Minor warning of unused variable
In case of hardcoded id mac array was unused.
2024-03-28 14:16:34 +01:00
14f5fa079f Merge branch 'fix/cliend_id' into 'master'
Cover the case for SOC without MAC address

See merge request espressif/esp-mqtt!202
2024-03-28 20:37:54 +08:00
5e3abd4b8c fix: Cover the case for SOC without MAC address
We need to cover for the case where the available MAC isn't defined in
soc caps. E.g. A new target is being introduced but the support isn't
complete yet.
2024-03-28 12:40:07 +01:00
37cb056c5f Merge branch 'feature/atomic_state' into 'master'
Make state and size atomic

See merge request espressif/esp-mqtt!199
2024-03-22 15:26:15 +08:00
74481cbbf4 Merge branch 'fix/log_level_on_timeout' into 'master'
fix: Adjust the log level on few messages to avoid cluthering the logs

See merge request espressif/esp-mqtt!201
2024-03-22 15:05:49 +08:00
c33a0c8e44 Merge branch 'fix/detect_mac_type' into 'master'
fix: Make automatic client_id soc dependent

See merge request espressif/esp-mqtt!200
2024-03-22 15:05:21 +08:00
5c17fc4a1a fix: Adjust the log level on few messages to avoid cluthering the logs 2024-03-21 14:11:57 +01:00
657a2aea77 fix: Make automatic client_id soc dependent
When creating the client_id for user, the library uses the device MAC.
For some of our devices WIFI isn't available and the library needs to select
a different MAC to use.
2024-03-14 07:31:37 +01:00
891380bdf5 feat: Make state and size atomic
This makes the mqtt client state atomic to reduce the scope of locking in
some parts of the code.
2024-03-12 08:49:36 +01:00
fa88da5282 Merge branch 'lifetime_clarification' into 'master'
Clarify data that users need to take care of lifetime.

See merge request espressif/esp-mqtt!197
2024-01-26 19:10:50 +08:00
5a35782272 Merge branch 'fix/subscribe_macro_type' into 'master'
Update mqtt_client.h

See merge request espressif/esp-mqtt!198
2024-01-26 18:54:42 +08:00
acdb66d5c6 add const char * to esp_mqtt_client_subscribe() generic macros 2024-01-25 15:35:32 +07:00
d3e3c4e7ad Merge branch 'bugfix/consolidate_err_messages' into 'master'
client: Report failure on timeout in mid-message timeout (GitHub PR)

See merge request espressif/esp-mqtt!165
2024-01-24 22:23:04 +08:00
371f594cce docs: Clarify data that users need to take care of lifetime.
- Adds to field that the mqtt client doesn't copy a note about it.
2024-01-24 09:38:37 +01:00
ddde502782 client: Report failure on timeout in mid-message timeout
Fixes error processing on network reads:

1) Treat EOF as an error, since the connection is closed (FIN) from the
server side. If we didn't we would try to read (in the next iteration)
from the same socket that has been already closed and get an error
ENOTCONN.
Before the fix:
D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF
E (13800) transport_base: tcp_read error, errno=Socket is not connected
E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128
D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR
E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008
E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80
I (13840) esp_mqtt_demo: Last errno string (Socket is not connected)
E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1
D (13860) mqtt_client: Reconnect after 10000 ms
D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
After the fix:
E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF
E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128
D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR
E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008
I (12450) esp_mqtt_demo: Last errno string (Success)
E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1
D (12470) mqtt_client: Reconnect after 10000 ms
D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED

2) Treat timeouts in the middle of MQTT message reading as errors (if
timeouted for the second time and didn't read a byte)
Before the fix:
D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2
D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2)
D (19190) mqtt_client: mqtt_message_receive: read_len=0
D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready!
E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0
E (19210) mqtt_client: MQTT connect failed
D (19220) mqtt_client: Reconnect after 10000 ms
D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
After the fix:
D (19190) mqtt_client: mqtt_message_receive: read_len=0
E (19190) mqtt_client: Network timeout while reading MQTT message
E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119
D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR
I (19220) esp_mqtt_demo: Last errno string (Success)
E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1
E (19240) mqtt_client: MQTT connect failed
D (19240) mqtt_client: Reconnect after 10000 ms
D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
(Note that the above log is from mid-message timeout of CONNECT message,
which was hadled before the fix. If the mid-message timeout ocurs with
for example SUBACK, the current version would repeatably resend
susbscribe message)

Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 15:40:44 +01:00
7894dd0ace Merge branch 'fix/buffer_creation_in_set_config' into 'master'
fix: Move buffer initialization to set config

See merge request espressif/esp-mqtt!194
2023-10-25 16:16:35 +08:00
ea0df31e00 fix: Move buffer initialization to set config
When calling set config message buffers were not affected because the
creation was handled on init.

Closes https://github.com/espressif/esp-mqtt/issues/267
2023-10-25 07:47:05 +00:00
abd8b6cadc Merge branch 'fix/qos0publishdispatch' into 'master'
Fix check for message creation when processing publish

See merge request espressif/esp-mqtt!195
2023-10-10 16:25:20 +08:00
67800569de fix: Deliver publish verifies if message was created only for QoS >0
The previous version depends on a sucessful message being created prior
to that check. The check only makes sense for the case where puback or
pubrec messages should be created.
2023-10-10 06:47:07 +00:00
e6afdb4025 Merge branch 'fix/check_asprintf_return_value' into 'master'
fix: using return value of asprintf now

See merge request espressif/esp-mqtt!192
2023-09-14 19:46:26 +08:00
c0b40b1293 fix: using return value of asprintf now 2023-09-14 19:46:26 +08:00
dc93367bd5 Merge branch 'fix/outbox_memory_destination' into 'master'
fix: Uses caps allocation for data buffer instead of item struct

See merge request espressif/esp-mqtt!193
2023-09-14 13:47:33 +08:00
00ee059bf8 fix: Uses caps allocation for data buffer instead of item struct
Once introduced the memory destination for outbox was incorrectly
allocating the outbox data structuture instead of data buffer to the
selected memory.
2023-09-13 08:04:38 +02:00
05b347643f Merge branch 'bugfix/mock_tests_include_idf_additions' into 'master'
Fix: Mock test should include idf_additions.h

See merge request espressif/esp-mqtt!191
2023-09-04 20:01:22 +08:00
f35aaa1577 fix: Mock tests include idf_additions.h
A number of IDF specific FreeRTOS API additions will be moved to
`#include "freertos/idf_additions.h". Mock tests need to include these to
properly use them as mock functions.
2023-09-04 15:05:03 +08:00
4b28192d00 Merge branch 'docs/outbox_doxygen' into 'master'
Adds missing documentation to outbox configuration.

See merge request espressif/esp-mqtt!190
2023-08-31 22:22:04 +08:00
c355e0b5ae docs: Adds missing documentation to outbox configuration. 2023-08-31 22:22:04 +08:00
301bd9e028 Merge branch 'fix/big_messages' into 'master'
fix: Added missing update to message data

See merge request espressif/esp-mqtt!189
2023-08-25 15:38:48 +08:00
cc41d1b852 fix: Added missing update to message data
When handling big messages the refactoring introduced a regression when
handling big messages, messages larger than the message buffer.
2023-08-24 09:52:00 +02:00
1ca73479cb Merge branch 'mqtt5_fixing_typo' into 'master'
PR: fixing typos in `mqtt5_error_reason_code`

See merge request espressif/esp-mqtt!188
2023-08-18 14:43:10 +08:00
90b4a4538e feat: Add enum definition with typo to keep backwards compatibility
Added initial enum definitions with deprecated attribute.
Should be removed with a new version that can introduce a breaking
change.
2023-08-17 14:58:49 +02:00
dc775bb52e fixing typos in mqtt5_error_reason_code 2023-07-24 23:23:32 +08:00
fe32d8f224 Merge branch 'docs/clarify_keepalive_timeout' into 'master'
docs: Clarify keepalive timeout

See merge request espressif/esp-mqtt!186
2023-07-11 05:34:46 +08:00
cb1e6cf218 docs: Clarify keepalive timeout
Adds information on the behavior of the PINGREQ message timeout and some
reasoning behind the choice.
2023-07-11 05:34:46 +08:00
cd81773bd1 Merge branch 'fix/format_log_messages' into 'master'
fix: LOG format strings

See merge request espressif/esp-mqtt!187
2023-07-10 17:28:55 +08:00
395aa141c8 Merge branch 'fix/stop_only_if_started' into 'master'
Fix: Stop client only if it's running.

See merge request espressif/esp-mqtt!183
2023-07-10 15:55:33 +08:00
8d98103013 Merge branch 'fix/account_for_failure_in_make_publish' into 'master'
fix: Error on publish message creation was ignored.

See merge request espressif/esp-mqtt!185
2023-06-30 20:42:28 +08:00
a3b04f2d0a fix: LOG format strings
Fix log format strings and remove no-format warning configuration.
2023-06-30 10:17:32 +02:00
585e3ba2e0 fix: Error on publish message creation was ignored.
In the case of make_publish failure the client would just continue and
the error was ignored and not propagated to caller.
2023-06-27 15:13:54 +02:00
36eec6f625 Fix: Stop client only if it's running.
Check for client run instead of lock to call esp_mqtt_client_stop
when destroying the client.
2023-06-21 09:39:42 +02:00
effd1e6705 Merge branch 'bugfix/connection_buffer_allocation' into 'master'
Fix: Allocation for connection buffer was incorrectly done.

See merge request espressif/esp-mqtt!182
2023-06-21 14:57:11 +08:00
6c849c62ef Fix: Allocation for connection buffer was incorrectly done.
By mistake calloc parameters were incorrect.
2023-06-21 08:24:46 +02:00
ee3ea29d52 Merge branch 'fix/host_test' into 'master'
Adds mqtt host tests to Ci

See merge request espressif/esp-mqtt!181
2023-06-20 13:27:02 +08:00
4050df4caf Adds mqtt host tests to Ci
- New job to run host tests
- Fix leak in case of usage of interface name
- Fix host tests to expect a call to `transport_destroy` and add an
  extra case
2023-06-19 10:34:18 +02:00
aee82c7ba8 Merge branch 'bugfix/outbox_init_failure' into 'master'
Fix: Outbox was leaked in case of initialization failure

See merge request espressif/esp-mqtt!180
2023-06-14 21:46:04 +08:00
5896e259ad Merge branch 'feat/bind_interface' into 'master'
feat: Add option to bind interface of use

See merge request espressif/esp-mqtt!179
2023-06-14 21:43:34 +08:00
363fbf7dab feat: Add option to bind interface of use
Enable user to set which interface should be used for client network,
allowing client to be binded to the interface selected by user forcing
it to go through the selected interface.

Closes https://github.com/espressif/esp-mqtt/issues/253
2023-06-14 14:48:44 +02:00
5d491a45ce Fix: Outbox was leaked in case of initialization failure
If the the list allocation fail, outbox must be freed.
2023-06-14 13:36:42 +02:00
63cfec799c Merge branch 'feature/configurable_max_outbox' into 'master'
Add outbox size control feature

See merge request espressif/esp-mqtt!141
2023-06-14 01:02:34 +08:00
372ab7b374 feat: Introduces outbox limit
A memory limit for the outbox can be configured.
User will not be able to publish or enqueue if the new message goes
beyond the configured limit.
2023-06-13 15:59:55 +02:00
21a5491d53 Removes unused outbox functions. 2023-06-13 11:56:11 +02:00
122875bf8a refactor: Group access to output buffer in mqtt_connection_t
- Moves mqtt_connect_info to mqtt_connection_t.
  - Removes outbound_message in favor of accessing it trough connection.
2023-06-13 11:56:05 +02:00
ed628098a1 Merge branch 'feature/custom_transport' into 'master'
Add custom transport configuration

See merge request espressif/esp-mqtt!169
2023-06-13 14:30:24 +08:00
6438676b66 Merge branch 'fix/removed_unused_handler_field' into 'master'
Removes leftover calls to event_handler

See merge request espressif/esp-mqtt!178
2023-06-13 14:29:49 +08:00
a492935951 Removes leftover calls to event_handler
The possibility to add a callback as custom handler was removed from
the client in favor of esp_event. These removes the older alternative
that wasn't possible to use.
2023-06-09 10:44:50 +02:00
a89af4bf8d Merge branch 'cfg-common-name' into 'master'
PR: Added support to set server common name.

See merge request espressif/esp-mqtt!173
2023-06-08 15:08:22 +08:00
6195762d28 Added support to set server common name. 2023-06-08 08:46:41 +02:00
a5c1b441dc feat: Add custom transport configuration
Today there is no way to add a new transport without applying
modifications to the transport list. This impose limitations on the
client usage. Adding the custom configuration we enable user defined
transports.
2023-06-08 06:38:53 +00:00
ffd7d4df6c Fix: Prevent sync verififcation from running on master
Removes the verification in case the pipeline is running on master.
2023-06-08 08:15:38 +02:00
4c2ed1676f CI: Add check for Gitlab/Github remotes in sync
To avoid merging on Gitlab with Github remote out of sync while we are
moving to Github development.
2023-06-08 08:15:38 +02:00
19 changed files with 860 additions and 654 deletions

View File

@ -22,7 +22,7 @@ jobs:
- name: Build and Test
shell: bash
run: |
apt-get update && apt-get install -y gcc-8 g++-8 python3-pip rsync
apt-get update && apt-get install -y gcc g++ python3-pip rsync
${IDF_PATH}/install.sh
. ${IDF_PATH}/export.sh
echo "IDF_PATH=${IDF_PATH}" >> $GITHUB_ENV

View File

@ -1,5 +1,6 @@
stages:
- build
- test
- deploy
@ -48,6 +49,16 @@ build_idf_latest:
extends: .build_template
image: espressif/idf:latest
build_and_host_test:
stage: build
image: espressif/idf:latest
script:
# Replace the IDF's default esp-mqtt with this version
- rm -rf $IDF_PATH/components/mqtt/esp-mqtt && cp -r $MQTT_PATH $IDF_PATH/components/mqtt/
- cd $IDF_PATH/components/mqtt/esp-mqtt/host_test
- idf.py build
- build/host_mqtt_client_test.elf
build_and_test_qemu:
stage: build
image: ${CI_DOCKER_REGISTRY}/qemu-v5.1:1-20220802
@ -80,6 +91,17 @@ build_and_test_qemu:
- export MQTT_PUBLISH_MSG_len_3=20 MQTT_PUBLISH_MSG_repeat_3=20
- python Runner.py $TEST_PATH -c $MQTT_PATH/ci/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
check_remotes_sync:
stage: test
except:
- master
- idf
script:
- *add_gh_key_remote
- git fetch --depth=1 origin master
- git fetch --depth=1 github master
- test "$(git rev-parse origin/master)" == "$(git rev-parse github/master)"
push_master_to_github:
stage: deploy
image: ${CI_DOCKER_REGISTRY}/esp32-ci-env

View File

@ -12,4 +12,3 @@ idf_component_register(SRCS "${srcs}"
PRIV_REQUIRES esp_timer http_parser esp_hw_support heap
KCONFIG ${CMAKE_CURRENT_LIST_DIR}/Kconfig
)
target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")

View File

@ -11,7 +11,6 @@ list(APPEND EXTRA_COMPONENT_DIRS
"$ENV{IDF_PATH}/tools/mocks/lwip/"
"$ENV{IDF_PATH}/tools/mocks/esp-tls/"
"$ENV{IDF_PATH}/tools/mocks/http_parser/"
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/"
)
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/")
project(host_mqtt_client_test)

View File

@ -2,10 +2,18 @@ idf_component_register(SRCS "test_mqtt_client.cpp"
INCLUDE_DIRS "$ENV{IDF_PATH}/tools/catch"
REQUIRES cmock mqtt esp_timer esp_hw_support http_parser log)
target_compile_options(${COMPONENT_LIB} PUBLIC -fsanitize=address -fconcepts)
target_link_options(${COMPONENT_LIB} PUBLIC -fsanitize=address)
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
target_compile_definitions(${mqtt} PRIVATE SOC_WIFI_SUPPORTED=1)
target_compile_options(${mqtt} PUBLIC -fsanitize=address -fconcepts)
target_link_options(${mqtt} PUBLIC -fsanitize=address)
if(CONFIG_GCOV_ENABLED)
target_compile_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
target_link_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
target_compile_options(${mqtt} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
target_link_options(${mqtt} PUBLIC --coverage -fprofile-arcs -ftest-coverage)

View File

@ -3,9 +3,16 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <memory>
#include <net/if.h>
#include <random>
#include <string_view>
#include <type_traits>
#include "esp_transport.h"
#define CATCH_CONFIG_MAIN // This tells the catch header to generate a main
#include "catch.hpp"
#include "mqtt_client.h"
extern "C" {
#include "Mockesp_event.h"
#include "Mockesp_mac.h"
@ -17,6 +24,10 @@ extern "C" {
#include "Mockhttp_parser.h"
#include "Mockqueue.h"
#include "Mocktask.h"
#if __has_include ("Mockidf_additions.h")
/* Some functions were moved from "task.h" to "idf_additions.h" */
#include "Mockidf_additions.h"
#endif
#include "Mockesp_timer.h"
/*
@ -29,98 +40,130 @@ extern "C" {
}
}
#include "mqtt_client.h"
struct ClientInitializedFixture {
esp_mqtt_client_handle_t client;
ClientInitializedFixture()
{
[[maybe_unused]] auto protect = TEST_PROTECT();
int mtx;
int transport_list;
int transport;
int event_group;
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
esp_timer_get_time_IgnoreAndReturn(0);
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
xQueueCreateMutex_ExpectAnyArgsAndReturn(
reinterpret_cast<QueueHandle_t>(&mtx));
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
http_parser_parse_url_IgnoreAndReturn(0);
http_parser_url_init_ExpectAnyArgs();
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
esp_read_mac_IgnoreAndReturn(ESP_OK);
esp_read_mac_ReturnThruPtr_mac(mac);
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
vEventGroupDelete_Ignore();
vQueueDelete_Ignore();
esp_mqtt_client_config_t config{};
client = esp_mqtt_client_init(&config);
}
~ClientInitializedFixture()
{
esp_mqtt_client_destroy(client);
}
};
TEST_CASE_METHOD(ClientInitializedFixture, "Client set uri")
auto random_string(std::size_t n)
{
struct http_parser_url ret_uri = {
.field_set = 1,
.port = 0,
.field_data = { { 0, 1} }
};
SECTION("User set a correct URI") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client, " ");
REQUIRE(res == ESP_OK);
}
SECTION("Incorrect URI from user") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client, " ");
REQUIRE(res == ESP_FAIL);
}
static constexpr std::string_view char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ123456790";
std::string str;
std::sample(char_set.begin(), char_set.end(), std::back_inserter(str), n,
std::mt19937 {std::random_device{}()});
return str;
}
TEST_CASE_METHOD(ClientInitializedFixture, "Client Start")
using unique_mqtt_client = std::unique_ptr < std::remove_pointer_t<esp_mqtt_client_handle_t>, decltype([](esp_mqtt_client_handle_t client)
{
SECTION("Successful start") {
esp_mqtt_client_destroy(client);
}) >;
SCENARIO("MQTT Client Operation")
{
// Set expectations for the mocked calls.
int mtx = 0;
int transport_list = 0;
int transport = 0;
int event_group = 0;
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
esp_timer_get_time_IgnoreAndReturn(0);
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
xQueueCreateMutex_ExpectAnyArgsAndReturn(
reinterpret_cast<QueueHandle_t>(&mtx));
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
http_parser_url_init_Ignore();
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
esp_read_mac_IgnoreAndReturn(ESP_OK);
esp_read_mac_ReturnThruPtr_mac(mac);
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
esp_transport_destroy_IgnoreAndReturn(ESP_OK);
vEventGroupDelete_Ignore();
vQueueDelete_Ignore();
GIVEN("An a minimal config") {
esp_mqtt_client_config_t config{};
config.broker.address.uri = "mqtt://1.1.1.1";
struct http_parser_url ret_uri = {
.field_set = 1 | (1<<1),
.field_set = 1 | (1 << 1),
.port = 0,
.field_data = { { 0, 4 } /*mqtt*/, { 7, 1 } } // at least *scheme* and *host*
};
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdTRUE);
auto res = esp_mqtt_set_config(client, &config);
REQUIRE(res == ESP_OK);
res = esp_mqtt_client_start(client);
REQUIRE(res == ESP_OK);
}
SECTION("Failed on initialization") {
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
auto res = esp_mqtt_client_start(nullptr);
REQUIRE(res == ESP_ERR_INVALID_ARG);
}
SECTION("Client already started") {}
SECTION("Failed to start task") {
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
auto res = esp_mqtt_client_start(client);
REQUIRE(res == ESP_FAIL);
SECTION("Client with minimal config") {
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
REQUIRE(client != nullptr);
SECTION("User will set a new uri") {
struct http_parser_url ret_uri = {
.field_set = 1,
.port = 0,
.field_data = { { 0, 1} }
};
SECTION("User set a correct URI") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client.get(), " ");
REQUIRE(res == ESP_OK);
}
SECTION("Incorrect URI from user") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client.get(), " ");
REQUIRE(res == ESP_FAIL);
}
}
SECTION("User set interface to use"){
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
struct ifreq if_name = {.ifr_ifrn = {"custom"}};
config.network.if_name = &if_name;
SECTION("Client is not started"){
REQUIRE(esp_mqtt_set_config(client.get(), &config)== ESP_OK);
}
}
SECTION("After Start Client Is Cleanly destroyed") {
REQUIRE(esp_mqtt_client_start(client.get()) == ESP_OK);
// Only need to start the client, destroy is called automatically at the end of
// scope
}
}
SECTION("Client with all allocating configuration set") {
auto host = random_string(20);
auto path = random_string(10);
auto username = random_string(10);
auto client_id = random_string(10);
auto password = random_string(10);
auto lw_topic = random_string(10);
auto lw_msg = random_string(10);
config.broker = {.address = {
.hostname = host.data(),
.path = path.data()
}
};
config.credentials = {
.username = username.data(),
.client_id = client_id.data(),
.authentication = {
.password = password.data()
}
};
config.session = {
.last_will {
.topic = lw_topic.data(),
.msg = lw_msg.data()
}
};
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
REQUIRE(client != nullptr);
}
}
}

View File

@ -19,41 +19,46 @@ typedef struct esp_mqtt_client *esp_mqtt5_client_handle_t;
* MQTT5 protocol error reason code, more details refer to MQTT5 protocol document section 2.4
*/
enum mqtt5_error_reason_code {
MQTT5_UNSPECIFIED_ERROR = 0x80,
MQTT5_MALFORMED_PACKET = 0x81,
MQTT5_PROTOCOL_ERROR = 0x82,
MQTT5_IMPLEMENT_SPECIFIC_ERROR = 0x83,
MQTT5_UNSUPPORTED_PROTOCOL_VER = 0x84,
MQTT5_INVAILD_CLIENT_ID = 0x85,
MQTT5_BAD_USERNAME_OR_PWD = 0x86,
MQTT5_NOT_AUTHORIZED = 0x87,
MQTT5_SERVER_UNAVAILABLE = 0x88,
MQTT5_SERVER_BUSY = 0x89,
MQTT5_BANNED = 0x8A,
MQTT5_SERVER_SHUTTING_DOWN = 0x8B,
MQTT5_BAD_AUTH_METHOD = 0x8C,
MQTT5_KEEP_ALIVE_TIMEOUT = 0x8D,
MQTT5_SESSION_TAKEN_OVER = 0x8E,
MQTT5_TOPIC_FILTER_INVAILD = 0x8F,
MQTT5_TOPIC_NAME_INVAILD = 0x90,
MQTT5_PACKET_IDENTIFIER_IN_USE = 0x91,
MQTT5_PACKET_IDENTIFIER_NOT_FOUND = 0x92,
MQTT5_RECEIVE_MAXIMUM_EXCEEDED = 0x93,
MQTT5_TOPIC_ALIAS_INVAILD = 0x94,
MQTT5_PACKET_TOO_LARGE = 0x95,
MQTT5_MESSAGE_RATE_TOO_HIGH = 0x96,
MQTT5_QUOTA_EXCEEDED = 0x97,
MQTT5_ADMINISTRATIVE_ACTION = 0x98,
MQTT5_PAYLOAD_FORMAT_INVAILD = 0x99,
MQTT5_RETAIN_NOT_SUPPORT = 0x9A,
MQTT5_QOS_NOT_SUPPORT = 0x9B,
MQTT5_USE_ANOTHER_SERVER = 0x9C,
MQTT5_SERVER_MOVED = 0x9D,
MQTT5_SHARED_SUBSCR_NOT_SUPPORTED = 0x9E,
MQTT5_CONNECTION_RATE_EXCEEDED = 0x9F,
MQTT5_MAXIMUM_CONNECT_TIME = 0xA0,
MQTT5_SUBSCRIBE_IDENTIFIER_NOT_SUPPORT = 0xA1,
MQTT5_WILDCARD_SUBSCRIBE_NOT_SUPPORT = 0xA2,
MQTT5_UNSPECIFIED_ERROR = 0x80,
MQTT5_MALFORMED_PACKET = 0x81,
MQTT5_PROTOCOL_ERROR = 0x82,
MQTT5_IMPLEMENT_SPECIFIC_ERROR = 0x83,
MQTT5_UNSUPPORTED_PROTOCOL_VER = 0x84,
MQTT5_INVAILD_CLIENT_ID __attribute__((deprecated)) = 0x85,
MQTT5_INVALID_CLIENT_ID = 0x85,
MQTT5_BAD_USERNAME_OR_PWD = 0x86,
MQTT5_NOT_AUTHORIZED = 0x87,
MQTT5_SERVER_UNAVAILABLE = 0x88,
MQTT5_SERVER_BUSY = 0x89,
MQTT5_BANNED = 0x8A,
MQTT5_SERVER_SHUTTING_DOWN = 0x8B,
MQTT5_BAD_AUTH_METHOD = 0x8C,
MQTT5_KEEP_ALIVE_TIMEOUT = 0x8D,
MQTT5_SESSION_TAKEN_OVER = 0x8E,
MQTT5_TOPIC_FILTER_INVAILD __attribute__((deprecated)) = 0x8F,
MQTT5_TOPIC_FILTER_INVALID = 0x8F,
MQTT5_TOPIC_NAME_INVAILD __attribute__((deprecated)) = 0x90,
MQTT5_TOPIC_NAME_INVALID = 0x90,
MQTT5_PACKET_IDENTIFIER_IN_USE = 0x91,
MQTT5_PACKET_IDENTIFIER_NOT_FOUND = 0x92,
MQTT5_RECEIVE_MAXIMUM_EXCEEDED = 0x93,
MQTT5_TOPIC_ALIAS_INVAILD __attribute__((deprecated)) = 0x94,
MQTT5_TOPIC_ALIAS_INVALID = 0x94,
MQTT5_PACKET_TOO_LARGE = 0x95,
MQTT5_MESSAGE_RATE_TOO_HIGH = 0x96,
MQTT5_QUOTA_EXCEEDED = 0x97,
MQTT5_ADMINISTRATIVE_ACTION = 0x98,
MQTT5_PAYLOAD_FORMAT_INVAILD __attribute__((deprecated)) = 0x99,
MQTT5_PAYLOAD_FORMAT_INVALID = 0x99,
MQTT5_RETAIN_NOT_SUPPORT = 0x9A,
MQTT5_QOS_NOT_SUPPORT = 0x9B,
MQTT5_USE_ANOTHER_SERVER = 0x9C,
MQTT5_SERVER_MOVED = 0x9D,
MQTT5_SHARED_SUBSCR_NOT_SUPPORTED = 0x9E,
MQTT5_CONNECTION_RATE_EXCEEDED = 0x9F,
MQTT5_MAXIMUM_CONNECT_TIME = 0xA0,
MQTT5_SUBSCRIBE_IDENTIFIER_NOT_SUPPORT = 0xA1,
MQTT5_WILDCARD_SUBSCRIBE_NOT_SUPPORT = 0xA2,
};
/**
@ -65,7 +70,7 @@ typedef struct mqtt5_user_property_list_t *mqtt5_user_property_handle_t;
* MQTT5 protocol connect properties and will properties configuration, more details refer to MQTT5 protocol document section 3.1.2.11 and 3.3.2.3
*/
typedef struct {
uint32_t session_expiry_interval; /*!< The interval time of session expiry */
uint32_t session_expiry_interval; /*!< The interval time of session expiry */
uint32_t maximum_packet_size; /*!< The maximum packet size that we can receive */
uint16_t receive_maximum; /*!< The maximum pakcket count that we process concurrently */
uint16_t topic_alias_maximum; /*!< The maximum topic alias that we support */
@ -270,7 +275,7 @@ uint8_t esp_mqtt5_client_get_user_property_count(mqtt5_user_property_handle_t us
* @brief Free the user property list
*
* @param user_property user_property handle
*
*
* This API will free the memory in user property list and free user_property itself
*/
void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_property);

View File

@ -12,6 +12,7 @@
#include <string.h>
#include "esp_err.h"
#include "esp_event.h"
#include "esp_transport.h"
#ifdef CONFIG_MQTT_PROTOCOL_5
#include "mqtt5_client.h"
#endif
@ -213,8 +214,6 @@ typedef struct esp_mqtt_event_t {
typedef esp_mqtt_event_t *esp_mqtt_event_handle_t;
typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
/**
* *MQTT* client configuration structure
*
@ -250,16 +249,21 @@ typedef struct esp_mqtt_client_config_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
documentation for details. */
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
the usage of certificate bundles. */
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
the usage of certificate bundles. Client only attach the bundle, the clean up must be done by the user. */
const char *certificate; /*!< Certificate data, default is NULL. It's not copied nor freed by the client, user needs to clean up.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
authentication (as alternative to certificate verification).
PSK is enabled only if there are no other ways to
verify broker.*/
verify broker. It's not copied nor freed by the client, user needs to clean up.*/
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
security of TLS and makes the *MQTT* client susceptible to MITM attacks */
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN.*/
const char *common_name; /*!< Pointer to the string containing server certificate common name.
If non-NULL, server certificate CN must match this name,
If NULL, server certificate CN must match hostname.
This is ignored if skip_cert_common_name_check=true.
It's not copied nor freed by the client, user needs to clean up.*/
} verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */
/**
@ -283,17 +287,17 @@ typedef struct esp_mqtt_client_config_t {
struct authentication_t {
const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
authentication is not needed. Must be provided with `key`.*/
authentication is not needed. Must be provided with `key`. It's not copied nor freed by the client, user needs to clean up.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
is not needed. If it is not NULL, also `certificate` has to be provided.*/
is not needed. If it is not NULL, also `certificate` has to be provided. It's not copied nor freed by the client, user needs to clean up.*/
size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
`key_password_len` must be correctly set. */
`key_password_len` must be correctly set.*/
int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
available in some Espressif devices. */
available in some Espressif devices. It's not copied nor freed by the client, user needs to clean up.*/
} authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */
/**
@ -311,7 +315,10 @@ typedef struct esp_mqtt_client_config_t {
int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds
When configuring this value, keep in mind that the client attempts
to communicate with the broker at half the interval that is actually set.
This conservative approach allows for more attempts before the broker's timeout occurs */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
by default. Note: setting the config value `keepalive` to `0` doesn't disable
keepalive feature, but uses a default keepalive period */
@ -329,6 +336,8 @@ typedef struct esp_mqtt_client_config_t {
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
`disable_auto_reconnect=true` to disable */
esp_transport_handle_t transport; /*!< Custom transport handle to use. Warning: The transport should be valid during the client lifetime and is destroyed when esp_mqtt_client_destroy is called. */
struct ifreq * if_name; /*!< The name of interface for data to go through. Use the default interface without setting */
} network; /*!< Network configuration */
/**
* Client task configuration
@ -347,6 +356,13 @@ typedef struct esp_mqtt_client_config_t {
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
``buffer_size`` */
} buffer; /*!< Buffer size configuration.*/
/**
* Client outbox configuration options.
*/
struct outbox_config_t {
uint64_t limit; /*!< Size limit for the outbox in bytes.*/
} outbox; /*!< Outbox configuration. */
} esp_mqtt_client_config_t;
/**
@ -425,7 +441,6 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
*/
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
#ifdef __cplusplus
#define esp_mqtt_client_subscribe esp_mqtt_client_subscribe_single
@ -444,9 +459,11 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
char *: esp_mqtt_client_subscribe_single, \
const char *: esp_mqtt_client_subscribe_single, \
esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \
)(client_handle, topic_type, qos_or_size)
@ -468,6 +485,7 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
const char *topic, int qos);
@ -488,6 +506,7 @@ int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size);
@ -531,7 +550,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client,
* @param retain retain flag
*
* @return message_id of the publish message (for QoS 0 message_id will always
* be zero) on success. -1 on failure.
* be zero) on success. -1 on failure, -2 in case of full outbox.
*/
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
const char *data, int len, int qos, int retain);
@ -556,7 +575,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
* @param store if true, all messages are enqueued; otherwise only QoS 1 and
* QoS 2 are enqueued
*
* @return message_id if queued successfully, -1 otherwise
* @return message_id if queued successfully, -1 on failure, -2 in case of full outbox.
*/
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
const char *data, int len, int qos, int retain,
@ -579,6 +598,9 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client);
* @brief Set configuration structure, typically used when updating the config
* (i.e. on "before_connect" event
*
* Notes:
* - When calling this function make sure to have all the intendend configurations
* set, otherwise default values are set.
* @param client *MQTT* client handle
*
* @param config *MQTT* configuration structure
@ -603,9 +625,9 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client,
* ESP_OK on success
*/
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client,
esp_mqtt_event_id_t event,
esp_event_handler_t event_handler,
void *event_handler_arg);
esp_mqtt_event_id_t event,
esp_event_handler_t event_handler,
void *event_handler_arg);
/**
* @brief Unregisters mqtt event

View File

@ -64,5 +64,11 @@
#define MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
#endif
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
// Features supported in 5.1.0
#define MQTT_SUPPORTED_FEATURE_CRT_CMN_NAME
#endif
#endif /* ESP_IDF_VERSION */
#endif // _MQTT_SUPPORTED_FEATURES_H_

View File

@ -7,6 +7,7 @@
#ifndef _MQTT_CLIENT_PRIV_H_
#define _MQTT_CLIENT_PRIV_H_
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
@ -38,6 +39,14 @@
extern "C" {
#endif
#if CONFIG_NEWLIB_NANO_FORMAT
#define NEWLIB_NANO_COMPAT_FORMAT PRIu32
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) (uint32_t)size_t_var
#else
#define NEWLIB_NANO_COMPAT_FORMAT "zu"
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) size_t_var
#endif
#ifdef MQTT_DISABLE_API_LOCKS
# define MQTT_API_LOCK(c)
# define MQTT_API_UNLOCK(c)
@ -48,20 +57,16 @@ extern "C" {
typedef struct mqtt_state {
uint8_t *in_buffer;
uint8_t *out_buffer;
int in_buffer_length;
int out_buffer_length;
size_t message_length;
size_t in_buffer_read_len;
mqtt_message_t *outbound_message;
mqtt_connection_t mqtt_connection;
mqtt_connection_t connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
} mqtt_state_t;
typedef struct {
mqtt_event_callback_t event_handle;
esp_event_loop_handle_t event_loop_handle;
int task_stack;
int task_prio;
@ -88,9 +93,13 @@ typedef struct {
size_t clientkey_bytes;
const struct psk_key_hint *psk_hint_key;
bool skip_cert_common_name_check;
const char *common_name;
bool use_secure_element;
void *ds_data;
int message_retransmit_timeout;
uint64_t outbox_limit;
esp_transport_handle_t transport;
struct ifreq * if_name;
} mqtt_config_storage_t;
typedef enum {
@ -105,8 +114,7 @@ struct esp_mqtt_client {
esp_transport_handle_t transport;
mqtt_config_storage_t *config;
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
mqtt_client_state_t state;
_Atomic mqtt_client_state_t state;
uint64_t refresh_connection_tick;
int64_t keepalive_tick;
uint64_t reconnect_tick;

View File

@ -100,6 +100,7 @@
#define MQTT_ENABLE_SSL CONFIG_MQTT_TRANSPORT_SSL
#define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET
#define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE
#define MQTT_DEFAULT_RETRANSMIT_TIMEOUT_MS 1000
#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE
#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE

View File

@ -68,16 +68,6 @@ typedef struct mqtt_message {
size_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
} mqtt_message_t;
typedef struct mqtt_connection {
mqtt_message_t message;
#if MQTT_MSG_ID_INCREMENTAL
uint16_t last_message_id; /*!< last used id if incremental message id configured */
#endif
uint8_t *buffer;
size_t buffer_length;
} mqtt_connection_t;
typedef struct mqtt_connect_info {
char *client_id;
char *username;
@ -90,9 +80,18 @@ typedef struct mqtt_connect_info {
int will_retain;
int clean_session;
esp_mqtt_protocol_ver_t protocol_ver;
} mqtt_connect_info_t;
typedef struct mqtt_connection {
mqtt_message_t outbound_message;
#if MQTT_MSG_ID_INCREMENTAL
uint16_t last_message_id; /*!< last used id if incremental message id configured */
#endif
uint8_t *buffer;
size_t buffer_length;
mqtt_connect_info_t information;
} mqtt_connection_t;
static inline int mqtt_get_type(const uint8_t *buffer)
{
@ -123,7 +122,6 @@ static inline int mqtt_get_retain(const uint8_t *buffer)
return (buffer[0] & 0x01);
}
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length);
bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length);
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len);
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length);
@ -132,6 +130,9 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length);
uint16_t mqtt_get_id(uint8_t *buffer, size_t length);
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length);
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size);
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info);
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id);
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id);
@ -143,8 +144,6 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection);
#ifdef __cplusplus
}
#endif

View File

@ -14,7 +14,7 @@ extern "C" {
struct outbox_item;
typedef struct outbox_list_t *outbox_handle_t;
typedef struct outbox_t *outbox_handle_t;
typedef struct outbox_item *outbox_item_handle_t;
typedef struct outbox_message *outbox_message_handle_t;
typedef long long outbox_tick_t;
@ -42,8 +42,6 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
/**
@ -56,7 +54,7 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
pending_state_t outbox_item_get_pending(outbox_item_handle_t item);
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
int outbox_get_size(outbox_handle_t outbox);
uint64_t outbox_get_size(outbox_handle_t outbox);
void outbox_destroy(outbox_handle_t outbox);
void outbox_delete_all_items(outbox_handle_t outbox);

View File

@ -69,8 +69,8 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
generate_variable_len(len, &len_bytes, encoded_lens);
int offset = len_bytes - 1;
connection->message.length += offset;
if (connection->message.length > connection->buffer_length) {
connection->outbound_message.length += offset;
if (connection->outbound_message.length > connection->buffer_length) {
return -1;
}
@ -89,33 +89,33 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
static int append_property(mqtt_connection_t *connection, uint8_t property_type, uint8_t len_occupy, const char *data, size_t data_len)
{
if ((connection->message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
if ((connection->outbound_message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
return -1;
}
size_t origin_message_len = connection->message.length;
size_t origin_message_len = connection->outbound_message.length;
if (property_type) {
connection->buffer[connection->message.length ++] = property_type;
connection->buffer[connection->outbound_message.length ++] = property_type;
}
if (len_occupy == 0) {
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
generate_variable_len(data_len, &len_bytes, encoded_lens);
for (int j = 0; j < len_bytes; j ++) {
connection->buffer[connection->message.length ++] = encoded_lens[j];
connection->buffer[connection->outbound_message.length ++] = encoded_lens[j];
}
} else {
for (int i = 1; i <= len_occupy; i ++) {
connection->buffer[connection->message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
connection->buffer[connection->outbound_message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
}
}
if (data) {
memcpy(connection->buffer + connection->message.length, data, data_len);
connection->message.length += data_len;
memcpy(connection->buffer + connection->outbound_message.length, data, data_len);
connection->outbound_message.length += data_len;
}
return connection->message.length - origin_message_len;
return connection->outbound_message.length - origin_message_len;
}
static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id)
@ -130,36 +130,36 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
#endif
}
if (connection->message.length + 2 > connection->buffer_length) {
if (connection->outbound_message.length + 2 > connection->buffer_length) {
return 0;
}
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], message_id)
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], message_id)
return message_id;
}
static int init_message(mqtt_connection_t *connection)
{
connection->message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
connection->outbound_message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
return MQTT5_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = 0;
return &connection->outbound_message;
}
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
int message_length = connection->message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
int message_length = connection->outbound_message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
if (connection->outbound_message.fragmented_msg_total_length) {
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
}
// Encode MQTT message length
@ -171,10 +171,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT5_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
connection->outbound_message.data = connection->buffer + offs;
connection->outbound_message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs ++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
@ -182,7 +182,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
connection->buffer[offs ++] = encoded_lens[j];
}
return &connection->message;
return &connection->outbound_message;
}
static esp_err_t mqtt5_msg_set_user_property(mqtt5_user_property_handle_t *user_property, char *key, size_t key_len, char *value, size_t value_len)
@ -339,7 +339,7 @@ char *mqtt5_get_publish_property_payload(uint8_t *buffer, size_t buffer_length,
continue;
case MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL:
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(resp_property->message_expiry_interval, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
ESP_LOGD(TAG, "MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL %d", resp_property->message_expiry_interval);
ESP_LOGD(TAG, "MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL %"PRIu32, resp_property->message_expiry_interval);
continue;
case MQTT5_PROPERTY_TOPIC_ALIAS:
MQTT5_CONVERT_ONE_BYTE_TO_TWO(resp_property->topic_alias, property[property_offset ++], property[property_offset ++])
@ -465,24 +465,24 @@ char *mqtt5_get_puback_data(uint8_t *buffer, size_t *length, mqtt5_user_property
mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info, esp_mqtt5_connection_property_storage_t *property, esp_mqtt5_connection_will_property_storage_t *will_property)
{
init_message(connection);
connection->buffer[connection->message.length ++] = 0; // Variable header length MSB
connection->buffer[connection->outbound_message.length ++] = 0; // Variable header length MSB
/* Defaults to protocol version 5 values */
connection->buffer[connection->message.length ++] = 4; // Variable header length LSB
memcpy(&connection->buffer[connection->message.length], "MQTT", 4); // Protocol name
connection->message.length += 4;
connection->buffer[connection->message.length ++] = 5; // Protocol version
connection->buffer[connection->outbound_message.length ++] = 4; // Variable header length LSB
memcpy(&connection->buffer[connection->outbound_message.length], "MQTT", 4); // Protocol name
connection->outbound_message.length += 4;
connection->buffer[connection->outbound_message.length ++] = 5; // Protocol version
int flags_offset = connection->message.length;
connection->buffer[connection->message.length ++] = 0; // Flags
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], info->keepalive) // Keep-alive
int flags_offset = connection->outbound_message.length;
connection->buffer[connection->outbound_message.length ++] = 0; // Flags
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], info->keepalive) // Keep-alive
if (info->clean_session) {
connection->buffer[flags_offset] |= MQTT5_CONNECT_FLAG_CLEAN_SESSION;
}
//Add properties
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property->session_expiry_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, property->session_expiry_interval), fail_message(connection));
}
@ -508,7 +508,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (info->client_id != NULL && info->client_id[0] != '\0') {
APPEND_CHECK(append_property(connection, 0, 2, info->client_id, strlen(info->client_id)), fail_message(connection));
@ -518,8 +518,8 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
//Add will properties
if (info->will_topic != NULL && info->will_topic[0] != '\0') {
properties_offset = connection->message.length;
connection->message.length ++;
properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (will_property->will_delay_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_WILL_DELAY_INTERVAL, 4, NULL, will_property->will_delay_interval), fail_message(connection));
}
@ -545,7 +545,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(append_property(connection, 0, 2, info->will_topic, strlen(info->will_topic)), fail_message(connection));
APPEND_CHECK(append_property(connection, 0, 2, info->will_message, info->will_length), fail_message(connection));
@ -603,7 +603,7 @@ esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, m
switch (property_id) {
case MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL:
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(connection_property->session_expiry_interval, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
ESP_LOGD(TAG, "MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL %d", connection_property->session_expiry_interval);
ESP_LOGD(TAG, "MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL %"PRIu32, connection_property->session_expiry_interval);
continue;
case MQTT5_PROPERTY_RECEIVE_MAXIMUM:
MQTT5_CONVERT_ONE_BYTE_TO_TWO(resp_property->receive_maximum, property[property_offset ++], property[property_offset ++])
@ -619,7 +619,7 @@ esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, m
continue;
case MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE:
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(resp_property->maximum_packet_size, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
ESP_LOGD(TAG, "MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE %d", resp_property->maximum_packet_size);
ESP_LOGD(TAG, "MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE %"PRIu32, resp_property->maximum_packet_size);
continue;
case MQTT5_PROPERTY_ASSIGNED_CLIENT_IDENTIFIER:
MQTT5_CONVERT_ONE_BYTE_TO_TWO(len, property[property_offset ++], property[property_offset ++])
@ -742,8 +742,8 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
*message_id = 0;
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->payload_format_indicator) {
@ -761,7 +761,7 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
char *response_topic = calloc(1, response_topic_size);
if (!response_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", response_topic_size);
fail_message(connection);
return fail_message(connection);
}
snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info);
if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) {
@ -788,20 +788,20 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_CONTENT_TYPE, 2, property->content_type, strlen(property->content_type)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (connection->message.length + data_length > connection->buffer_length) {
if (connection->outbound_message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
connection->outbound_message.length = connection->buffer_length;
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
} else {
if (data != NULL) {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
connection->outbound_message.length += data_length;
}
connection->message.fragmented_msg_total_length = 0;
connection->outbound_message.fragmented_msg_total_length = 0;
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
@ -858,8 +858,8 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
return fail_message(connection);
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->subscribe_id) {
@ -873,7 +873,7 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
}
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].filter[0] == '\0') {
@ -897,23 +897,23 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
}
if (connection->message.length + 1 > connection->buffer_length) {
if (connection->outbound_message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->message.length] = 0;
connection->buffer[connection->outbound_message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
connection->buffer[connection->outbound_message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
connection->buffer[connection->outbound_message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
connection->buffer[connection->outbound_message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3);
connection->message.length ++;
connection->buffer[connection->outbound_message.length] |= (topic_list[topic_number].qos & 3);
connection->outbound_message.length ++;
}
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
@ -921,10 +921,10 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqt
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info)
{
init_message(connection);
int reason_offset = connection->message.length;
connection->buffer[connection->message.length ++] = 0;
int properties_offset = connection->message.length;
connection->message.length ++;
int reason_offset = connection->outbound_message.length;
connection->buffer[connection->outbound_message.length ++] = 0;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (disconnect_property_info) {
if (disconnect_property_info->session_expiry_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, disconnect_property_info->session_expiry_interval), fail_message(connection));
@ -940,7 +940,7 @@ mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_di
connection->buffer[reason_offset] = disconnect_property_info->disconnect_reason;
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
@ -956,8 +956,8 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
return fail_message(connection);
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->user_property) {
mqtt5_user_property_item_t item;
@ -968,7 +968,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
char *shared_topic = calloc(1, shared_topic_size);
@ -996,10 +996,10 @@ mqtt_message_t *mqtt5_msg_puback(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}
@ -1009,10 +1009,10 @@ mqtt_message_t *mqtt5_msg_pubrec(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}
@ -1022,10 +1022,10 @@ mqtt_message_t *mqtt5_msg_pubrel(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}
@ -1035,9 +1035,9 @@ mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t messag
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}

View File

@ -48,14 +48,14 @@ enum mqtt_connect_flag {
static int append_string(mqtt_connection_t *connection, const char *string, int len)
{
if (connection->message.length + len + 2 > connection->buffer_length) {
if (connection->outbound_message.length + len + 2 > connection->buffer_length) {
return -1;
}
connection->buffer[connection->message.length++] = len >> 8;
connection->buffer[connection->message.length++] = len & 0xff;
memcpy(connection->buffer + connection->message.length, string, len);
connection->message.length += len;
connection->buffer[connection->outbound_message.length++] = len >> 8;
connection->buffer[connection->outbound_message.length++] = len & 0xff;
memcpy(connection->buffer + connection->outbound_message.length, string, len);
connection->outbound_message.length += len;
return len + 2;
}
@ -72,38 +72,38 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
#endif
}
if (connection->message.length + 2 > connection->buffer_length) {
if (connection->outbound_message.length + 2 > connection->buffer_length) {
return 0;
}
connection->buffer[connection->message.length++] = message_id >> 8;
connection->buffer[connection->message.length++] = message_id & 0xff;
connection->buffer[connection->outbound_message.length++] = message_id >> 8;
connection->buffer[connection->outbound_message.length++] = message_id & 0xff;
return message_id;
}
static int init_message(mqtt_connection_t *connection)
static int set_message_header_size(mqtt_connection_t *connection)
{
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
connection->outbound_message.length = MQTT_MAX_FIXED_HEADER_SIZE;
return MQTT_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = 0;
return &connection->outbound_message;
}
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
int encoded_length = 0;
uint8_t encoded_lens[4] = {0};
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
if (connection->outbound_message.fragmented_msg_total_length) {
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
}
// Encode MQTT message length
@ -124,10 +124,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
connection->outbound_message.data = connection->buffer + offs;
connection->outbound_message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
@ -135,14 +135,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
connection->buffer[offs++] = encoded_lens[j];
}
return &connection->message;
}
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length)
{
memset(connection, 0, sizeof(mqtt_connection_t));
connection->buffer = buffer;
connection->buffer_length = buffer_length;
return &connection->outbound_message;
}
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len)
@ -347,7 +340,7 @@ uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
{
init_message(connection);
set_message_header_size(connection);
int header_len;
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
@ -356,11 +349,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE;
}
if (connection->message.length + header_len > connection->buffer_length) {
if (connection->outbound_message.length + header_len > connection->buffer_length) {
return fail_message(connection);
}
char *variable_header = (char *)(connection->buffer + connection->message.length);
connection->message.length += header_len;
char *variable_header = (char *)(connection->buffer + connection->outbound_message.length);
connection->outbound_message.length += header_len;
int header_idx = 0;
variable_header[header_idx++] = 0; // Variable header length MSB
@ -445,7 +438,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
@ -468,16 +461,16 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
}
if (data != NULL) {
if (connection->message.length + data_length > connection->buffer_length) {
if (connection->outbound_message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
connection->outbound_message.length = connection->buffer_length;
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
} else {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
connection->message.fragmented_msg_total_length = 0;
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
connection->outbound_message.length += data_length;
connection->outbound_message.fragmented_msg_total_length = 0;
}
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
@ -485,7 +478,7 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -494,7 +487,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -503,7 +496,7 @@ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -512,7 +505,7 @@ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -521,7 +514,7 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
@ -536,11 +529,11 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt
return fail_message(connection);
}
if (connection->message.length + 1 > connection->buffer_length) {
if (connection->outbound_message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->message.length] = topic_list[topic_number].qos;
connection->message.length ++;
connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos;
connection->outbound_message.length ++;
}
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
@ -548,7 +541,7 @@ mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
@ -567,19 +560,19 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
@ -622,3 +615,23 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length)
return 0;
}
}
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size)
{
memset(&connection->outbound_message, 0, sizeof(mqtt_message_t));
connection->buffer = (uint8_t *)calloc(buffer_size, sizeof(uint8_t));
if (!connection->buffer) {
return ESP_ERR_NO_MEM;
}
connection->buffer_length = buffer_size;
return ESP_OK;
}
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection)
{
if (connection) {
free(connection->buffer);
}
}

View File

@ -1,4 +1,5 @@
#include "mqtt_outbox.h"
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "mqtt_config.h"
@ -22,18 +23,25 @@ typedef struct outbox_item {
STAILQ_HEAD(outbox_list_t, outbox_item);
struct outbox_t {
_Atomic uint64_t size;
struct outbox_list_t *list;
};
outbox_handle_t outbox_init(void)
{
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t));
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t));
ESP_MEM_CHECK(TAG, outbox, return NULL);
STAILQ_INIT(outbox);
outbox->list = calloc(1, sizeof(struct outbox_list_t));
ESP_MEM_CHECK(TAG, outbox->list, {free(outbox); return NULL;});
outbox->size = 0;
STAILQ_INIT(outbox->list);
return outbox;
}
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
{
outbox_item_handle_t item = heap_caps_calloc(1, sizeof(outbox_item_t), MQTT_OUTBOX_MEMORY);
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
ESP_MEM_CHECK(TAG, item, return NULL);
item->msg_id = message->msg_id;
item->msg_type = message->msg_type;
@ -41,7 +49,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
item->tick = tick;
item->len = message->len + message->remaining_len;
item->pending = QUEUED;
item->buffer = malloc(message->len + message->remaining_len);
item->buffer = heap_caps_malloc(message->len + message->remaining_len, MQTT_OUTBOX_MEMORY);
ESP_MEM_CHECK(TAG, item->buffer, {
free(item);
return NULL;
@ -50,15 +58,16 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
if (message->remaining_data) {
memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len);
}
STAILQ_INSERT_TAIL(outbox, item, next);
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
STAILQ_INSERT_TAIL(outbox->list, item, next);
outbox->size += item->len;
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%"PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
return item;
}
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item->msg_id == msg_id) {
return item;
}
@ -69,7 +78,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item->pending == pending) {
if (tick) {
*tick = item->tick;
@ -83,9 +92,10 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item == item_to_delete) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
return ESP_OK;
@ -109,31 +119,20 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox));
ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%"PRIu64, msg_id, msg_type, outbox_get_size(outbox));
return ESP_OK;
}
}
return ESP_FAIL;
}
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_id == msg_id) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
}
}
return ESP_OK;
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
@ -162,27 +161,15 @@ esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick
return ESP_FAIL;
}
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_type == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
}
}
return ESP_OK;
}
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
int msg_id = -1;
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
msg_id = item->msg_id;
free(item);
return msg_id;
@ -196,10 +183,11 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
{
int deleted_items = 0;
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
free(item);
deleted_items ++;
}
@ -208,23 +196,17 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
return deleted_items;
}
int outbox_get_size(outbox_handle_t outbox)
uint64_t outbox_get_size(outbox_handle_t outbox)
{
int siz = 0;
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
// Suppressing "use after free" warning as this could happen only if queue is in inconsistent state
// which never happens if STAILQ interface used
siz += item->len; // NOLINT(clang-analyzer-unix.Malloc)
}
return siz;
return outbox->size;
}
void outbox_delete_all_items(outbox_handle_t outbox)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
}
@ -232,6 +214,7 @@ void outbox_delete_all_items(outbox_handle_t outbox)
void outbox_destroy(outbox_handle_t outbox)
{
outbox_delete_all_items(outbox);
free(outbox->list);
free(outbox);
}

View File

@ -3,6 +3,7 @@
#ifdef ESP_PLATFORM
#include "esp_log.h"
#include "esp_mac.h"
#include "soc/soc_caps.h"
#include "esp_timer.h"
#include "esp_random.h"
#include <stdlib.h>
@ -12,13 +13,25 @@ static const char *TAG = "platform";
#define MAX_ID_STRING (32)
#if defined SOC_WIFI_SUPPORTED
#define MAC_TYPE ESP_MAC_WIFI_STA
#elif defined SOC_EMAC_SUPPORTED
#define MAC_TYPE ESP_MAC_ETH
#elif defined SOC_IEEE802154_SUPPORTED
#define MAC_TYPE ESP_MAC_IEEE802154
#endif
char *platform_create_id_string(void)
{
uint8_t mac[6];
char *id_string = calloc(1, MAX_ID_STRING);
ESP_MEM_CHECK(TAG, id_string, return NULL);
esp_read_mac(mac, ESP_MAC_WIFI_STA);
#ifndef MAC_TYPE
ESP_LOGW(TAG, "Soc doesn't provide MAC, client could be disconnected in case of device with same name in the broker.");
sprintf(id_string, "esp_mqtt_client_id");
#else
uint8_t mac[6];
esp_read_mac(mac, MAC_TYPE);
sprintf(id_string, "ESP32_%02x%02X%02X", mac[3], mac[4], mac[5]);
#endif
return id_string;
}

View File

@ -18,7 +18,7 @@ static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
{
bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data);
bool msg_dup = mqtt5_get_dup(client->mqtt_state.connection.outbound_message.data);
if (msg_dup == false) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
@ -35,7 +35,7 @@ void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBCOMP return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_pubcomp_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -47,7 +47,7 @@ void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_puback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -59,7 +59,7 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_UNSUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_unsuback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -71,7 +71,7 @@ void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_suback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_SUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
}
}
@ -81,13 +81,14 @@ esp_err_t esp_mqtt5_parse_connack(esp_mqtt5_client_handle_t client, int *connect
size_t len = client->mqtt_state.in_buffer_read_len;
client->mqtt_state.in_buffer_read_len = 0;
uint8_t ack_flag = 0;
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->mqtt_state.
connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
ESP_LOGE(TAG, "Failed to parse CONNACK packet");
return ESP_FAIL;
}
if (*connect_rsp_code == MQTT_CONNECTION_ACCEPTED) {
ESP_LOGD(TAG, "Connected");
client->event.session_present = ack_flag & 0x01;
client->event.session_present = ack_flag & 0x01;
return ESP_OK;
}
esp_mqtt5_print_error_code(client, *connect_rsp_code);
@ -116,7 +117,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
*msg_topic = esp_mqtt5_client_get_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, msg_topic_len);
if (!*msg_topic) {
ESP_LOGE(TAG, "%s: esp_mqtt5_client_get_topic_alias() failed", __func__);
return ESP_FAIL;
return ESP_FAIL;
}
} else {
if (esp_mqtt5_client_update_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, *msg_topic, *msg_topic_len) != ESP_OK) {
@ -139,7 +140,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
esp_err_t esp_mqtt5_create_default_config(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
client->event.property = calloc(1, sizeof(esp_mqtt5_event_property_t));
ESP_MEM_CHECK(TAG, client->event.property, return ESP_FAIL)
client->mqtt5_config = calloc(1, sizeof(mqtt5_config_storage_t));
@ -172,7 +173,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_UNSUPPORTED_PROTOCOL_VER:
ESP_LOGW(TAG, "Unsupported Protocol Version");
break;
case MQTT5_INVAILD_CLIENT_ID:
case MQTT5_INVALID_CLIENT_ID:
ESP_LOGW(TAG, "Client Identifier not valid");
break;
case MQTT5_BAD_USERNAME_OR_PWD:
@ -202,10 +203,10 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_SESSION_TAKEN_OVER:
ESP_LOGW(TAG, "Session taken over");
break;
case MQTT5_TOPIC_FILTER_INVAILD:
case MQTT5_TOPIC_FILTER_INVALID:
ESP_LOGW(TAG, "Topic Filter invalid");
break;
case MQTT5_TOPIC_NAME_INVAILD:
case MQTT5_TOPIC_NAME_INVALID:
ESP_LOGW(TAG, "Topic Name invalid");
break;
case MQTT5_PACKET_IDENTIFIER_IN_USE:
@ -217,7 +218,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_RECEIVE_MAXIMUM_EXCEEDED:
ESP_LOGW(TAG, "Receive Maximum exceeded");
break;
case MQTT5_TOPIC_ALIAS_INVAILD:
case MQTT5_TOPIC_ALIAS_INVALID:
ESP_LOGW(TAG, "Topic Alias invalid");
break;
case MQTT5_PACKET_TOO_LARGE:
@ -232,7 +233,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_ADMINISTRATIVE_ACTION:
ESP_LOGW(TAG, "Administrative action");
break;
case MQTT5_PAYLOAD_FORMAT_INVAILD:
case MQTT5_PAYLOAD_FORMAT_INVALID:
ESP_LOGW(TAG, "Payload format invalid");
break;
case MQTT5_RETAIN_NOT_SUPPORT:
@ -304,7 +305,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
void esp_mqtt5_client_destory(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt5_config) {
free(client->mqtt5_config->will_property_info.content_type);
free(client->mqtt5_config->will_property_info.response_topic);
@ -416,7 +417,7 @@ esp_err_t esp_mqtt5_client_set_publish_property(esp_mqtt5_client_handle_t client
MQTT_API_LOCK(client);
/* Check protocol version */
if(client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -445,7 +446,7 @@ esp_err_t esp_mqtt5_client_set_subscribe_property(esp_mqtt5_client_handle_t clie
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -482,7 +483,7 @@ esp_err_t esp_mqtt5_client_set_unsubscribe_property(esp_mqtt5_client_handle_t cl
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -513,7 +514,7 @@ esp_err_t esp_mqtt5_client_set_disconnect_property(esp_mqtt5_client_handle_t cli
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -556,7 +557,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -572,7 +573,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
return ESP_FAIL;
} else {
client->mqtt5_config->connect_property_info.maximum_packet_size = connect_property->maximum_packet_size;
}
}
} else {
client->mqtt5_config->connect_property_info.maximum_packet_size = client->mqtt_state.in_buffer_length;
}
@ -757,4 +758,4 @@ void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_pro
}
}
free(user_property);
}
}

File diff suppressed because it is too large Load Diff