Compare commits

...

104 Commits

Author SHA1 Message Date
b43d93c82d fix: Fix host test for github ci. 2024-04-17 14:35:34 +02:00
aa6f889fb4 Merge branch 'fix/error_codes' into 'master'
fix: regard reason codes greater than 0x80 as failures.

See merge request espressif/esp-mqtt!205
2024-04-05 17:09:50 +08:00
1edd167dcb Merge branch 'fix/calloc_failure' into 'master'
PR: Return on allocation failure

See merge request espressif/esp-mqtt!204
2024-04-03 15:29:51 +08:00
c06f1540fe set last_retransmit to now when first connected
when published before connected, the msg will be always retransmitted immediately even it is  send right now
2024-04-02 10:48:45 +02:00
37478a9c00 add return to faile_message, avoid segment fault
should not use response_topic when calloc failed
2024-04-02 10:48:45 +02:00
fd21b27a83 Merge branch 'fix/unused_variable_warning' into 'master'
Minor warning of unused variable

See merge request espressif/esp-mqtt!203
2024-04-02 14:41:47 +08:00
e7b9aa5e6a fix: regard reason codes greater than 0x80 as failures. 2024-04-01 10:37:42 +08:00
726e5f2fce fix: Minor warning of unused variable
In case of hardcoded id mac array was unused.
2024-03-28 14:16:34 +01:00
14f5fa079f Merge branch 'fix/cliend_id' into 'master'
Cover the case for SOC without MAC address

See merge request espressif/esp-mqtt!202
2024-03-28 20:37:54 +08:00
5e3abd4b8c fix: Cover the case for SOC without MAC address
We need to cover for the case where the available MAC isn't defined in
soc caps. E.g. A new target is being introduced but the support isn't
complete yet.
2024-03-28 12:40:07 +01:00
37cb056c5f Merge branch 'feature/atomic_state' into 'master'
Make state and size atomic

See merge request espressif/esp-mqtt!199
2024-03-22 15:26:15 +08:00
74481cbbf4 Merge branch 'fix/log_level_on_timeout' into 'master'
fix: Adjust the log level on few messages to avoid cluthering the logs

See merge request espressif/esp-mqtt!201
2024-03-22 15:05:49 +08:00
c33a0c8e44 Merge branch 'fix/detect_mac_type' into 'master'
fix: Make automatic client_id soc dependent

See merge request espressif/esp-mqtt!200
2024-03-22 15:05:21 +08:00
5c17fc4a1a fix: Adjust the log level on few messages to avoid cluthering the logs 2024-03-21 14:11:57 +01:00
657a2aea77 fix: Make automatic client_id soc dependent
When creating the client_id for user, the library uses the device MAC.
For some of our devices WIFI isn't available and the library needs to select
a different MAC to use.
2024-03-14 07:31:37 +01:00
891380bdf5 feat: Make state and size atomic
This makes the mqtt client state atomic to reduce the scope of locking in
some parts of the code.
2024-03-12 08:49:36 +01:00
fa88da5282 Merge branch 'lifetime_clarification' into 'master'
Clarify data that users need to take care of lifetime.

See merge request espressif/esp-mqtt!197
2024-01-26 19:10:50 +08:00
5a35782272 Merge branch 'fix/subscribe_macro_type' into 'master'
Update mqtt_client.h

See merge request espressif/esp-mqtt!198
2024-01-26 18:54:42 +08:00
acdb66d5c6 add const char * to esp_mqtt_client_subscribe() generic macros 2024-01-25 15:35:32 +07:00
d3e3c4e7ad Merge branch 'bugfix/consolidate_err_messages' into 'master'
client: Report failure on timeout in mid-message timeout (GitHub PR)

See merge request espressif/esp-mqtt!165
2024-01-24 22:23:04 +08:00
371f594cce docs: Clarify data that users need to take care of lifetime.
- Adds to field that the mqtt client doesn't copy a note about it.
2024-01-24 09:38:37 +01:00
ddde502782 client: Report failure on timeout in mid-message timeout
Fixes error processing on network reads:

1) Treat EOF as an error, since the connection is closed (FIN) from the
server side. If we didn't we would try to read (in the next iteration)
from the same socket that has been already closed and get an error
ENOTCONN.
Before the fix:
D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF
E (13800) transport_base: tcp_read error, errno=Socket is not connected
E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128
D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR
E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008
E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80
I (13840) esp_mqtt_demo: Last errno string (Socket is not connected)
E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1
D (13860) mqtt_client: Reconnect after 10000 ms
D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
After the fix:
E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF
E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128
D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR
E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008
I (12450) esp_mqtt_demo: Last errno string (Success)
E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1
D (12470) mqtt_client: Reconnect after 10000 ms
D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED

2) Treat timeouts in the middle of MQTT message reading as errors (if
timeouted for the second time and didn't read a byte)
Before the fix:
D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2
D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2)
D (19190) mqtt_client: mqtt_message_receive: read_len=0
D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready!
E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0
E (19210) mqtt_client: MQTT connect failed
D (19220) mqtt_client: Reconnect after 10000 ms
D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
After the fix:
D (19190) mqtt_client: mqtt_message_receive: read_len=0
E (19190) mqtt_client: Network timeout while reading MQTT message
E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119
D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR
I (19220) esp_mqtt_demo: Last errno string (Success)
E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1
E (19240) mqtt_client: MQTT connect failed
D (19240) mqtt_client: Reconnect after 10000 ms
D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
(Note that the above log is from mid-message timeout of CONNECT message,
which was hadled before the fix. If the mid-message timeout ocurs with
for example SUBACK, the current version would repeatably resend
susbscribe message)

Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 15:40:44 +01:00
7894dd0ace Merge branch 'fix/buffer_creation_in_set_config' into 'master'
fix: Move buffer initialization to set config

See merge request espressif/esp-mqtt!194
2023-10-25 16:16:35 +08:00
ea0df31e00 fix: Move buffer initialization to set config
When calling set config message buffers were not affected because the
creation was handled on init.

Closes https://github.com/espressif/esp-mqtt/issues/267
2023-10-25 07:47:05 +00:00
abd8b6cadc Merge branch 'fix/qos0publishdispatch' into 'master'
Fix check for message creation when processing publish

See merge request espressif/esp-mqtt!195
2023-10-10 16:25:20 +08:00
67800569de fix: Deliver publish verifies if message was created only for QoS >0
The previous version depends on a sucessful message being created prior
to that check. The check only makes sense for the case where puback or
pubrec messages should be created.
2023-10-10 06:47:07 +00:00
e6afdb4025 Merge branch 'fix/check_asprintf_return_value' into 'master'
fix: using return value of asprintf now

See merge request espressif/esp-mqtt!192
2023-09-14 19:46:26 +08:00
c0b40b1293 fix: using return value of asprintf now 2023-09-14 19:46:26 +08:00
dc93367bd5 Merge branch 'fix/outbox_memory_destination' into 'master'
fix: Uses caps allocation for data buffer instead of item struct

See merge request espressif/esp-mqtt!193
2023-09-14 13:47:33 +08:00
00ee059bf8 fix: Uses caps allocation for data buffer instead of item struct
Once introduced the memory destination for outbox was incorrectly
allocating the outbox data structuture instead of data buffer to the
selected memory.
2023-09-13 08:04:38 +02:00
05b347643f Merge branch 'bugfix/mock_tests_include_idf_additions' into 'master'
Fix: Mock test should include idf_additions.h

See merge request espressif/esp-mqtt!191
2023-09-04 20:01:22 +08:00
f35aaa1577 fix: Mock tests include idf_additions.h
A number of IDF specific FreeRTOS API additions will be moved to
`#include "freertos/idf_additions.h". Mock tests need to include these to
properly use them as mock functions.
2023-09-04 15:05:03 +08:00
4b28192d00 Merge branch 'docs/outbox_doxygen' into 'master'
Adds missing documentation to outbox configuration.

See merge request espressif/esp-mqtt!190
2023-08-31 22:22:04 +08:00
c355e0b5ae docs: Adds missing documentation to outbox configuration. 2023-08-31 22:22:04 +08:00
301bd9e028 Merge branch 'fix/big_messages' into 'master'
fix: Added missing update to message data

See merge request espressif/esp-mqtt!189
2023-08-25 15:38:48 +08:00
cc41d1b852 fix: Added missing update to message data
When handling big messages the refactoring introduced a regression when
handling big messages, messages larger than the message buffer.
2023-08-24 09:52:00 +02:00
1ca73479cb Merge branch 'mqtt5_fixing_typo' into 'master'
PR: fixing typos in `mqtt5_error_reason_code`

See merge request espressif/esp-mqtt!188
2023-08-18 14:43:10 +08:00
90b4a4538e feat: Add enum definition with typo to keep backwards compatibility
Added initial enum definitions with deprecated attribute.
Should be removed with a new version that can introduce a breaking
change.
2023-08-17 14:58:49 +02:00
dc775bb52e fixing typos in mqtt5_error_reason_code 2023-07-24 23:23:32 +08:00
fe32d8f224 Merge branch 'docs/clarify_keepalive_timeout' into 'master'
docs: Clarify keepalive timeout

See merge request espressif/esp-mqtt!186
2023-07-11 05:34:46 +08:00
cb1e6cf218 docs: Clarify keepalive timeout
Adds information on the behavior of the PINGREQ message timeout and some
reasoning behind the choice.
2023-07-11 05:34:46 +08:00
cd81773bd1 Merge branch 'fix/format_log_messages' into 'master'
fix: LOG format strings

See merge request espressif/esp-mqtt!187
2023-07-10 17:28:55 +08:00
395aa141c8 Merge branch 'fix/stop_only_if_started' into 'master'
Fix: Stop client only if it's running.

See merge request espressif/esp-mqtt!183
2023-07-10 15:55:33 +08:00
8d98103013 Merge branch 'fix/account_for_failure_in_make_publish' into 'master'
fix: Error on publish message creation was ignored.

See merge request espressif/esp-mqtt!185
2023-06-30 20:42:28 +08:00
a3b04f2d0a fix: LOG format strings
Fix log format strings and remove no-format warning configuration.
2023-06-30 10:17:32 +02:00
585e3ba2e0 fix: Error on publish message creation was ignored.
In the case of make_publish failure the client would just continue and
the error was ignored and not propagated to caller.
2023-06-27 15:13:54 +02:00
36eec6f625 Fix: Stop client only if it's running.
Check for client run instead of lock to call esp_mqtt_client_stop
when destroying the client.
2023-06-21 09:39:42 +02:00
effd1e6705 Merge branch 'bugfix/connection_buffer_allocation' into 'master'
Fix: Allocation for connection buffer was incorrectly done.

See merge request espressif/esp-mqtt!182
2023-06-21 14:57:11 +08:00
6c849c62ef Fix: Allocation for connection buffer was incorrectly done.
By mistake calloc parameters were incorrect.
2023-06-21 08:24:46 +02:00
ee3ea29d52 Merge branch 'fix/host_test' into 'master'
Adds mqtt host tests to Ci

See merge request espressif/esp-mqtt!181
2023-06-20 13:27:02 +08:00
4050df4caf Adds mqtt host tests to Ci
- New job to run host tests
- Fix leak in case of usage of interface name
- Fix host tests to expect a call to `transport_destroy` and add an
  extra case
2023-06-19 10:34:18 +02:00
aee82c7ba8 Merge branch 'bugfix/outbox_init_failure' into 'master'
Fix: Outbox was leaked in case of initialization failure

See merge request espressif/esp-mqtt!180
2023-06-14 21:46:04 +08:00
5896e259ad Merge branch 'feat/bind_interface' into 'master'
feat: Add option to bind interface of use

See merge request espressif/esp-mqtt!179
2023-06-14 21:43:34 +08:00
363fbf7dab feat: Add option to bind interface of use
Enable user to set which interface should be used for client network,
allowing client to be binded to the interface selected by user forcing
it to go through the selected interface.

Closes https://github.com/espressif/esp-mqtt/issues/253
2023-06-14 14:48:44 +02:00
5d491a45ce Fix: Outbox was leaked in case of initialization failure
If the the list allocation fail, outbox must be freed.
2023-06-14 13:36:42 +02:00
63cfec799c Merge branch 'feature/configurable_max_outbox' into 'master'
Add outbox size control feature

See merge request espressif/esp-mqtt!141
2023-06-14 01:02:34 +08:00
372ab7b374 feat: Introduces outbox limit
A memory limit for the outbox can be configured.
User will not be able to publish or enqueue if the new message goes
beyond the configured limit.
2023-06-13 15:59:55 +02:00
21a5491d53 Removes unused outbox functions. 2023-06-13 11:56:11 +02:00
122875bf8a refactor: Group access to output buffer in mqtt_connection_t
- Moves mqtt_connect_info to mqtt_connection_t.
  - Removes outbound_message in favor of accessing it trough connection.
2023-06-13 11:56:05 +02:00
ed628098a1 Merge branch 'feature/custom_transport' into 'master'
Add custom transport configuration

See merge request espressif/esp-mqtt!169
2023-06-13 14:30:24 +08:00
6438676b66 Merge branch 'fix/removed_unused_handler_field' into 'master'
Removes leftover calls to event_handler

See merge request espressif/esp-mqtt!178
2023-06-13 14:29:49 +08:00
a492935951 Removes leftover calls to event_handler
The possibility to add a callback as custom handler was removed from
the client in favor of esp_event. These removes the older alternative
that wasn't possible to use.
2023-06-09 10:44:50 +02:00
a89af4bf8d Merge branch 'cfg-common-name' into 'master'
PR: Added support to set server common name.

See merge request espressif/esp-mqtt!173
2023-06-08 15:08:22 +08:00
6195762d28 Added support to set server common name. 2023-06-08 08:46:41 +02:00
a5c1b441dc feat: Add custom transport configuration
Today there is no way to add a new transport without applying
modifications to the transport list. This impose limitations on the
client usage. Adding the custom configuration we enable user defined
transports.
2023-06-08 06:38:53 +00:00
ffd7d4df6c Fix: Prevent sync verififcation from running on master
Removes the verification in case the pipeline is running on master.
2023-06-08 08:15:38 +02:00
4c2ed1676f CI: Add check for Gitlab/Github remotes in sync
To avoid merging on Gitlab with Github remote out of sync while we are
moving to Github development.
2023-06-08 08:15:38 +02:00
5b8a541939 Merge pull request #258 from euripedesrocha/feature/github_actions
feat: Adds github action
2023-06-05 03:26:25 -03:00
6c3cbdd195 feat: Adds github action
- Initial support for github actions
2023-06-01 13:55:48 +02:00
63ebf8b1a2 Merge branch 'cherry-pick-fa40f446' into 'master'
Merge branch 'bugfix/return_on_qos0_disconnected' into 'master'

See merge request espressif/esp-mqtt!175
2023-06-01 18:51:58 +08:00
89981b7277 Merge branch 'cherry-pick-88413ec3' into 'master'
Merge branch 'bugfix/cpp_compilation' into 'master'

See merge request espressif/esp-mqtt!174
2023-06-01 18:31:40 +08:00
5bd9724c69 Merge branch 'bugfix/return_on_qos0_disconnected' into 'master'
bug: Incorrect return on disconnect qos0 publish

See merge request espressif/esp-mqtt!172

(cherry picked from commit fa40f44695fc4a0cde14a31a01b8a03da14dd702)

58c25577 bug: Incorrect return on disconnect qos0 publish
2023-06-01 17:03:03 +08:00
70cbaca728 Merge branch 'bugfix/cpp_compilation' into 'master'
Fix: Compilation in C++ with multiple subscribe

See merge request espressif/esp-mqtt!171

(cherry picked from commit 88413ec3f27102daa805ae1992dd145b42d4690d)

47da99fb Fix: Compilation in C++ with multiple subscribe
2023-06-01 17:02:05 +08:00
b2bd8e5f49 Merge pull request #257 from gabsuren/feature/mqtt_host_tests
feature: added host tests
2023-05-26 13:40:28 +04:00
77372e81f9 feature: added host tests 2023-05-26 13:21:44 +04:00
fe622fc77a Merge pull request #259 from espressif/fix/jira_sync_action
ci: Test for Jira sync action fix (IDFGH-10224)
2023-05-24 16:12:38 +02:00
490bde044f ci: Fix Jira sync action (action setting based on standard boilerplate) 2023-05-24 15:23:21 +02:00
0e4cec9497 Merge branch 'ci/fix_qemu_build' into 'master'
ci: Fix qemu build against 5.1

See merge request espressif/esp-mqtt!170
2023-05-04 16:01:19 +08:00
94defb867e ci: Fix qemu build against 5.1
Also adds build against v5.1 and master separetely
2023-05-03 15:48:41 +02:00
14b11ad07e Merge branch 'merge_enqueue' into 'master'
Minor cleanups on mqtt client

See merge request espressif/esp-mqtt!168
2023-04-21 15:11:48 +08:00
da6d38a17e Removes pending message count
The information was used only to log remaining messages on debug log.
It was checked on writing but updated prior to every call making the
verification meaningless.
2023-04-21 08:40:12 +02:00
5729048683 Bugfix: Dispatch transport error on all write operations
- During connect the error wasn't dispatched.
- Merged esp_mqtt_write with mqtt_write_data since the only difference
  was the error dispatched.
2023-04-20 09:29:33 +02:00
72833c7f8a Merge enqueue functions
- enqueue and oversized enqueue did the same work with small differences
  this clean up the extra unnecessary function
2023-04-20 09:29:33 +02:00
0b315a01b1 Merge branch 'feature/outbox_memory_selection' into 'master'
Adds a configuration for outbox data destination

See merge request espressif/esp-mqtt!166
2023-04-13 15:20:43 +08:00
2c71f9e69b feat: Adds a configuration for outbox data destination
Allow user to move outbox data to external SPI RAM.
2023-04-12 08:43:40 +00:00
6bcd906b8a Merge branch 'ci/fix_qemu_test' into 'master'
CI: Add configuration for ttfw

See merge request espressif/esp-mqtt!167
2023-04-12 16:43:06 +08:00
d71dcf372a CI: Add configuration for ttfw
File was removed from idf. Adding it here to fix CI before we move the
tests to pytest embedded
2023-04-12 08:49:23 +02:00
9c1826d152 Merge branch 'bugfix/fix_mqtt5_flow_control' into 'master'
mqtt5: Fix flow control will increase count when send fragmented packet

See merge request espressif/esp-mqtt!164
2023-03-29 20:36:13 +08:00
5cce2c4f35 mqtt5: Fix flow control will increase count when send fragmented packet
Closes https://github.com/espressif/esp-mqtt/issues/255
2023-03-17 16:55:25 +08:00
9f2db7b4b6 Merge branch 'bugfix/queue_license' into 'master'
Add license information to queue

See merge request espressif/esp-mqtt!163
2023-03-09 14:37:30 +08:00
36f0faa80d Add license information to queue
File was copied from BSD header without the license information.
2023-03-02 08:02:57 +01:00
7089fac9c0 Merge branch 'prs/mqtt5_fixes' into 'master'
MQTTv5: Fixes and additions from GitHub PRs

See merge request espressif/esp-mqtt!162
2023-02-28 16:25:53 +08:00
65a4fdaff5 fix: Allow MQTT v5 zero length payload
Merges https://github.com/espressif/esp-mqtt/pull/250
2023-02-27 12:46:22 +01:00
1011e63cbe feature: Include subscribe_id in esp_mqtt5_event_property_t 2023-02-09 18:44:47 +00:00
5a156f56b4 Merge branch 'feature/multiple_subscribe' into 'master'
Feature:  Enable SUBSCRIBE to multiple topics

See merge request espressif/esp-mqtt!156
2023-02-08 15:35:31 +08:00
32102558d3 Feature: Enable SUBSCRIBE to multiple topics
- Adds an api for multiple topics on SUBSCRIBE message.

Apply 2 suggestion(s) to 1 file(s)

Removing headers

y
2023-02-03 14:19:24 +01:00
5adbe11aaf Merge branch 'feature/config_read_poll' into 'master'
Adds Kconfig option to configure poll read timeout

See merge request espressif/esp-mqtt!159
2023-02-02 20:52:02 +08:00
2fa945d0b8 Adds Kconfig option to configure poll read timeout
A new Kconfig option was added to allow users to configure poll read
timeout.

Closes: https://github.com/espressif/esp-mqtt/issues/245
2023-02-02 20:52:02 +08:00
86d21e4902 Merge branch 'bugfix/nano_printf_format' into 'master'
Fix formatting when using printf nano

See merge request espressif/esp-mqtt!160
2023-02-02 20:24:24 +08:00
e9b865eb9d Fix formatting when using printf nano 2023-02-02 20:24:24 +08:00
acab02f2c5 Merge branch 'bugfix/fix_mqtt5_flow_control' into 'master'
mqtt5: Fix flow control will regard the DUP packet and not consider PUBCOMP packet

See merge request espressif/esp-mqtt!158
2023-01-18 16:03:31 +08:00
ed76036744 mqtt5: Fix flow control will regard the DUP packet and not consider PUBCOMP packet
Closes https://github.com/espressif/esp-mqtt/issues/243
2023-01-18 14:16:28 +08:00
c96f6f804c Merge branch 'bugfix/coverity_fix' into 'master'
Remove possible null pointer dereferences

See merge request espressif/esp-mqtt!157
2023-01-04 20:26:27 +08:00
f80772b8d7 Bugfix: Remove Remove possible null pointer dereferences
- Removed a possible derefrence on data in case of MQTT5 SUBACK with
  MQTT5 disabled.
- Covered a case of NULL data on message with negative size.
- Use correct type on calloc for alpn_protos
- Changed strcasecmp to strncasecmp.
2022-12-15 13:02:46 +01:00
33 changed files with 1575 additions and 909 deletions

View File

@ -0,0 +1,37 @@
name: Build app an run on target
on:
workflow_call:
inputs:
idf_version:
required: true
type: string
target:
required: true
type: string
app_name:
type: string
required: true
app_path:
type: string
required: true
jobs:
build-app:
uses: "./.github/workflows/build-app.yml"
with:
idf_version: ${{inputs.idf_version}}
target: ${{inputs.target}}
app_name: ${{inputs.app_name}}
app_path: ${{inputs.app_path}}
# run-on-target:
# needs: build-app
# uses: "./.github/workflows/run-on-target.yml"
# with:
# idf_version: ${{inputs.idf_version}}
# target: ${{inputs.target}}
# app_name: ${{inputs.app_name}}
# app_path: ${{inputs.app_path}}

59
.github/workflows/build-app.yml vendored Normal file
View File

@ -0,0 +1,59 @@
name: Build app
on:
workflow_call:
inputs:
idf_version:
required: true
type: string
target:
required: true
type: string
app_name:
type: string
required: true
app_path:
type: string
required: true
upload_artifacts:
type: boolean
default: true
jobs:
build:
name: Build App
runs-on: ubuntu-20.04
container: espressif/idf:${{inputs.idf_version}}
steps:
- if: ${{ env.ACT }}
name: Add node for local tests
run: |
curl -fsSL https://deb.nodesource.com/setup_14.x | bash -
apt-get install -y nodejs
- name: Checkout esp-mqtt
uses: actions/checkout@v3
- name: ccache
uses: hendrikmuhs/ccache-action@v1.2
with:
key: ${{inputs.idf_version}}-${{inputs.target}}
- name: Build ${{ inputs.app_name }} with IDF-${{ inputs.idf_version }}
shell: bash
run: |
${IDF_PATH}/install.sh --enable-pytest
. ${IDF_PATH}/export.sh
python -m pip install idf-build-apps
rm -rf $IDF_PATH/components/mqtt/esp-mqtt
cp -r . $IDF_PATH/components/mqtt/esp-mqtt
IDF_CCACHE_ENABLE=1 idf-build-apps build --config-file ci/idf_build_apps.toml -p ${{inputs.app_path}} -t ${{inputs.target}}
- name: Upload files to artifacts for run-target job
uses: actions/upload-artifact@v3
if: ${{inputs.upload_artifacts}}
with:
name: mqtt_bin_${{inputs.target}}_${{ inputs.idf_version }}_${{ inputs.app_name }}
path: |
build_${{inputs.target}}_${{inputs.app_name}}/bootloader/bootloader.bin
build_${{inputs.target}}_${{inputs.app_name}}/partition_table/partition-table.bin
build_${{inputs.target}}_${{inputs.app_name}}/*.bin
build_${{inputs.target}}_${{inputs.app_name}}/*.elf
build_${{inputs.target}}_${{inputs.app_name}}/flasher_args.json
if-no-files-found: error

80
.github/workflows/mqtt__host-tests.yml vendored Normal file
View File

@ -0,0 +1,80 @@
name: "esp-mqtt: host-tests"
on:
push:
branches:
- master
pull_request:
types: [opened, synchronize, reopened, labeled]
jobs:
host_test_esp_mqtt:
name: Host Tests
runs-on: ubuntu-22.04
permissions:
contents: write
container: espressif/idf:latest
env:
COMP_DIR: components/mqtt/esp-mqtt
steps:
- name: Checkout esp-mqtt
uses: actions/checkout@v3
- name: Build and Test
shell: bash
run: |
apt-get update && apt-get install -y gcc g++ python3-pip rsync
${IDF_PATH}/install.sh
. ${IDF_PATH}/export.sh
echo "IDF_PATH=${IDF_PATH}" >> $GITHUB_ENV
rm -rf $IDF_PATH/${{ env.COMP_DIR }}
cp -r . $IDF_PATH/${{ env.COMP_DIR }}
cd $IDF_PATH/${{ env.COMP_DIR }}/host_test
idf.py build
./build/host_mqtt_client_test.elf -r junit -o junit.xml
- name: Build with Coverage Enabled
shell: bash
run: |
. ${IDF_PATH}/export.sh
cd $IDF_PATH/${{ env.COMP_DIR }}/host_test
cat sdkconfig.ci.coverage >> sdkconfig.defaults
rm -rf build sdkconfig
idf.py build
./build/host_mqtt_client_test.elf
- name: Run gcovr
shell: bash
run: |
python -m pip install gcovr
cd $IDF_PATH/${{ env.COMP_DIR }}
gcov -b host_test/main/mqtt_client.c. -o `find . -name "mqtt_client*gcda" -exec dirname {} \;`
gcovr --gcov-ignore-parse-errors -g -k -r . --html index.html -x esp_mqtt_coverage.xml
mkdir docs_gcovr
mv index.html docs_gcovr
touch docs_gcovr/.nojekyll
cp -r docs_gcovr esp_mqtt_coverage.xml $GITHUB_WORKSPACE
- name: Code Coverage Summary Report
uses: irongut/CodeCoverageSummary@v1.3.0
with:
filename: ${{ env.GITHUB_WORKSPACE }}/**/esp_mqtt_coverage.xml
badge: true
fail_below_min: false
format: markdown
hide_branch_rate: false
hide_complexity: false
indicators: true
output: both
thresholds: '60 80'
- name: Write to Job Summary
run: cat code-coverage-results.md >> $GITHUB_STEP_SUMMARY
- name: Upload artifacts
uses: actions/upload-artifact@v3
if: always()
with:
name: docs_gcovr
path: ${{ env.IDF_PATH }}/${{ env.COMP_DIR }}/docs_gcovr
if-no-files-found: error
- name: Deploy coverage summary
if: github.ref == 'refs/heads/master'
uses: JamesIves/github-pages-deploy-action@v4.4.1
with:
branch: gh-pages
folder: ${{ env.IDF_PATH }}/${{ env.COMP_DIR }}/docs_gcovr

View File

@ -1,16 +1,25 @@
name: Sync PRs to JIRA
name: Sync remain PRs to Jira
# This workflow will be triggered when a pull request is opened
on: pull_request
# This workflow will be triggered every hour, to sync remaining PRs (i.e. PRs with zero comment) to Jira project
# Note that, PRs can also get synced when new PR comment is created
on:
schedule:
- cron: "0 * * * *"
# Limit to single concurrent run for workflows which can create Jira issues.
# Same concurrency group is used in issue_comment.yml
concurrency: jira_issues
jobs:
sync_prs_to_jira:
name: Sync PRs to Jira
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2
- name: Sync PRs to Jira project
uses: espressif/github-actions/sync_issues_to_jira@master
with:
cron_job: true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
JIRA_PASS: ${{ secrets.JIRA_PASS }}

64
.github/workflows/run-on-target.yml vendored Normal file
View File

@ -0,0 +1,64 @@
name: Run on target
on:
workflow_call:
inputs:
idf_version:
required: true
type: string
target:
required: true
type: string
app_name:
type: string
required: true
app_path:
type: string
required: true
jobs:
target-test:
if: github.repository == 'espressif/esp-mqtt'
name: Run App on target
env:
IDF_PATH: idf
runs-on: [self-hosted, ESP32-ETHERNET-KIT]
steps:
- name: Select idf ref
id: detect_version
run: |
if echo "${{inputs.idf_version}}" | grep "latest" -> /dev/null ; then
echo ref="master" >> "$GITHUB_OUTPUT"
else
echo ref="`echo ${{matrix.idf_version}} | sed -s "s/-/\//"`" >> "$GITHUB_OUTPUT"
fi
- name: IP discovery
id: detect_ip
run: |
apt-get update && apt-get install -y iproute2
ip route
echo runner_ip ="`ip -4 addr show eth0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}'`" >> "$GITHUB_OUTPUT"
- name: Checkout IDF ${{inputs.idf_version}}
uses: actions/checkout@v3
with:
repository: espressif/esp-idf
path: ${{env.IDF_PATH}}
ref: ${{steps.detect_version.outputs.ref}}
- name: Install Python packages
env:
PIP_EXTRA_INDEX_URL: "https://dl.espressif.com/pypi/"
run: |
pip install --only-binary cryptography -r ${{env.IDF_PATH}}/tools/requirements/requirements.pytest.txt
- uses: actions/download-artifact@v3
with:
name: mqtt_bin_${{inputs.target}}_${{ inputs.idf_version }}_${{ inputs.app_name }}
path: build
- name: Run ${{inputs.app_name}} application on ${{inputs.target}}
run: |
python -m pytest ${{inputs.app_path}} --log-cli-level DEBUG --app-path . --junit-xml=./results_${{inputs.app_name}}_${{inputs.idf_version}}.xml --target=${{inputs.target}}
- uses: actions/upload-artifact@v3
if: always()
with:
name: results_${{inputs.app_name}}_${{inputs.idf_version}}.xml
path: build/*.xml

66
.github/workflows/test-examples.yml vendored Normal file
View File

@ -0,0 +1,66 @@
name: "Build example apps"
on:
push:
branches:
- master
pull_request:
types: [opened, synchronize, reopened, labeled]
jobs:
cpp-compatibility:
name: Cpp compatibility
strategy:
matrix:
idf_version: ["release-v5.0", "release-v5.1", "latest"]
target: ["esp32"]
example: [{name: cpp_compatibility, path: "build_test"}]
uses: "./.github/workflows/build-app.yml"
with:
idf_version: ${{matrix.idf_version}}
target: ${{matrix.target}}
app_name: ${{matrix.example.name}}
app_path: $IDF_PATH/tools/test_apps/protocols/mqtt/${{matrix.example.path}}
upload_artifacts: false
build-only-example:
name: Build Only Apps
strategy:
matrix:
idf_version: ["release-v5.0", "release-v5.1", "latest"]
target: ["esp32s2", "esp32c3", "esp32s3"]
example: [{name: ssl_psk, path: "mqtt/ssl_psk"}]
uses: "./.github/workflows/build-app.yml"
with:
idf_version: ${{matrix.idf_version}}
target: ${{matrix.target}}
app_name: ${{matrix.example.name}}
app_path: $IDF_PATH/examples/protocols/${{matrix.example.path}}
build-only-ds-example:
name: Build Only Apps
strategy:
matrix:
idf_version: ["release-v5.0", "release-v5.1", "latest"]
target: ["esp32s2", "esp32c3", "esp32s3"]
example: [{name: ssl_ds, path: "mqtt/ssl_ds"}, {name: ssl_psk, path: "mqtt/ssl_psk"}]
uses: "./.github/workflows/build-app.yml"
with:
idf_version: ${{matrix.idf_version}}
target: ${{matrix.target}}
app_name: ${{matrix.example.name}}
app_path: $IDF_PATH/examples/protocols/${{matrix.example.path}}
build-examples:
name: Build and Run on target
strategy:
matrix:
idf_version: ["release-v5.0", "release-v5.1", "latest"]
target: ["esp32"]
example: [{name: tcp, path: "mqtt/tcp"}, {name: ssl, path: "mqtt/ssl"},{name: ssl_mutual_auth, path: "mqtt/ssl_mutual_auth"},{name: ws, path: "mqtt/ws"},{name: wss, path: "mqtt/wss"}]
uses: "./.github/workflows/build-and-target-test.yml"
with:
idf_version: ${{matrix.idf_version}}
target: ${{matrix.target}}
app_name: ${{matrix.example.name}}
app_path: $IDF_PATH/examples/protocols/${{matrix.example.path}}

View File

@ -1,5 +1,6 @@
stages:
- build
- test
- deploy
@ -40,10 +41,24 @@ build_idf_v5.0:
extends: .build_template
image: espressif/idf:release-v5.0
build_idf_v5.1:
extends: .build_template
image: espressif/idf:release-v5.1
build_idf_latest:
extends: .build_template
image: espressif/idf:latest
build_and_host_test:
stage: build
image: espressif/idf:latest
script:
# Replace the IDF's default esp-mqtt with this version
- rm -rf $IDF_PATH/components/mqtt/esp-mqtt && cp -r $MQTT_PATH $IDF_PATH/components/mqtt/
- cd $IDF_PATH/components/mqtt/esp-mqtt/host_test
- idf.py build
- build/host_mqtt_client_test.elf
build_and_test_qemu:
stage: build
image: ${CI_DOCKER_REGISTRY}/qemu-v5.1:1-20220802
@ -55,7 +70,7 @@ build_and_test_qemu:
- export IDF_PATH=$CI_PROJECT_DIR/esp-idf
- git clone "${IDF_REPO}"
# switch to IDF and setup the tools
- $MQTT_PATH/ci/set_idf.sh master
- $MQTT_PATH/ci/set_idf.sh release/v5.1
- $IDF_PATH/tools/idf_tools.py install-python-env
- cd $IDF_PATH && tools/idf_tools.py --non-interactive install && eval "$(tools/idf_tools.py --non-interactive export)"
# Remove `debug_backend` and Add `paho-mqtt` to the required packages
@ -74,7 +89,18 @@ build_and_test_qemu:
- export MQTT_PUBLISH_MSG_len_1=2 MQTT_PUBLISH_MSG_repeat_1=50
- export MQTT_PUBLISH_MSG_len_2=128 MQTT_PUBLISH_MSG_repeat_2=2
- export MQTT_PUBLISH_MSG_len_3=20 MQTT_PUBLISH_MSG_repeat_3=20
- python Runner.py $TEST_PATH -c $TEST_PATH/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
- python Runner.py $TEST_PATH -c $MQTT_PATH/ci/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
check_remotes_sync:
stage: test
except:
- master
- idf
script:
- *add_gh_key_remote
- git fetch --depth=1 origin master
- git fetch --depth=1 github master
- test "$(git rev-parse origin/master)" == "$(git rev-parse github/master)"
push_master_to_github:
stage: deploy

View File

@ -12,7 +12,3 @@ idf_component_register(SRCS "${srcs}"
PRIV_REQUIRES esp_timer http_parser esp_hw_support heap
KCONFIG ${CMAKE_CURRENT_LIST_DIR}/Kconfig
)
target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")

14
Kconfig
View File

@ -124,6 +124,13 @@ menu "ESP-MQTT Configurations"
help
MQTT task priority. Higher number denotes higher priority.
config MQTT_POLL_READ_TIMEOUT_MS
int "MQTT transport poll read timeut"
default 1000
depends on MQTT_USE_CUSTOM_CONFIG
help
Timeout when polling underlying transport for read.
config MQTT_EVENT_QUEUE_SIZE
int "Number of queued events."
default 1
@ -145,6 +152,13 @@ menu "ESP-MQTT Configurations"
bool "Core 1"
endchoice
config MQTT_OUTBOX_DATA_ON_EXTERNAL_MEMORY
bool "Use external memory for outbox data"
default n
depends on MQTT_USE_CUSTOM_CONFIG
help
Set to true to use external memory for outbox data.
config MQTT_CUSTOM_OUTBOX
bool "Enable custom outbox implementation"
default n

11
ci/build-test-rules.yml Normal file
View File

@ -0,0 +1,11 @@
# Documentation: .gitlab/ci/README.md#manifest-file-to-control-the-buildtest-apps
examples/protocols:
enable:
- if: IDF_TARGET in ["esp32"]
examples/protocols/mqtt/ssl_ds:
disable:
- if: SOC_DIG_SIGN_SUPPORTED != 1
temporary: false
reason: DS not present

2
ci/idf_build_apps.toml Normal file
View File

@ -0,0 +1,2 @@
build_dir = "$GITHUB_WORKSPACE/build_@t_@n"
config="sdkconfig.ci"

View File

@ -0,0 +1,7 @@
CaseConfig:
- name: test_app_protocol_mqtt_publish_connect
overwrite:
dut:
class: ESP32QEMUDUT
package: ttfw_idf

View File

@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.16)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
set(COMPONENTS main)
set(COMPONENTS mqtt main)
list(APPEND EXTRA_COMPONENT_DIRS
"mocks/heap/"
"$ENV{IDF_PATH}/tools/mocks/esp_hw_support/"
@ -11,7 +11,6 @@ list(APPEND EXTRA_COMPONENT_DIRS
"$ENV{IDF_PATH}/tools/mocks/lwip/"
"$ENV{IDF_PATH}/tools/mocks/esp-tls/"
"$ENV{IDF_PATH}/tools/mocks/http_parser/"
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/"
)
"$ENV{IDF_PATH}/tools/mocks/tcp_transport/")
project(host_mqtt_client_test)

View File

@ -1,3 +1,20 @@
idf_component_register(SRCS "test_mqtt_client.cpp"
INCLUDE_DIRS "$ENV{IDF_PATH}/tools/catch"
REQUIRES cmock mqtt esp_timer esp_hw_support http_parser log)
target_compile_options(${COMPONENT_LIB} PUBLIC -fsanitize=address -fconcepts)
target_link_options(${COMPONENT_LIB} PUBLIC -fsanitize=address)
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
target_compile_definitions(${mqtt} PRIVATE SOC_WIFI_SUPPORTED=1)
target_compile_options(${mqtt} PUBLIC -fsanitize=address -fconcepts)
target_link_options(${mqtt} PUBLIC -fsanitize=address)
if(CONFIG_GCOV_ENABLED)
target_compile_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
target_link_options(${COMPONENT_LIB} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
target_compile_options(${mqtt} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
target_link_options(${mqtt} PUBLIC --coverage -fprofile-arcs -ftest-coverage)
endif()

9
host_test/main/Kconfig Normal file
View File

@ -0,0 +1,9 @@
menu "Host-test config"
config GCOV_ENABLED
bool "Coverage analyzer"
default n
help
Enables coverage analyzing for host tests.
endmenu

View File

@ -3,9 +3,16 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <memory>
#include <net/if.h>
#include <random>
#include <string_view>
#include <type_traits>
#include "esp_transport.h"
#define CATCH_CONFIG_MAIN // This tells the catch header to generate a main
#include "catch.hpp"
#include "mqtt_client.h"
extern "C" {
#include "Mockesp_event.h"
#include "Mockesp_mac.h"
@ -17,6 +24,10 @@ extern "C" {
#include "Mockhttp_parser.h"
#include "Mockqueue.h"
#include "Mocktask.h"
#if __has_include ("Mockidf_additions.h")
/* Some functions were moved from "task.h" to "idf_additions.h" */
#include "Mockidf_additions.h"
#endif
#include "Mockesp_timer.h"
/*
@ -29,98 +40,130 @@ extern "C" {
}
}
#include "mqtt_client.h"
struct ClientInitializedFixture {
esp_mqtt_client_handle_t client;
ClientInitializedFixture()
{
[[maybe_unused]] auto protect = TEST_PROTECT();
int mtx;
int transport_list;
int transport;
int event_group;
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
esp_timer_get_time_IgnoreAndReturn(0);
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
xQueueCreateMutex_ExpectAnyArgsAndReturn(
reinterpret_cast<QueueHandle_t>(&mtx));
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
http_parser_parse_url_IgnoreAndReturn(0);
http_parser_url_init_ExpectAnyArgs();
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
esp_read_mac_IgnoreAndReturn(ESP_OK);
esp_read_mac_ReturnThruPtr_mac(mac);
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
vEventGroupDelete_Ignore();
vQueueDelete_Ignore();
esp_mqtt_client_config_t config{};
client = esp_mqtt_client_init(&config);
}
~ClientInitializedFixture()
{
esp_mqtt_client_destroy(client);
}
};
TEST_CASE_METHOD(ClientInitializedFixture, "Client set uri")
auto random_string(std::size_t n)
{
struct http_parser_url ret_uri = {
.field_set = 1,
.port = 0,
.field_data = { { 0, 1} }
};
SECTION("User set a correct URI") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client, " ");
REQUIRE(res == ESP_OK);
}
SECTION("Incorrect URI from user") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client, " ");
REQUIRE(res == ESP_FAIL);
}
static constexpr std::string_view char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ123456790";
std::string str;
std::sample(char_set.begin(), char_set.end(), std::back_inserter(str), n,
std::mt19937 {std::random_device{}()});
return str;
}
TEST_CASE_METHOD(ClientInitializedFixture, "Client Start")
using unique_mqtt_client = std::unique_ptr < std::remove_pointer_t<esp_mqtt_client_handle_t>, decltype([](esp_mqtt_client_handle_t client)
{
SECTION("Successful start") {
esp_mqtt_client_destroy(client);
}) >;
SCENARIO("MQTT Client Operation")
{
// Set expectations for the mocked calls.
int mtx = 0;
int transport_list = 0;
int transport = 0;
int event_group = 0;
uint8_t mac[] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
esp_timer_get_time_IgnoreAndReturn(0);
xQueueTakeMutexRecursive_IgnoreAndReturn(true);
xQueueGiveMutexRecursive_IgnoreAndReturn(true);
xQueueCreateMutex_ExpectAnyArgsAndReturn(
reinterpret_cast<QueueHandle_t>(&mtx));
xEventGroupCreate_IgnoreAndReturn(reinterpret_cast<EventGroupHandle_t>(&event_group));
esp_transport_list_init_IgnoreAndReturn(reinterpret_cast<esp_transport_list_handle_t>(&transport_list));
esp_transport_tcp_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ssl_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_init_IgnoreAndReturn(reinterpret_cast<esp_transport_handle_t>(&transport));
esp_transport_ws_set_subprotocol_IgnoreAndReturn(ESP_OK);
esp_transport_list_add_IgnoreAndReturn(ESP_OK);
esp_transport_set_default_port_IgnoreAndReturn(ESP_OK);
http_parser_url_init_Ignore();
esp_event_loop_create_IgnoreAndReturn(ESP_OK);
esp_read_mac_IgnoreAndReturn(ESP_OK);
esp_read_mac_ReturnThruPtr_mac(mac);
esp_transport_list_destroy_IgnoreAndReturn(ESP_OK);
esp_transport_destroy_IgnoreAndReturn(ESP_OK);
vEventGroupDelete_Ignore();
vQueueDelete_Ignore();
GIVEN("An a minimal config") {
esp_mqtt_client_config_t config{};
config.broker.address.uri = "mqtt://1.1.1.1";
struct http_parser_url ret_uri = {
.field_set = 1 | (1<<1),
.field_set = 1 | (1 << 1),
.port = 0,
.field_data = { { 0, 4 } /*mqtt*/, { 7, 1 } } // at least *scheme* and *host*
};
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdTRUE);
auto res = esp_mqtt_set_config(client, &config);
REQUIRE(res == ESP_OK);
res = esp_mqtt_client_start(client);
REQUIRE(res == ESP_OK);
}
SECTION("Failed on initialization") {
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
auto res = esp_mqtt_client_start(nullptr);
REQUIRE(res == ESP_ERR_INVALID_ARG);
}
SECTION("Client already started") {}
SECTION("Failed to start task") {
xTaskCreatePinnedToCore_ExpectAnyArgsAndReturn(pdFALSE);
auto res = esp_mqtt_client_start(client);
REQUIRE(res == ESP_FAIL);
SECTION("Client with minimal config") {
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
REQUIRE(client != nullptr);
SECTION("User will set a new uri") {
struct http_parser_url ret_uri = {
.field_set = 1,
.port = 0,
.field_data = { { 0, 1} }
};
SECTION("User set a correct URI") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client.get(), " ");
REQUIRE(res == ESP_OK);
}
SECTION("Incorrect URI from user") {
http_parser_parse_url_StopIgnore();
http_parser_parse_url_ExpectAnyArgsAndReturn(1);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
auto res = esp_mqtt_client_set_uri(client.get(), " ");
REQUIRE(res == ESP_FAIL);
}
}
SECTION("User set interface to use"){
http_parser_parse_url_ExpectAnyArgsAndReturn(0);
http_parser_parse_url_ReturnThruPtr_u(&ret_uri);
struct ifreq if_name = {.ifr_ifrn = {"custom"}};
config.network.if_name = &if_name;
SECTION("Client is not started"){
REQUIRE(esp_mqtt_set_config(client.get(), &config)== ESP_OK);
}
}
SECTION("After Start Client Is Cleanly destroyed") {
REQUIRE(esp_mqtt_client_start(client.get()) == ESP_OK);
// Only need to start the client, destroy is called automatically at the end of
// scope
}
}
SECTION("Client with all allocating configuration set") {
auto host = random_string(20);
auto path = random_string(10);
auto username = random_string(10);
auto client_id = random_string(10);
auto password = random_string(10);
auto lw_topic = random_string(10);
auto lw_msg = random_string(10);
config.broker = {.address = {
.hostname = host.data(),
.path = path.data()
}
};
config.credentials = {
.username = username.data(),
.client_id = client_id.data(),
.authentication = {
.password = password.data()
}
};
config.session = {
.last_will {
.topic = lw_topic.data(),
.msg = lw_msg.data()
}
};
auto client = unique_mqtt_client{esp_mqtt_client_init(&config)};
REQUIRE(client != nullptr);
}
}
}

View File

@ -1,3 +1,8 @@
/*
* SPDX-FileCopyrightText: 1991-1993 The Regents of the University of California
*
* SPDX-License-Identifier: BSD-3-Clause
*/
#pragma once
/* Implementation from BSD headers*/

View File

@ -0,0 +1 @@
CONFIG_GCOV_ENABLED=y

View File

@ -19,41 +19,46 @@ typedef struct esp_mqtt_client *esp_mqtt5_client_handle_t;
* MQTT5 protocol error reason code, more details refer to MQTT5 protocol document section 2.4
*/
enum mqtt5_error_reason_code {
MQTT5_UNSPECIFIED_ERROR = 0x80,
MQTT5_MALFORMED_PACKET = 0x81,
MQTT5_PROTOCOL_ERROR = 0x82,
MQTT5_IMPLEMENT_SPECIFIC_ERROR = 0x83,
MQTT5_UNSUPPORTED_PROTOCOL_VER = 0x84,
MQTT5_INVAILD_CLIENT_ID = 0x85,
MQTT5_BAD_USERNAME_OR_PWD = 0x86,
MQTT5_NOT_AUTHORIZED = 0x87,
MQTT5_SERVER_UNAVAILABLE = 0x88,
MQTT5_SERVER_BUSY = 0x89,
MQTT5_BANNED = 0x8A,
MQTT5_SERVER_SHUTTING_DOWN = 0x8B,
MQTT5_BAD_AUTH_METHOD = 0x8C,
MQTT5_KEEP_ALIVE_TIMEOUT = 0x8D,
MQTT5_SESSION_TAKEN_OVER = 0x8E,
MQTT5_TOPIC_FILTER_INVAILD = 0x8F,
MQTT5_TOPIC_NAME_INVAILD = 0x90,
MQTT5_PACKET_IDENTIFIER_IN_USE = 0x91,
MQTT5_PACKET_IDENTIFIER_NOT_FOUND = 0x92,
MQTT5_RECEIVE_MAXIMUM_EXCEEDED = 0x93,
MQTT5_TOPIC_ALIAS_INVAILD = 0x94,
MQTT5_PACKET_TOO_LARGE = 0x95,
MQTT5_MESSAGE_RATE_TOO_HIGH = 0x96,
MQTT5_QUOTA_EXCEEDED = 0x97,
MQTT5_ADMINISTRATIVE_ACTION = 0x98,
MQTT5_PAYLOAD_FORMAT_INVAILD = 0x99,
MQTT5_RETAIN_NOT_SUPPORT = 0x9A,
MQTT5_QOS_NOT_SUPPORT = 0x9B,
MQTT5_USE_ANOTHER_SERVER = 0x9C,
MQTT5_SERVER_MOVED = 0x9D,
MQTT5_SHARED_SUBSCR_NOT_SUPPORTED = 0x9E,
MQTT5_CONNECTION_RATE_EXCEEDED = 0x9F,
MQTT5_MAXIMUM_CONNECT_TIME = 0xA0,
MQTT5_SUBSCRIBE_IDENTIFIER_NOT_SUPPORT = 0xA1,
MQTT5_WILDCARD_SUBSCRIBE_NOT_SUPPORT = 0xA2,
MQTT5_UNSPECIFIED_ERROR = 0x80,
MQTT5_MALFORMED_PACKET = 0x81,
MQTT5_PROTOCOL_ERROR = 0x82,
MQTT5_IMPLEMENT_SPECIFIC_ERROR = 0x83,
MQTT5_UNSUPPORTED_PROTOCOL_VER = 0x84,
MQTT5_INVAILD_CLIENT_ID __attribute__((deprecated)) = 0x85,
MQTT5_INVALID_CLIENT_ID = 0x85,
MQTT5_BAD_USERNAME_OR_PWD = 0x86,
MQTT5_NOT_AUTHORIZED = 0x87,
MQTT5_SERVER_UNAVAILABLE = 0x88,
MQTT5_SERVER_BUSY = 0x89,
MQTT5_BANNED = 0x8A,
MQTT5_SERVER_SHUTTING_DOWN = 0x8B,
MQTT5_BAD_AUTH_METHOD = 0x8C,
MQTT5_KEEP_ALIVE_TIMEOUT = 0x8D,
MQTT5_SESSION_TAKEN_OVER = 0x8E,
MQTT5_TOPIC_FILTER_INVAILD __attribute__((deprecated)) = 0x8F,
MQTT5_TOPIC_FILTER_INVALID = 0x8F,
MQTT5_TOPIC_NAME_INVAILD __attribute__((deprecated)) = 0x90,
MQTT5_TOPIC_NAME_INVALID = 0x90,
MQTT5_PACKET_IDENTIFIER_IN_USE = 0x91,
MQTT5_PACKET_IDENTIFIER_NOT_FOUND = 0x92,
MQTT5_RECEIVE_MAXIMUM_EXCEEDED = 0x93,
MQTT5_TOPIC_ALIAS_INVAILD __attribute__((deprecated)) = 0x94,
MQTT5_TOPIC_ALIAS_INVALID = 0x94,
MQTT5_PACKET_TOO_LARGE = 0x95,
MQTT5_MESSAGE_RATE_TOO_HIGH = 0x96,
MQTT5_QUOTA_EXCEEDED = 0x97,
MQTT5_ADMINISTRATIVE_ACTION = 0x98,
MQTT5_PAYLOAD_FORMAT_INVAILD __attribute__((deprecated)) = 0x99,
MQTT5_PAYLOAD_FORMAT_INVALID = 0x99,
MQTT5_RETAIN_NOT_SUPPORT = 0x9A,
MQTT5_QOS_NOT_SUPPORT = 0x9B,
MQTT5_USE_ANOTHER_SERVER = 0x9C,
MQTT5_SERVER_MOVED = 0x9D,
MQTT5_SHARED_SUBSCR_NOT_SUPPORTED = 0x9E,
MQTT5_CONNECTION_RATE_EXCEEDED = 0x9F,
MQTT5_MAXIMUM_CONNECT_TIME = 0xA0,
MQTT5_SUBSCRIBE_IDENTIFIER_NOT_SUPPORT = 0xA1,
MQTT5_WILDCARD_SUBSCRIBE_NOT_SUPPORT = 0xA2,
};
/**
@ -65,7 +70,7 @@ typedef struct mqtt5_user_property_list_t *mqtt5_user_property_handle_t;
* MQTT5 protocol connect properties and will properties configuration, more details refer to MQTT5 protocol document section 3.1.2.11 and 3.3.2.3
*/
typedef struct {
uint32_t session_expiry_interval; /*!< The interval time of session expiry */
uint32_t session_expiry_interval; /*!< The interval time of session expiry */
uint32_t maximum_packet_size; /*!< The maximum packet size that we can receive */
uint16_t receive_maximum; /*!< The maximum pakcket count that we process concurrently */
uint16_t topic_alias_maximum; /*!< The maximum topic alias that we support */
@ -138,6 +143,7 @@ typedef struct {
uint16_t correlation_data_len; /*!< Correlation data length of the message */
char *content_type; /*!< Content type of the message */
int content_type_len; /*!< Content type length of the message */
uint16_t subscribe_id; /*!< Subscription identifier of the message */
mqtt5_user_property_handle_t user_property; /*!< The handle for user property, call function esp_mqtt5_client_delete_user_property to free the memory */
} esp_mqtt5_event_property_t;
@ -269,7 +275,7 @@ uint8_t esp_mqtt5_client_get_user_property_count(mqtt5_user_property_handle_t us
* @brief Free the user property list
*
* @param user_property user_property handle
*
*
* This API will free the memory in user property list and free user_property itself
*/
void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_property);

View File

@ -12,6 +12,7 @@
#include <string.h>
#include "esp_err.h"
#include "esp_event.h"
#include "esp_transport.h"
#ifdef CONFIG_MQTT_PROTOCOL_5
#include "mqtt5_client.h"
#endif
@ -32,7 +33,7 @@ typedef struct esp_mqtt_client *esp_mqtt_client_handle_t;
* @brief *MQTT* event types.
*
* User event handler receives context data in `esp_mqtt_event_t` structure with
* - `client` - *MQTT* client handle
* - client - *MQTT* client handle
* - various other data depending on event type
*
*/
@ -213,8 +214,6 @@ typedef struct esp_mqtt_event_t {
typedef esp_mqtt_event_t *esp_mqtt_event_handle_t;
typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
/**
* *MQTT* client configuration structure
*
@ -223,132 +222,157 @@ typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
* character and the related len field set to 0. DER format requires a related len field set to the correct length.
*/
typedef struct esp_mqtt_client_config_t {
/**
* Broker related configuration
*/
struct broker_t {
/**
* Broker address
*
* - uri have precedence over other fields
* - If uri isn't set at least hostname, transport and port should.
*/
struct address_t {
const char *uri; /*!< Complete *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< Selects transport*/
const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker address configuration */
/**
* Broker identity verification
*
* If fields are not set broker's identity isn't verified. it's recommended
* to set the options in this struct for security reasons.
*/
struct verification_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
/**
* Broker related configuration
*/
struct broker_t {
/**
* Broker address
*
* - uri have precedence over other fields
* - If uri isn't set at least hostname, transport and port should.
*/
struct address_t {
const char *uri; /*!< Complete *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< Selects transport*/
const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker address configuration */
/**
* Broker identity verification
*
* If fields are not set broker's identity isn't verified. it's recommended
* to set the options in this struct for security reasons.
*/
struct verification_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
documentation for details. */
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
the usage of certificate bundles. */
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
the usage of certificate bundles. Client only attach the bundle, the clean up must be done by the user. */
const char *certificate; /*!< Certificate data, default is NULL. It's not copied nor freed by the client, user needs to clean up.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
authentication (as alternative to certificate verification).
PSK is enabled only if there are no other ways to
verify broker.*/
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
verify broker. It's not copied nor freed by the client, user needs to clean up.*/
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
security of TLS and makes the *MQTT* client susceptible to MITM attacks */
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
} verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */
/**
* Client related credentials for authentication.
*/
struct credentials_t {
const char *username; /*!< *MQTT* username */
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN.*/
const char *common_name; /*!< Pointer to the string containing server certificate common name.
If non-NULL, server certificate CN must match this name,
If NULL, server certificate CN must match hostname.
This is ignored if skip_cert_common_name_check=true.
It's not copied nor freed by the client, user needs to clean up.*/
} verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */
/**
* Client related credentials for authentication.
*/
struct credentials_t {
const char *username; /*!< *MQTT* username */
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are
last 3 bytes of MAC address in hex format */
bool set_null_client_id; /*!< Selects a NULL client id */
/**
* Client authentication
*
* Fields related to client authentication by broker
*
* For mutual authentication using TLS, user could select certificate and key,
* secure element or digital signature peripheral if available.
*
*/
struct authentication_t {
const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
authentication is not needed. Must be provided with `key`.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
is not needed. If it is not NULL, also `certificate` has to be provided.*/
size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
`key_password_len` must be correctly set. */
int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
available in some Espressif devices. */
} authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */
/**
* *MQTT* Session related configuration
*/
struct session_t {
/**
* Last Will and Testament message configuration.
*/
struct last_will_t {
const char *topic; /*!< LWT (Last Will and Testament) message topic */
const char *msg; /*!< LWT message, may be NULL terminated*/
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
int qos; /*!< LWT message QoS */
int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
bool set_null_client_id; /*!< Selects a NULL client id */
/**
* Client authentication
*
* Fields related to client authentication by broker
*
* For mutual authentication using TLS, user could select certificate and key,
* secure element or digital signature peripheral if available.
*
*/
struct authentication_t {
const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
authentication is not needed. Must be provided with `key`. It's not copied nor freed by the client, user needs to clean up.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
is not needed. If it is not NULL, also `certificate` has to be provided. It's not copied nor freed by the client, user needs to clean up.*/
size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
`key_password_len` must be correctly set.*/
int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
available in some Espressif devices. It's not copied nor freed by the client, user needs to clean up.*/
} authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */
/**
* *MQTT* Session related configuration
*/
struct session_t {
/**
* Last Will and Testament message configuration.
*/
struct last_will_t {
const char *topic; /*!< LWT (Last Will and Testament) message topic */
const char *msg; /*!< LWT message, may be NULL terminated*/
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
int qos; /*!< LWT message QoS */
int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds
When configuring this value, keep in mind that the client attempts
to communicate with the broker at half the interval that is actually set.
This conservative approach allows for more attempts before the broker's timeout occurs */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
by default. Note: setting the config value `keepalive` to `0` doesn't disable
keepalive feature, but uses a default keepalive period */
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
} session; /*!< *MQTT* session configuration. */
/**
* Network related configuration
*/
struct network_t {
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
} session; /*!< *MQTT* session configuration. */
/**
* Network related configuration
*/
struct network_t {
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
disabled (defaults to 10s) */
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
(defaults to 10s). */
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
`disable_auto_reconnect=true` to disable */
} network; /*!< Network configuration */
/**
* Client task configuration
*/
struct task_t {
int priority; /*!< *MQTT* task priority*/
int stack_size; /*!< *MQTT* task stack size*/
} task; /*!< FreeRTOS task configuration.*/
/**
* Client buffer size configuration
*
* Client have two buffers for input and output respectivelly.
*/
struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
esp_transport_handle_t transport; /*!< Custom transport handle to use. Warning: The transport should be valid during the client lifetime and is destroyed when esp_mqtt_client_destroy is called. */
struct ifreq * if_name; /*!< The name of interface for data to go through. Use the default interface without setting */
} network; /*!< Network configuration */
/**
* Client task configuration
*/
struct task_t {
int priority; /*!< *MQTT* task priority*/
int stack_size; /*!< *MQTT* task stack size*/
} task; /*!< FreeRTOS task configuration.*/
/**
* Client buffer size configuration
*
* Client have two buffers for input and output respectivelly.
*/
struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
``buffer_size`` */
} buffer; /*!< Buffer size configuration.*/
} buffer; /*!< Buffer size configuration.*/
/**
* Client outbox configuration options.
*/
struct outbox_config_t {
uint64_t limit; /*!< Size limit for the outbox in bytes.*/
} outbox; /*!< Outbox configuration. */
} esp_mqtt_client_config_t;
/**
* Topic definition struct
*/
typedef struct topic_t {
const char *filter; /*!< Topic filter to subscribe */
int qos; /*!< Max QoS level of the subscription */
} esp_mqtt_topic_t;
/**
* @brief Creates *MQTT* client handle based on the configuration
*
@ -417,6 +441,33 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
*/
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
#ifdef __cplusplus
#define esp_mqtt_client_subscribe esp_mqtt_client_subscribe_single
#else
/**
* @brief Convenience macro to select subscribe function to use.
*
* Notes:
* - Usage of `esp_mqtt_client_subscribe_single` is the same as previous
* esp_mqtt_client_subscribe, refer to it for details.
*
* @param client_handle *MQTT* client handle
* @param topic_type Needs to be char* for single subscription or `esp_mqtt_topic_t` for multiple topics
* @param qos_or_size It's either a qos when subscribing to a single topic or the size of the subscription array when subscribing to multiple topics.
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
char *: esp_mqtt_client_subscribe_single, \
const char *: esp_mqtt_client_subscribe_single, \
esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \
)(client_handle, topic_type, qos_or_size)
#endif /* __cplusplus*/
/**
* @brief Subscribe the client to defined topic with defined qos
*
@ -426,23 +477,46 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
* from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
*
* @param client *MQTT* client handle
* @param topic
* @param qos // TODO describe parameters
* @param topic topic filter to subscribe
* @param qos Max qos level of the subscription
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client,
const char *topic, int qos);
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
const char *topic, int qos);
/**
* @brief Subscribe the client to a list of defined topics with defined qos
*
* Notes:
* - Client must be connected to send subscribe message
* - This API is could be executed from a user task or
* from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
*
* @param client *MQTT* client handle
* @param topic_list List of topics to subscribe
* @param size size of topic_list
*
* @return message_id of the subscribe message on success
* -1 on failure
* -2 in case of full outbox.
*/
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size);
/**
* @brief Unsubscribe the client from defined topic
*
* Notes:
* - Client must be connected to send unsubscribe message
* - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details
* - It is thread safe, please refer to `esp_mqtt_client_subscribe_single` for details
*
* @param client *MQTT* client handle
* @param topic
@ -476,7 +550,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client,
* @param retain retain flag
*
* @return message_id of the publish message (for QoS 0 message_id will always
* be zero) on success. -1 on failure.
* be zero) on success. -1 on failure, -2 in case of full outbox.
*/
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
const char *data, int len, int qos, int retain);
@ -501,7 +575,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
* @param store if true, all messages are enqueued; otherwise only QoS 1 and
* QoS 2 are enqueued
*
* @return message_id if queued successfully, -1 otherwise
* @return message_id if queued successfully, -1 on failure, -2 in case of full outbox.
*/
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
const char *data, int len, int qos, int retain,
@ -524,6 +598,9 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client);
* @brief Set configuration structure, typically used when updating the config
* (i.e. on "before_connect" event
*
* Notes:
* - When calling this function make sure to have all the intendend configurations
* set, otherwise default values are set.
* @param client *MQTT* client handle
*
* @param config *MQTT* configuration structure
@ -548,9 +625,9 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client,
* ESP_OK on success
*/
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client,
esp_mqtt_event_id_t event,
esp_event_handler_t event_handler,
void *event_handler_arg);
esp_mqtt_event_id_t event,
esp_event_handler_t event_handler,
void *event_handler_arg);
/**
* @brief Unregisters mqtt event

View File

@ -64,5 +64,11 @@
#define MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
#endif
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
// Features supported in 5.1.0
#define MQTT_SUPPORTED_FEATURE_CRT_CMN_NAME
#endif
#endif /* ESP_IDF_VERSION */
#endif // _MQTT_SUPPORTED_FEATURES_H_

View File

@ -36,7 +36,8 @@ typedef struct {
mqtt5_topic_alias_handle_t peer_topic_alias;
} mqtt5_config_storage_t;
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client);
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client);

View File

@ -126,7 +126,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info);
esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property);
int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info);
mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);

View File

@ -7,6 +7,7 @@
#ifndef _MQTT_CLIENT_PRIV_H_
#define _MQTT_CLIENT_PRIV_H_
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
@ -38,6 +39,14 @@
extern "C" {
#endif
#if CONFIG_NEWLIB_NANO_FORMAT
#define NEWLIB_NANO_COMPAT_FORMAT PRIu32
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) (uint32_t)size_t_var
#else
#define NEWLIB_NANO_COMPAT_FORMAT "zu"
#define NEWLIB_NANO_COMPAT_CAST(size_t_var) size_t_var
#endif
#ifdef MQTT_DISABLE_API_LOCKS
# define MQTT_API_LOCK(c)
# define MQTT_API_UNLOCK(c)
@ -48,21 +57,16 @@ extern "C" {
typedef struct mqtt_state {
uint8_t *in_buffer;
uint8_t *out_buffer;
int in_buffer_length;
int out_buffer_length;
size_t message_length;
size_t in_buffer_read_len;
mqtt_message_t *outbound_message;
mqtt_connection_t mqtt_connection;
mqtt_connection_t connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
int pending_msg_count;
} mqtt_state_t;
typedef struct {
mqtt_event_callback_t event_handle;
esp_event_loop_handle_t event_loop_handle;
int task_stack;
int task_prio;
@ -89,9 +93,13 @@ typedef struct {
size_t clientkey_bytes;
const struct psk_key_hint *psk_hint_key;
bool skip_cert_common_name_check;
const char *common_name;
bool use_secure_element;
void *ds_data;
int message_retransmit_timeout;
uint64_t outbox_limit;
esp_transport_handle_t transport;
struct ifreq * if_name;
} mqtt_config_storage_t;
typedef enum {
@ -106,8 +114,7 @@ struct esp_mqtt_client {
esp_transport_handle_t transport;
mqtt_config_storage_t *config;
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
mqtt_client_state_t state;
_Atomic mqtt_client_state_t state;
uint64_t refresh_connection_tick;
int64_t keepalive_tick;
uint64_t reconnect_tick;

View File

@ -17,7 +17,12 @@
#endif
#define MQTT_RECON_DEFAULT_MS (10*1000)
#ifdef CONFIG_MQTT_POLL_READ_TIMEOUT_MS
#define MQTT_POLL_READ_TIMEOUT_MS CONFIG_MQTT_POLL_READ_TIMEOUT_MS
#else
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
#endif
#define MQTT_MSG_ID_INCREMENTAL CONFIG_MQTT_MSG_ID_INCREMENTAL
@ -95,6 +100,7 @@
#define MQTT_ENABLE_SSL CONFIG_MQTT_TRANSPORT_SSL
#define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET
#define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE
#define MQTT_DEFAULT_RETRANSMIT_TIMEOUT_MS 1000
#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE
#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE
@ -102,5 +108,11 @@
#define MQTT_EVENT_QUEUE_SIZE 1
#endif
#ifdef CONFIG_MQTT_OUTBOX_DATA_ON_EXTERNAL_MEMORY
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_SPIRAM
#else
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_DEFAULT
#endif
#define OUTBOX_MAX_SIZE (4*1024)
#endif

View File

@ -68,16 +68,6 @@ typedef struct mqtt_message {
size_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
} mqtt_message_t;
typedef struct mqtt_connection {
mqtt_message_t message;
#if MQTT_MSG_ID_INCREMENTAL
uint16_t last_message_id; /*!< last used id if incremental message id configured */
#endif
uint8_t *buffer;
size_t buffer_length;
} mqtt_connection_t;
typedef struct mqtt_connect_info {
char *client_id;
char *username;
@ -90,9 +80,18 @@ typedef struct mqtt_connect_info {
int will_retain;
int clean_session;
esp_mqtt_protocol_ver_t protocol_ver;
} mqtt_connect_info_t;
typedef struct mqtt_connection {
mqtt_message_t outbound_message;
#if MQTT_MSG_ID_INCREMENTAL
uint16_t last_message_id; /*!< last used id if incremental message id configured */
#endif
uint8_t *buffer;
size_t buffer_length;
mqtt_connect_info_t information;
} mqtt_connection_t;
static inline int mqtt_get_type(const uint8_t *buffer)
{
@ -123,7 +122,6 @@ static inline int mqtt_get_retain(const uint8_t *buffer)
return (buffer[0] & 0x01);
}
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length);
bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length);
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len);
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length);
@ -132,19 +130,20 @@ char *mqtt_get_suback_data(uint8_t *buffer, size_t *length);
uint16_t mqtt_get_id(uint8_t *buffer, size_t length);
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length);
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size);
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info);
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id);
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull));
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id);
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection);
#ifdef __cplusplus
}
#endif

View File

@ -14,7 +14,7 @@ extern "C" {
struct outbox_item;
typedef struct outbox_list_t *outbox_handle_t;
typedef struct outbox_t *outbox_handle_t;
typedef struct outbox_item *outbox_item_handle_t;
typedef struct outbox_message *outbox_message_handle_t;
typedef long long outbox_tick_t;
@ -42,8 +42,6 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
/**
@ -56,7 +54,7 @@ int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_t
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
pending_state_t outbox_item_get_pending(outbox_item_handle_t item);
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
int outbox_get_size(outbox_handle_t outbox);
uint64_t outbox_get_size(outbox_handle_t outbox);
void outbox_destroy(outbox_handle_t outbox);
void outbox_delete_all_items(outbox_handle_t outbox);

View File

@ -1,5 +1,6 @@
#include <string.h>
#include "mqtt5_msg.h"
#include "mqtt_client.h"
#include "mqtt_config.h"
#include "platform.h"
#include "esp_log.h"
@ -68,8 +69,8 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
generate_variable_len(len, &len_bytes, encoded_lens);
int offset = len_bytes - 1;
connection->message.length += offset;
if (connection->message.length > connection->buffer_length) {
connection->outbound_message.length += offset;
if (connection->outbound_message.length > connection->buffer_length) {
return -1;
}
@ -88,33 +89,33 @@ static int update_property_len_value(mqtt_connection_t *connection, size_t prope
static int append_property(mqtt_connection_t *connection, uint8_t property_type, uint8_t len_occupy, const char *data, size_t data_len)
{
if ((connection->message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
if ((connection->outbound_message.length + len_occupy + (data ? data_len : 0) + (property_type ? 1 : 0)) > connection->buffer_length) {
return -1;
}
size_t origin_message_len = connection->message.length;
size_t origin_message_len = connection->outbound_message.length;
if (property_type) {
connection->buffer[connection->message.length ++] = property_type;
connection->buffer[connection->outbound_message.length ++] = property_type;
}
if (len_occupy == 0) {
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
generate_variable_len(data_len, &len_bytes, encoded_lens);
for (int j = 0; j < len_bytes; j ++) {
connection->buffer[connection->message.length ++] = encoded_lens[j];
connection->buffer[connection->outbound_message.length ++] = encoded_lens[j];
}
} else {
for (int i = 1; i <= len_occupy; i ++) {
connection->buffer[connection->message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
connection->buffer[connection->outbound_message.length ++] = (data_len >> (8 * (len_occupy - i))) & 0xff;
}
}
if (data) {
memcpy(connection->buffer + connection->message.length, data, data_len);
connection->message.length += data_len;
memcpy(connection->buffer + connection->outbound_message.length, data, data_len);
connection->outbound_message.length += data_len;
}
return connection->message.length - origin_message_len;
return connection->outbound_message.length - origin_message_len;
}
static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id)
@ -129,36 +130,36 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
#endif
}
if (connection->message.length + 2 > connection->buffer_length) {
if (connection->outbound_message.length + 2 > connection->buffer_length) {
return 0;
}
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], message_id)
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], message_id)
return message_id;
}
static int init_message(mqtt_connection_t *connection)
{
connection->message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
connection->outbound_message.length = MQTT5_MAX_FIXED_HEADER_SIZE;
return MQTT5_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = 0;
return &connection->outbound_message;
}
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
int message_length = connection->message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
int message_length = connection->outbound_message.length - MQTT5_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
uint8_t encoded_lens[4] = {0}, len_bytes = 0;
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
if (connection->outbound_message.fragmented_msg_total_length) {
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT5_MAX_FIXED_HEADER_SIZE;
}
// Encode MQTT message length
@ -170,10 +171,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT5_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
connection->outbound_message.data = connection->buffer + offs;
connection->outbound_message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs ++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
@ -181,7 +182,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
connection->buffer[offs ++] = encoded_lens[j];
}
return &connection->message;
return &connection->outbound_message;
}
static esp_err_t mqtt5_msg_set_user_property(mqtt5_user_property_handle_t *user_property, char *key, size_t key_len, char *value, size_t value_len)
@ -338,7 +339,7 @@ char *mqtt5_get_publish_property_payload(uint8_t *buffer, size_t buffer_length,
continue;
case MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL:
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(resp_property->message_expiry_interval, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
ESP_LOGD(TAG, "MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL %d", resp_property->message_expiry_interval);
ESP_LOGD(TAG, "MQTT5_PROPERTY_MESSAGE_EXPIRY_INTERVAL %"PRIu32, resp_property->message_expiry_interval);
continue;
case MQTT5_PROPERTY_TOPIC_ALIAS:
MQTT5_CONVERT_ONE_BYTE_TO_TWO(resp_property->topic_alias, property[property_offset ++], property[property_offset ++])
@ -464,24 +465,24 @@ char *mqtt5_get_puback_data(uint8_t *buffer, size_t *length, mqtt5_user_property
mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info, esp_mqtt5_connection_property_storage_t *property, esp_mqtt5_connection_will_property_storage_t *will_property)
{
init_message(connection);
connection->buffer[connection->message.length ++] = 0; // Variable header length MSB
connection->buffer[connection->outbound_message.length ++] = 0; // Variable header length MSB
/* Defaults to protocol version 5 values */
connection->buffer[connection->message.length ++] = 4; // Variable header length LSB
memcpy(&connection->buffer[connection->message.length], "MQTT", 4); // Protocol name
connection->message.length += 4;
connection->buffer[connection->message.length ++] = 5; // Protocol version
connection->buffer[connection->outbound_message.length ++] = 4; // Variable header length LSB
memcpy(&connection->buffer[connection->outbound_message.length], "MQTT", 4); // Protocol name
connection->outbound_message.length += 4;
connection->buffer[connection->outbound_message.length ++] = 5; // Protocol version
int flags_offset = connection->message.length;
connection->buffer[connection->message.length ++] = 0; // Flags
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->message.length ++], info->keepalive) // Keep-alive
int flags_offset = connection->outbound_message.length;
connection->buffer[connection->outbound_message.length ++] = 0; // Flags
MQTT5_CONVERT_TWO_BYTE(connection->buffer[connection->outbound_message.length ++], info->keepalive) // Keep-alive
if (info->clean_session) {
connection->buffer[flags_offset] |= MQTT5_CONNECT_FLAG_CLEAN_SESSION;
}
//Add properties
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property->session_expiry_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, property->session_expiry_interval), fail_message(connection));
}
@ -507,7 +508,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (info->client_id != NULL && info->client_id[0] != '\0') {
APPEND_CHECK(append_property(connection, 0, 2, info->client_id, strlen(info->client_id)), fail_message(connection));
@ -517,8 +518,8 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
//Add will properties
if (info->will_topic != NULL && info->will_topic[0] != '\0') {
properties_offset = connection->message.length;
connection->message.length ++;
properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (will_property->will_delay_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_WILL_DELAY_INTERVAL, 4, NULL, will_property->will_delay_interval), fail_message(connection));
}
@ -544,7 +545,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
APPEND_CHECK(append_property(connection, 0, 2, item->value, strlen(item->value)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(append_property(connection, 0, 2, info->will_topic, strlen(info->will_topic)), fail_message(connection));
APPEND_CHECK(append_property(connection, 0, 2, info->will_message, info->will_length), fail_message(connection));
@ -602,7 +603,7 @@ esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, m
switch (property_id) {
case MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL:
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(connection_property->session_expiry_interval, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
ESP_LOGD(TAG, "MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL %d", connection_property->session_expiry_interval);
ESP_LOGD(TAG, "MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL %"PRIu32, connection_property->session_expiry_interval);
continue;
case MQTT5_PROPERTY_RECEIVE_MAXIMUM:
MQTT5_CONVERT_ONE_BYTE_TO_TWO(resp_property->receive_maximum, property[property_offset ++], property[property_offset ++])
@ -618,7 +619,7 @@ esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, m
continue;
case MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE:
MQTT5_CONVERT_ONE_BYTE_TO_FOUR(resp_property->maximum_packet_size, property[property_offset ++], property[property_offset ++], property[property_offset ++], property[property_offset ++])
ESP_LOGD(TAG, "MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE %d", resp_property->maximum_packet_size);
ESP_LOGD(TAG, "MQTT5_PROPERTY_MAXIMUM_PACKET_SIZE %"PRIu32, resp_property->maximum_packet_size);
continue;
case MQTT5_PROPERTY_ASSIGNED_CLIENT_IDENTIFIER:
MQTT5_CONVERT_ONE_BYTE_TO_TWO(len, property[property_offset ++], property[property_offset ++])
@ -741,8 +742,8 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
*message_id = 0;
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->payload_format_indicator) {
@ -760,11 +761,11 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
char *response_topic = calloc(1, response_topic_size);
if (!response_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", response_topic_size);
fail_message(connection);
return fail_message(connection);
}
snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info);
if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(response_topic);
return fail_message(connection);
}
@ -787,20 +788,20 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_CONTENT_TYPE, 2, property->content_type, strlen(property->content_type)), fail_message(connection));
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (connection->message.length + data_length > connection->buffer_length) {
if (connection->outbound_message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
connection->outbound_message.length = connection->buffer_length;
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
} else {
if (data != NULL) {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
connection->outbound_message.length += data_length;
}
connection->message.fragmented_msg_total_length = 0;
connection->outbound_message.fragmented_msg_total_length = 0;
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
@ -849,20 +850,16 @@ int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length)
return -1;
}
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic_list, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
{
init_message(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->subscribe_id) {
@ -876,52 +873,58 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *t
}
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
char *shared_topic = calloc(1, shared_topic_size);
if (!shared_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
fail_message(connection);
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
free(shared_topic);
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].filter[0] == '\0') {
return fail_message(connection);
}
free(shared_topic);
} else {
APPEND_CHECK(append_property(connection, 0, 2, topic, strlen(topic)), fail_message(connection));
}
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic_list[topic_number].filter) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
char *shared_topic = calloc(1, shared_topic_size);
if (!shared_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
fail_message(connection);
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic_list[topic_number].filter);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(shared_topic);
return fail_message(connection);
}
free(shared_topic);
} else {
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
}
if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection);
if (connection->outbound_message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->outbound_message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->outbound_message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->outbound_message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->outbound_message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->outbound_message.length] |= (topic_list[topic_number].qos & 3);
connection->outbound_message.length ++;
}
connection->buffer[connection->message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->message.length] |= (qos & 3);
connection->message.length ++;
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info)
{
init_message(connection);
int reason_offset = connection->message.length;
connection->buffer[connection->message.length ++] = 0;
int properties_offset = connection->message.length;
connection->message.length ++;
int reason_offset = connection->outbound_message.length;
connection->buffer[connection->outbound_message.length ++] = 0;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (disconnect_property_info) {
if (disconnect_property_info->session_expiry_interval) {
APPEND_CHECK(append_property(connection, MQTT5_PROPERTY_SESSION_EXPIRY_INTERVAL, 4, NULL, disconnect_property_info->session_expiry_interval), fail_message(connection));
@ -937,7 +940,7 @@ mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_di
connection->buffer[reason_offset] = disconnect_property_info->disconnect_reason;
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
@ -953,8 +956,8 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
return fail_message(connection);
}
int properties_offset = connection->message.length;
connection->message.length ++;
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
if (property) {
if (property->user_property) {
mqtt5_user_property_item_t item;
@ -965,7 +968,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
char *shared_topic = calloc(1, shared_topic_size);
@ -975,7 +978,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(shared_topic);
return fail_message(connection);
}
@ -993,10 +996,10 @@ mqtt_message_t *mqtt5_msg_puback(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}
@ -1006,10 +1009,10 @@ mqtt_message_t *mqtt5_msg_pubrec(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}
@ -1019,10 +1022,10 @@ mqtt_message_t *mqtt5_msg_pubrel(mqtt_connection_t *connection, uint16_t message
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}
@ -1032,9 +1035,9 @@ mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t messag
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
connection->buffer[connection->message.length ++] = 0; // Regard it is success
int properties_offset = connection->message.length;
connection->message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
connection->buffer[connection->outbound_message.length ++] = 0; // Regard it is success
int properties_offset = connection->outbound_message.length;
connection->outbound_message.length ++;
APPEND_CHECK(update_property_len_value(connection, connection->outbound_message.length - properties_offset - 1, properties_offset), fail_message(connection));
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}

View File

@ -29,6 +29,7 @@
*
*/
#include <string.h>
#include "mqtt_client.h"
#include "mqtt_msg.h"
#include "mqtt_config.h"
#include "platform.h"
@ -47,14 +48,14 @@ enum mqtt_connect_flag {
static int append_string(mqtt_connection_t *connection, const char *string, int len)
{
if (connection->message.length + len + 2 > connection->buffer_length) {
if (connection->outbound_message.length + len + 2 > connection->buffer_length) {
return -1;
}
connection->buffer[connection->message.length++] = len >> 8;
connection->buffer[connection->message.length++] = len & 0xff;
memcpy(connection->buffer + connection->message.length, string, len);
connection->message.length += len;
connection->buffer[connection->outbound_message.length++] = len >> 8;
connection->buffer[connection->outbound_message.length++] = len & 0xff;
memcpy(connection->buffer + connection->outbound_message.length, string, len);
connection->outbound_message.length += len;
return len + 2;
}
@ -71,38 +72,38 @@ static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t messag
#endif
}
if (connection->message.length + 2 > connection->buffer_length) {
if (connection->outbound_message.length + 2 > connection->buffer_length) {
return 0;
}
connection->buffer[connection->message.length++] = message_id >> 8;
connection->buffer[connection->message.length++] = message_id & 0xff;
connection->buffer[connection->outbound_message.length++] = message_id >> 8;
connection->buffer[connection->outbound_message.length++] = message_id & 0xff;
return message_id;
}
static int init_message(mqtt_connection_t *connection)
static int set_message_header_size(mqtt_connection_t *connection)
{
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
connection->outbound_message.length = MQTT_MAX_FIXED_HEADER_SIZE;
return MQTT_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = 0;
return &connection->outbound_message;
}
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
int encoded_length = 0;
uint8_t encoded_lens[4] = {0};
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
if (connection->outbound_message.fragmented_msg_total_length) {
total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
}
// Encode MQTT message length
@ -123,10 +124,10 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
connection->outbound_message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
connection->outbound_message.data = connection->buffer + offs;
connection->outbound_message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
@ -134,14 +135,7 @@ static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int
connection->buffer[offs++] = encoded_lens[j];
}
return &connection->message;
}
void mqtt_msg_init(mqtt_connection_t *connection, uint8_t *buffer, size_t buffer_length)
{
memset(connection, 0, sizeof(mqtt_connection_t));
connection->buffer = buffer;
connection->buffer_length = buffer_length;
return &connection->outbound_message;
}
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len)
@ -346,7 +340,7 @@ uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
{
init_message(connection);
set_message_header_size(connection);
int header_len;
if (info->protocol_ver == MQTT_PROTOCOL_V_3_1) {
@ -355,11 +349,11 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
header_len = MQTT_3_1_1_VARIABLE_HEADER_SIZE;
}
if (connection->message.length + header_len > connection->buffer_length) {
if (connection->outbound_message.length + header_len > connection->buffer_length) {
return fail_message(connection);
}
char *variable_header = (char *)(connection->buffer + connection->message.length);
connection->message.length += header_len;
char *variable_header = (char *)(connection->buffer + connection->outbound_message.length);
connection->outbound_message.length += header_len;
int header_idx = 0;
variable_header[header_idx++] = 0; // Variable header length MSB
@ -444,7 +438,7 @@ mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_inf
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
@ -466,25 +460,25 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
*message_id = 0;
}
if (connection->message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
} else {
if (data != NULL) {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
if (data != NULL) {
if (connection->outbound_message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
memcpy(connection->buffer + connection->outbound_message.length, data, connection->buffer_length - connection->outbound_message.length);
connection->outbound_message.length = connection->buffer_length;
connection->outbound_message.fragmented_msg_total_length = data_length + connection->outbound_message.fragmented_msg_data_offset;
} else {
memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
connection->outbound_message.length += data_length;
connection->outbound_message.fragmented_msg_total_length = 0;
}
connection->message.fragmented_msg_total_length = 0;
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -493,7 +487,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -502,7 +496,7 @@ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
@ -511,40 +505,43 @@ mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
{
init_message(connection);
set_message_header_size(connection);
if (append_message_id(connection, message_id) == 0) {
return fail_message(connection);
}
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
{
init_message(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
set_message_header_size(connection);
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
}
if (append_string(connection, topic, strlen(topic)) < 0) {
return fail_message(connection);
}
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].filter[0] == '\0') {
return fail_message(connection);
}
if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection);
if (append_string(connection, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)) < 0) {
return fail_message(connection);
}
if (connection->outbound_message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos;
connection->outbound_message.length ++;
}
connection->buffer[connection->message.length++] = qos;
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
{
init_message(connection);
set_message_header_size(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
@ -563,19 +560,19 @@ mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection)
{
init_message(connection);
set_message_header_size(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
@ -618,3 +615,23 @@ int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length)
return 0;
}
}
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size)
{
memset(&connection->outbound_message, 0, sizeof(mqtt_message_t));
connection->buffer = (uint8_t *)calloc(buffer_size, sizeof(uint8_t));
if (!connection->buffer) {
return ESP_ERR_NO_MEM;
}
connection->buffer_length = buffer_size;
return ESP_OK;
}
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection)
{
if (connection) {
free(connection->buffer);
}
}

View File

@ -1,7 +1,10 @@
#include "mqtt_outbox.h"
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "mqtt_config.h"
#include "sys/queue.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#ifndef CONFIG_MQTT_CUSTOM_OUTBOX
@ -20,12 +23,19 @@ typedef struct outbox_item {
STAILQ_HEAD(outbox_list_t, outbox_item);
struct outbox_t {
_Atomic uint64_t size;
struct outbox_list_t *list;
};
outbox_handle_t outbox_init(void)
{
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t));
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t));
ESP_MEM_CHECK(TAG, outbox, return NULL);
STAILQ_INIT(outbox);
outbox->list = calloc(1, sizeof(struct outbox_list_t));
ESP_MEM_CHECK(TAG, outbox->list, {free(outbox); return NULL;});
outbox->size = 0;
STAILQ_INIT(outbox->list);
return outbox;
}
@ -39,7 +49,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
item->tick = tick;
item->len = message->len + message->remaining_len;
item->pending = QUEUED;
item->buffer = malloc(message->len + message->remaining_len);
item->buffer = heap_caps_malloc(message->len + message->remaining_len, MQTT_OUTBOX_MEMORY);
ESP_MEM_CHECK(TAG, item->buffer, {
free(item);
return NULL;
@ -48,15 +58,16 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl
if (message->remaining_data) {
memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len);
}
STAILQ_INSERT_TAIL(outbox, item, next);
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
STAILQ_INSERT_TAIL(outbox->list, item, next);
outbox->size += item->len;
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%"PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
return item;
}
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item->msg_id == msg_id) {
return item;
}
@ -67,7 +78,7 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item->pending == pending) {
if (tick) {
*tick = item->tick;
@ -81,9 +92,10 @@ outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pend
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (item == item_to_delete) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
return ESP_OK;
@ -107,31 +119,20 @@ uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox));
ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%"PRIu64, msg_id, msg_type, outbox_get_size(outbox));
return ESP_OK;
}
}
return ESP_FAIL;
}
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_id == msg_id) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
}
}
return ESP_OK;
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
@ -160,27 +161,15 @@ esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick
return ESP_FAIL;
}
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_type == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
}
}
return ESP_OK;
}
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
int msg_id = -1;
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
STAILQ_FOREACH(item, outbox->list, next) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
msg_id = item->msg_id;
free(item);
return msg_id;
@ -194,10 +183,11 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
{
int deleted_items = 0;
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
if (current_tick - item->tick > timeout) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
free(item->buffer);
outbox->size -= item->len;
free(item);
deleted_items ++;
}
@ -206,23 +196,17 @@ int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, ou
return deleted_items;
}
int outbox_get_size(outbox_handle_t outbox)
uint64_t outbox_get_size(outbox_handle_t outbox)
{
int siz = 0;
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
// Suppressing "use after free" warning as this could happen only if queue is in inconsistent state
// which never happens if STAILQ interface used
siz += item->len; // NOLINT(clang-analyzer-unix.Malloc)
}
return siz;
return outbox->size;
}
void outbox_delete_all_items(outbox_handle_t outbox)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
STAILQ_FOREACH_SAFE(item, outbox->list, next, tmp) {
STAILQ_REMOVE(outbox->list, item, outbox_item, next);
outbox->size -= item->len;
free(item->buffer);
free(item);
}
@ -230,6 +214,7 @@ void outbox_delete_all_items(outbox_handle_t outbox)
void outbox_destroy(outbox_handle_t outbox)
{
outbox_delete_all_items(outbox);
free(outbox->list);
free(outbox);
}

View File

@ -3,6 +3,7 @@
#ifdef ESP_PLATFORM
#include "esp_log.h"
#include "esp_mac.h"
#include "soc/soc_caps.h"
#include "esp_timer.h"
#include "esp_random.h"
#include <stdlib.h>
@ -12,13 +13,25 @@ static const char *TAG = "platform";
#define MAX_ID_STRING (32)
#if defined SOC_WIFI_SUPPORTED
#define MAC_TYPE ESP_MAC_WIFI_STA
#elif defined SOC_EMAC_SUPPORTED
#define MAC_TYPE ESP_MAC_ETH
#elif defined SOC_IEEE802154_SUPPORTED
#define MAC_TYPE ESP_MAC_IEEE802154
#endif
char *platform_create_id_string(void)
{
uint8_t mac[6];
char *id_string = calloc(1, MAX_ID_STRING);
ESP_MEM_CHECK(TAG, id_string, return NULL);
esp_read_mac(mac, ESP_MAC_WIFI_STA);
#ifndef MAC_TYPE
ESP_LOGW(TAG, "Soc doesn't provide MAC, client could be disconnected in case of device with same name in the broker.");
sprintf(id_string, "esp_mqtt_client_id");
#else
uint8_t mac[6];
esp_read_mac(mac, MAC_TYPE);
sprintf(id_string, "ESP32_%02x%02X%02X", mac[3], mac[4], mac[5]);
#endif
return id_string;
}

View File

@ -16,23 +16,26 @@ static char *esp_mqtt5_client_get_topic_alias(mqtt5_topic_alias_handle_t topic_a
static void esp_mqtt5_client_delete_topic_alias(mqtt5_topic_alias_handle_t topic_alias_handle);
static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_property_new, const mqtt5_user_property_handle_t user_property_old);
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client)
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
int msg_type = mqtt5_get_type(client->mqtt_state.outbound_message->data);
if (msg_type == MQTT_MSG_TYPE_PUBLISH) {
int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data);
if (msg_qos > 0) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
}
}
bool msg_dup = mqtt5_get_dup(client->mqtt_state.connection.outbound_message.data);
if (msg_dup == false) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
}
}
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
{
if (client->send_publish_packet_count > 0) {
client->send_publish_packet_count --;
ESP_LOGD(TAG, "Receive (%d) qos > 0 publish packet with ack", client->send_publish_packet_count);
}
}
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBCOMP return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_pubcomp_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -44,20 +47,19 @@ void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_puback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.current_data_offset = 0;
client->send_publish_packet_count --;
}
}
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_UNSUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
client->event.data = mqtt5_get_unsuback_data(client->mqtt_state.in_buffer, &msg_data_len, &client->event.property->user_property);
@ -69,7 +71,7 @@ void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client)
void esp_mqtt5_parse_suback(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
ESP_LOGI(TAG, "MQTT_MSG_TYPE_SUBACK return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
}
}
@ -79,13 +81,14 @@ esp_err_t esp_mqtt5_parse_connack(esp_mqtt5_client_handle_t client, int *connect
size_t len = client->mqtt_state.in_buffer_read_len;
client->mqtt_state.in_buffer_read_len = 0;
uint8_t ack_flag = 0;
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->connect_info, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
if (mqtt5_msg_parse_connack_property(client->mqtt_state.in_buffer, len, &client->mqtt_state.
connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->server_resp_property_info, connect_rsp_code, &ack_flag, &client->event.property->user_property) != ESP_OK) {
ESP_LOGE(TAG, "Failed to parse CONNACK packet");
return ESP_FAIL;
}
if (*connect_rsp_code == MQTT_CONNECTION_ACCEPTED) {
ESP_LOGD(TAG, "Connected");
client->event.session_present = ack_flag & 0x01;
client->event.session_present = ack_flag & 0x01;
return ESP_OK;
}
esp_mqtt5_print_error_code(client, *connect_rsp_code);
@ -98,7 +101,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
uint16_t property_len = 0;
esp_mqtt5_publish_resp_property_t property = {0};
*msg_data = mqtt5_get_publish_property_payload(msg_buf, msg_read_len, msg_topic, msg_topic_len, &property, &property_len, msg_data_len, &client->event.property->user_property);
if (*msg_data_len == 0 || *msg_data == NULL) {
if (*msg_data == NULL) {
ESP_LOGE(TAG, "%s: mqtt5_get_publish_property_payload() failed", __func__);
return ESP_FAIL;
}
@ -114,7 +117,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
*msg_topic = esp_mqtt5_client_get_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, msg_topic_len);
if (!*msg_topic) {
ESP_LOGE(TAG, "%s: esp_mqtt5_client_get_topic_alias() failed", __func__);
return ESP_FAIL;
return ESP_FAIL;
}
} else {
if (esp_mqtt5_client_update_topic_alias(client->mqtt5_config->peer_topic_alias, property.topic_alias, *msg_topic, *msg_topic_len) != ESP_OK) {
@ -131,12 +134,13 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
client->event.property->correlation_data_len = property.correlation_data_len;
client->event.property->content_type = property.content_type;
client->event.property->content_type_len = property.content_type_len;
client->event.property->subscribe_id = property.subscribe_id;
return ESP_OK;
}
esp_err_t esp_mqtt5_create_default_config(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
client->event.property = calloc(1, sizeof(esp_mqtt5_event_property_t));
ESP_MEM_CHECK(TAG, client->event.property, return ESP_FAIL)
client->mqtt5_config = calloc(1, sizeof(mqtt5_config_storage_t));
@ -169,7 +173,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_UNSUPPORTED_PROTOCOL_VER:
ESP_LOGW(TAG, "Unsupported Protocol Version");
break;
case MQTT5_INVAILD_CLIENT_ID:
case MQTT5_INVALID_CLIENT_ID:
ESP_LOGW(TAG, "Client Identifier not valid");
break;
case MQTT5_BAD_USERNAME_OR_PWD:
@ -199,10 +203,10 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_SESSION_TAKEN_OVER:
ESP_LOGW(TAG, "Session taken over");
break;
case MQTT5_TOPIC_FILTER_INVAILD:
case MQTT5_TOPIC_FILTER_INVALID:
ESP_LOGW(TAG, "Topic Filter invalid");
break;
case MQTT5_TOPIC_NAME_INVAILD:
case MQTT5_TOPIC_NAME_INVALID:
ESP_LOGW(TAG, "Topic Name invalid");
break;
case MQTT5_PACKET_IDENTIFIER_IN_USE:
@ -214,7 +218,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_RECEIVE_MAXIMUM_EXCEEDED:
ESP_LOGW(TAG, "Receive Maximum exceeded");
break;
case MQTT5_TOPIC_ALIAS_INVAILD:
case MQTT5_TOPIC_ALIAS_INVALID:
ESP_LOGW(TAG, "Topic Alias invalid");
break;
case MQTT5_PACKET_TOO_LARGE:
@ -229,7 +233,7 @@ static void esp_mqtt5_print_error_code(esp_mqtt5_client_handle_t client, int cod
case MQTT5_ADMINISTRATIVE_ACTION:
ESP_LOGW(TAG, "Administrative action");
break;
case MQTT5_PAYLOAD_FORMAT_INVAILD:
case MQTT5_PAYLOAD_FORMAT_INVALID:
ESP_LOGW(TAG, "Payload format invalid");
break;
case MQTT5_RETAIN_NOT_SUPPORT:
@ -291,7 +295,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
}
/* Flow control to check PUBLISH(No PUBACK or PUBCOMP received) packet sent count(Only record QoS1 and QoS2)*/
if (client->send_publish_packet_count >= client->mqtt5_config->server_resp_property_info.receive_maximum) {
if (client->send_publish_packet_count > client->mqtt5_config->server_resp_property_info.receive_maximum) {
ESP_LOGE(TAG, "Client send more than %d QoS1 and QoS2 PUBLISH packet without no ack", client->mqtt5_config->server_resp_property_info.receive_maximum);
return ESP_FAIL;
}
@ -301,7 +305,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
void esp_mqtt5_client_destory(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt5_config) {
free(client->mqtt5_config->will_property_info.content_type);
free(client->mqtt5_config->will_property_info.response_topic);
@ -413,7 +417,7 @@ esp_err_t esp_mqtt5_client_set_publish_property(esp_mqtt5_client_handle_t client
MQTT_API_LOCK(client);
/* Check protocol version */
if(client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -442,7 +446,7 @@ esp_err_t esp_mqtt5_client_set_subscribe_property(esp_mqtt5_client_handle_t clie
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -479,7 +483,7 @@ esp_err_t esp_mqtt5_client_set_unsubscribe_property(esp_mqtt5_client_handle_t cl
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -510,7 +514,7 @@ esp_err_t esp_mqtt5_client_set_disconnect_property(esp_mqtt5_client_handle_t cli
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -553,7 +557,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
MQTT_API_LOCK(client);
/* Check protocol version */
if (client->connect_info.protocol_ver != MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
ESP_LOGE(TAG, "MQTT protocol version is not v5");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
@ -569,7 +573,7 @@ esp_err_t esp_mqtt5_client_set_connect_property(esp_mqtt5_client_handle_t client
return ESP_FAIL;
} else {
client->mqtt5_config->connect_property_info.maximum_packet_size = connect_property->maximum_packet_size;
}
}
} else {
client->mqtt5_config->connect_property_info.maximum_packet_size = client->mqtt_state.in_buffer_length;
}
@ -754,4 +758,4 @@ void esp_mqtt5_client_delete_user_property(mqtt5_user_property_handle_t user_pro
}
}
free(user_property);
}
}

File diff suppressed because it is too large Load Diff