Compare commits

...

28 Commits

Author SHA1 Message Date
490bde044f ci: Fix Jira sync action (action setting based on standard boilerplate) 2023-05-24 15:23:21 +02:00
0e4cec9497 Merge branch 'ci/fix_qemu_build' into 'master'
ci: Fix qemu build against 5.1

See merge request espressif/esp-mqtt!170
2023-05-04 16:01:19 +08:00
94defb867e ci: Fix qemu build against 5.1
Also adds build against v5.1 and master separetely
2023-05-03 15:48:41 +02:00
14b11ad07e Merge branch 'merge_enqueue' into 'master'
Minor cleanups on mqtt client

See merge request espressif/esp-mqtt!168
2023-04-21 15:11:48 +08:00
da6d38a17e Removes pending message count
The information was used only to log remaining messages on debug log.
It was checked on writing but updated prior to every call making the
verification meaningless.
2023-04-21 08:40:12 +02:00
5729048683 Bugfix: Dispatch transport error on all write operations
- During connect the error wasn't dispatched.
- Merged esp_mqtt_write with mqtt_write_data since the only difference
  was the error dispatched.
2023-04-20 09:29:33 +02:00
72833c7f8a Merge enqueue functions
- enqueue and oversized enqueue did the same work with small differences
  this clean up the extra unnecessary function
2023-04-20 09:29:33 +02:00
0b315a01b1 Merge branch 'feature/outbox_memory_selection' into 'master'
Adds a configuration for outbox data destination

See merge request espressif/esp-mqtt!166
2023-04-13 15:20:43 +08:00
2c71f9e69b feat: Adds a configuration for outbox data destination
Allow user to move outbox data to external SPI RAM.
2023-04-12 08:43:40 +00:00
6bcd906b8a Merge branch 'ci/fix_qemu_test' into 'master'
CI: Add configuration for ttfw

See merge request espressif/esp-mqtt!167
2023-04-12 16:43:06 +08:00
d71dcf372a CI: Add configuration for ttfw
File was removed from idf. Adding it here to fix CI before we move the
tests to pytest embedded
2023-04-12 08:49:23 +02:00
9c1826d152 Merge branch 'bugfix/fix_mqtt5_flow_control' into 'master'
mqtt5: Fix flow control will increase count when send fragmented packet

See merge request espressif/esp-mqtt!164
2023-03-29 20:36:13 +08:00
5cce2c4f35 mqtt5: Fix flow control will increase count when send fragmented packet
Closes https://github.com/espressif/esp-mqtt/issues/255
2023-03-17 16:55:25 +08:00
9f2db7b4b6 Merge branch 'bugfix/queue_license' into 'master'
Add license information to queue

See merge request espressif/esp-mqtt!163
2023-03-09 14:37:30 +08:00
36f0faa80d Add license information to queue
File was copied from BSD header without the license information.
2023-03-02 08:02:57 +01:00
7089fac9c0 Merge branch 'prs/mqtt5_fixes' into 'master'
MQTTv5: Fixes and additions from GitHub PRs

See merge request espressif/esp-mqtt!162
2023-02-28 16:25:53 +08:00
65a4fdaff5 fix: Allow MQTT v5 zero length payload
Merges https://github.com/espressif/esp-mqtt/pull/250
2023-02-27 12:46:22 +01:00
1011e63cbe feature: Include subscribe_id in esp_mqtt5_event_property_t 2023-02-09 18:44:47 +00:00
5a156f56b4 Merge branch 'feature/multiple_subscribe' into 'master'
Feature:  Enable SUBSCRIBE to multiple topics

See merge request espressif/esp-mqtt!156
2023-02-08 15:35:31 +08:00
32102558d3 Feature: Enable SUBSCRIBE to multiple topics
- Adds an api for multiple topics on SUBSCRIBE message.

Apply 2 suggestion(s) to 1 file(s)

Removing headers

y
2023-02-03 14:19:24 +01:00
5adbe11aaf Merge branch 'feature/config_read_poll' into 'master'
Adds Kconfig option to configure poll read timeout

See merge request espressif/esp-mqtt!159
2023-02-02 20:52:02 +08:00
2fa945d0b8 Adds Kconfig option to configure poll read timeout
A new Kconfig option was added to allow users to configure poll read
timeout.

Closes: https://github.com/espressif/esp-mqtt/issues/245
2023-02-02 20:52:02 +08:00
86d21e4902 Merge branch 'bugfix/nano_printf_format' into 'master'
Fix formatting when using printf nano

See merge request espressif/esp-mqtt!160
2023-02-02 20:24:24 +08:00
e9b865eb9d Fix formatting when using printf nano 2023-02-02 20:24:24 +08:00
acab02f2c5 Merge branch 'bugfix/fix_mqtt5_flow_control' into 'master'
mqtt5: Fix flow control will regard the DUP packet and not consider PUBCOMP packet

See merge request espressif/esp-mqtt!158
2023-01-18 16:03:31 +08:00
ed76036744 mqtt5: Fix flow control will regard the DUP packet and not consider PUBCOMP packet
Closes https://github.com/espressif/esp-mqtt/issues/243
2023-01-18 14:16:28 +08:00
c96f6f804c Merge branch 'bugfix/coverity_fix' into 'master'
Remove possible null pointer dereferences

See merge request espressif/esp-mqtt!157
2023-01-04 20:26:27 +08:00
f80772b8d7 Bugfix: Remove Remove possible null pointer dereferences
- Removed a possible derefrence on data in case of MQTT5 SUBACK with
  MQTT5 disabled.
- Covered a case of NULL data on message with negative size.
- Use correct type on calloc for alpn_protos
- Changed strcasecmp to strncasecmp.
2022-12-15 13:02:46 +01:00
17 changed files with 406 additions and 289 deletions

View File

@ -1,16 +1,25 @@
name: Sync PRs to JIRA
name: Sync remain PRs to Jira
# This workflow will be triggered when a pull request is opened
on: pull_request
# This workflow will be triggered every hour, to sync remaining PRs (i.e. PRs with zero comment) to Jira project
# Note that, PRs can also get synced when new PR comment is created
on:
schedule:
- cron: "0 * * * *"
# Limit to single concurrent run for workflows which can create Jira issues.
# Same concurrency group is used in issue_comment.yml
concurrency: jira_issues
jobs:
sync_prs_to_jira:
name: Sync PRs to Jira
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2
- name: Sync PRs to Jira project
uses: espressif/github-actions/sync_issues_to_jira@master
with:
cron_job: true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
JIRA_PASS: ${{ secrets.JIRA_PASS }}

View File

@ -40,6 +40,10 @@ build_idf_v5.0:
extends: .build_template
image: espressif/idf:release-v5.0
build_idf_v5.1:
extends: .build_template
image: espressif/idf:release-v5.1
build_idf_latest:
extends: .build_template
image: espressif/idf:latest
@ -55,7 +59,7 @@ build_and_test_qemu:
- export IDF_PATH=$CI_PROJECT_DIR/esp-idf
- git clone "${IDF_REPO}"
# switch to IDF and setup the tools
- $MQTT_PATH/ci/set_idf.sh master
- $MQTT_PATH/ci/set_idf.sh release/v5.1
- $IDF_PATH/tools/idf_tools.py install-python-env
- cd $IDF_PATH && tools/idf_tools.py --non-interactive install && eval "$(tools/idf_tools.py --non-interactive export)"
# Remove `debug_backend` and Add `paho-mqtt` to the required packages
@ -74,7 +78,7 @@ build_and_test_qemu:
- export MQTT_PUBLISH_MSG_len_1=2 MQTT_PUBLISH_MSG_repeat_1=50
- export MQTT_PUBLISH_MSG_len_2=128 MQTT_PUBLISH_MSG_repeat_2=2
- export MQTT_PUBLISH_MSG_len_3=20 MQTT_PUBLISH_MSG_repeat_3=20
- python Runner.py $TEST_PATH -c $TEST_PATH/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
- python Runner.py $TEST_PATH -c $MQTT_PATH/ci/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
push_master_to_github:
stage: deploy

14
Kconfig
View File

@ -124,6 +124,13 @@ menu "ESP-MQTT Configurations"
help
MQTT task priority. Higher number denotes higher priority.
config MQTT_POLL_READ_TIMEOUT_MS
int "MQTT transport poll read timeut"
default 1000
depends on MQTT_USE_CUSTOM_CONFIG
help
Timeout when polling underlying transport for read.
config MQTT_EVENT_QUEUE_SIZE
int "Number of queued events."
default 1
@ -145,6 +152,13 @@ menu "ESP-MQTT Configurations"
bool "Core 1"
endchoice
config MQTT_OUTBOX_DATA_ON_EXTERNAL_MEMORY
bool "Use external memory for outbox data"
default n
depends on MQTT_USE_CUSTOM_CONFIG
help
Set to true to use external memory for outbox data.
config MQTT_CUSTOM_OUTBOX
bool "Enable custom outbox implementation"
default n

View File

@ -0,0 +1,7 @@
CaseConfig:
- name: test_app_protocol_mqtt_publish_connect
overwrite:
dut:
class: ESP32QEMUDUT
package: ttfw_idf

View File

@ -1,3 +1,8 @@
/*
* SPDX-FileCopyrightText: 1991-1993 The Regents of the University of California
*
* SPDX-License-Identifier: BSD-3-Clause
*/
#pragma once
/* Implementation from BSD headers*/

View File

@ -138,6 +138,7 @@ typedef struct {
uint16_t correlation_data_len; /*!< Correlation data length of the message */
char *content_type; /*!< Content type of the message */
int content_type_len; /*!< Content type length of the message */
uint16_t subscribe_id; /*!< Subscription identifier of the message */
mqtt5_user_property_handle_t user_property; /*!< The handle for user property, call function esp_mqtt5_client_delete_user_property to free the memory */
} esp_mqtt5_event_property_t;

View File

@ -32,7 +32,7 @@ typedef struct esp_mqtt_client *esp_mqtt_client_handle_t;
* @brief *MQTT* event types.
*
* User event handler receives context data in `esp_mqtt_event_t` structure with
* - `client` - *MQTT* client handle
* - client - *MQTT* client handle
* - various other data depending on event type
*
*/
@ -223,132 +223,140 @@ typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
* character and the related len field set to 0. DER format requires a related len field set to the correct length.
*/
typedef struct esp_mqtt_client_config_t {
/**
* Broker related configuration
*/
struct broker_t {
/**
* Broker address
*
* - uri have precedence over other fields
* - If uri isn't set at least hostname, transport and port should.
*/
struct address_t {
const char *uri; /*!< Complete *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< Selects transport*/
const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker address configuration */
/**
* Broker identity verification
*
* If fields are not set broker's identity isn't verified. it's recommended
* to set the options in this struct for security reasons.
*/
struct verification_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
/**
* Broker related configuration
*/
struct broker_t {
/**
* Broker address
*
* - uri have precedence over other fields
* - If uri isn't set at least hostname, transport and port should.
*/
struct address_t {
const char *uri; /*!< Complete *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< Selects transport*/
const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker address configuration */
/**
* Broker identity verification
*
* If fields are not set broker's identity isn't verified. it's recommended
* to set the options in this struct for security reasons.
*/
struct verification_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
documentation for details. */
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
the usage of certificate bundles. */
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
authentication (as alternative to certificate verification).
PSK is enabled only if there are no other ways to
verify broker.*/
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
security of TLS and makes the *MQTT* client susceptible to MITM attacks */
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
} verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */
/**
* Client related credentials for authentication.
*/
struct credentials_t {
const char *username; /*!< *MQTT* username */
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
} verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */
/**
* Client related credentials for authentication.
*/
struct credentials_t {
const char *username; /*!< *MQTT* username */
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are
last 3 bytes of MAC address in hex format */
bool set_null_client_id; /*!< Selects a NULL client id */
/**
* Client authentication
*
* Fields related to client authentication by broker
*
* For mutual authentication using TLS, user could select certificate and key,
* secure element or digital signature peripheral if available.
*
*/
struct authentication_t {
const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
bool set_null_client_id; /*!< Selects a NULL client id */
/**
* Client authentication
*
* Fields related to client authentication by broker
*
* For mutual authentication using TLS, user could select certificate and key,
* secure element or digital signature peripheral if available.
*
*/
struct authentication_t {
const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
authentication is not needed. Must be provided with `key`.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
is not needed. If it is not NULL, also `certificate` has to be provided.*/
size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
`key_password_len` must be correctly set. */
int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
available in some Espressif devices. */
} authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */
/**
* *MQTT* Session related configuration
*/
struct session_t {
/**
* Last Will and Testament message configuration.
*/
struct last_will_t {
const char *topic; /*!< LWT (Last Will and Testament) message topic */
const char *msg; /*!< LWT message, may be NULL terminated*/
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
int qos; /*!< LWT message QoS */
int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
} authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */
/**
* *MQTT* Session related configuration
*/
struct session_t {
/**
* Last Will and Testament message configuration.
*/
struct last_will_t {
const char *topic; /*!< LWT (Last Will and Testament) message topic */
const char *msg; /*!< LWT message, may be NULL terminated*/
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
int qos; /*!< LWT message QoS */
int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
by default. Note: setting the config value `keepalive` to `0` doesn't disable
keepalive feature, but uses a default keepalive period */
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
} session; /*!< *MQTT* session configuration. */
/**
* Network related configuration
*/
struct network_t {
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
} session; /*!< *MQTT* session configuration. */
/**
* Network related configuration
*/
struct network_t {
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
disabled (defaults to 10s) */
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
(defaults to 10s). */
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
`disable_auto_reconnect=true` to disable */
} network; /*!< Network configuration */
/**
* Client task configuration
*/
struct task_t {
int priority; /*!< *MQTT* task priority*/
int stack_size; /*!< *MQTT* task stack size*/
} task; /*!< FreeRTOS task configuration.*/
/**
* Client buffer size configuration
*
* Client have two buffers for input and output respectivelly.
*/
struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
} network; /*!< Network configuration */
/**
* Client task configuration
*/
struct task_t {
int priority; /*!< *MQTT* task priority*/
int stack_size; /*!< *MQTT* task stack size*/
} task; /*!< FreeRTOS task configuration.*/
/**
* Client buffer size configuration
*
* Client have two buffers for input and output respectivelly.
*/
struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
``buffer_size`` */
} buffer; /*!< Buffer size configuration.*/
} buffer; /*!< Buffer size configuration.*/
} esp_mqtt_client_config_t;
/**
* Topic definition struct
*/
typedef struct topic_t {
const char *filter; /*!< Topic filter to subscribe */
int qos; /*!< Max QoS level of the subscription */
} esp_mqtt_topic_t;
/**
* @brief Creates *MQTT* client handle based on the configuration
*
@ -417,6 +425,26 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
*/
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
/**
* @brief Convenience macro to select subscribe function to use.
*
* Notes:
* - Usage of `esp_mqtt_client_subscribe_single` is the same as previous
* esp_mqtt_client_subscribe, refer to it for details.
*
* @param client_handle *MQTT* client handle
* @param topic_type Needs to be char* for single subscription or `esp_mqtt_topic_t` for multiple topics
* @param qos_or_size It's either a qos when subscribing to a single topic or the size of the subscription array when subscribing to multiple topics.
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
char *: esp_mqtt_client_subscribe_single, \
esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \
)(client_handle, topic_type, qos_or_size)
/**
* @brief Subscribe the client to defined topic with defined qos
*
@ -426,23 +454,44 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
* from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
*
* @param client *MQTT* client handle
* @param topic
* @param qos // TODO describe parameters
* @param topic topic filter to subscribe
* @param qos Max qos level of the subscription
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client,
const char *topic, int qos);
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
const char *topic, int qos);
/**
* @brief Subscribe the client to a list of defined topics with defined qos
*
* Notes:
* - Client must be connected to send subscribe message
* - This API is could be executed from a user task or
* from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
*
* @param client *MQTT* client handle
* @param topic_list List of topics to subscribe
* @param size size of topic_list
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size);
/**
* @brief Unsubscribe the client from defined topic
*
* Notes:
* - Client must be connected to send unsubscribe message
* - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details
* - It is thread safe, please refer to `esp_mqtt_client_subscribe_single` for details
*
* @param client *MQTT* client handle
* @param topic

View File

@ -36,7 +36,8 @@ typedef struct {
mqtt5_topic_alias_handle_t peer_topic_alias;
} mqtt5_config_storage_t;
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client);
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client);

View File

@ -126,7 +126,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info);
esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property);
int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info);
mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);

View File

@ -58,7 +58,6 @@ typedef struct mqtt_state {
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
int pending_msg_count;
} mqtt_state_t;
typedef struct {

View File

@ -17,7 +17,12 @@
#endif
#define MQTT_RECON_DEFAULT_MS (10*1000)
#ifdef CONFIG_MQTT_POLL_READ_TIMEOUT_MS
#define MQTT_POLL_READ_TIMEOUT_MS CONFIG_MQTT_POLL_READ_TIMEOUT_MS
#else
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
#endif
#define MQTT_MSG_ID_INCREMENTAL CONFIG_MQTT_MSG_ID_INCREMENTAL
@ -102,5 +107,11 @@
#define MQTT_EVENT_QUEUE_SIZE 1
#endif
#ifdef CONFIG_MQTT_OUTBOX_DATA_ON_EXTERNAL_MEMORY
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_SPIRAM
#else
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_DEFAULT
#endif
#define OUTBOX_MAX_SIZE (4*1024)
#endif

View File

@ -138,7 +138,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull));
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id);
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);

View File

@ -1,5 +1,6 @@
#include <string.h>
#include "mqtt5_msg.h"
#include "mqtt_client.h"
#include "mqtt_config.h"
#include "platform.h"
#include "esp_log.h"
@ -764,7 +765,7 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
}
snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info);
if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(response_topic);
return fail_message(connection);
}
@ -849,14 +850,10 @@ int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length)
return -1;
}
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic_list, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
{
init_message(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
}
@ -877,41 +874,47 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *t
}
}
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
char *shared_topic = calloc(1, shared_topic_size);
if (!shared_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
fail_message(connection);
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
free(shared_topic);
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].filter[0] == '\0') {
return fail_message(connection);
}
free(shared_topic);
} else {
APPEND_CHECK(append_property(connection, 0, 2, topic, strlen(topic)), fail_message(connection));
}
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic_list[topic_number].filter) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
char *shared_topic = calloc(1, shared_topic_size);
if (!shared_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
fail_message(connection);
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic_list[topic_number].filter);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(shared_topic);
return fail_message(connection);
}
free(shared_topic);
} else {
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
}
if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection);
if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3);
connection->message.length ++;
}
connection->buffer[connection->message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->message.length] |= (qos & 3);
connection->message.length ++;
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
@ -975,7 +978,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(shared_topic);
return fail_message(connection);
}

View File

@ -29,6 +29,7 @@
*
*/
#include <string.h>
#include "mqtt_client.h"
#include "mqtt_msg.h"
#include "mqtt_config.h"
#include "platform.h"
@ -466,18 +467,18 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
*message_id = 0;
}
if (connection->message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
} else {
if (data != NULL) {
if (data != NULL) {
if (connection->message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
} else {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
connection->message.fragmented_msg_total_length = 0;
}
connection->message.fragmented_msg_total_length = 0;
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
@ -518,26 +519,29 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
{
init_message(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection);
}
if (append_string(connection, topic, strlen(topic)) < 0) {
return fail_message(connection);
}
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].filter[0] == '\0') {
return fail_message(connection);
}
if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection);
if (append_string(connection, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)) < 0) {
return fail_message(connection);
}
if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->message.length] = topic_list[topic_number].qos;
connection->message.length ++;
}
connection->buffer[connection->message.length++] = qos;
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}

View File

@ -1,7 +1,9 @@
#include "mqtt_outbox.h"
#include <stdlib.h>
#include <string.h>
#include "mqtt_config.h"
#include "sys/queue.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#ifndef CONFIG_MQTT_CUSTOM_OUTBOX
@ -31,7 +33,7 @@ outbox_handle_t outbox_init(void)
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
{
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
outbox_item_handle_t item = heap_caps_calloc(1, sizeof(outbox_item_t), MQTT_OUTBOX_MEMORY);
ESP_MEM_CHECK(TAG, item, return NULL);
item->msg_id = message->msg_id;
item->msg_type = message->msg_type;

View File

@ -16,17 +16,20 @@ static char *esp_mqtt5_client_get_topic_alias(mqtt5_topic_alias_handle_t topic_a
static void esp_mqtt5_client_delete_topic_alias(mqtt5_topic_alias_handle_t topic_alias_handle);
static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_property_new, const mqtt5_user_property_handle_t user_property_old);
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client)
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
int msg_type = mqtt5_get_type(client->mqtt_state.outbound_message->data);
if (msg_type == MQTT_MSG_TYPE_PUBLISH) {
int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data);
if (msg_qos > 0) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
}
}
bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data);
if (msg_dup == false) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
}
}
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
{
if (client->send_publish_packet_count > 0) {
client->send_publish_packet_count --;
ESP_LOGD(TAG, "Receive (%d) qos > 0 publish packet with ack", client->send_publish_packet_count);
}
}
@ -51,7 +54,6 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.current_data_offset = 0;
client->send_publish_packet_count --;
}
}
@ -98,7 +100,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
uint16_t property_len = 0;
esp_mqtt5_publish_resp_property_t property = {0};
*msg_data = mqtt5_get_publish_property_payload(msg_buf, msg_read_len, msg_topic, msg_topic_len, &property, &property_len, msg_data_len, &client->event.property->user_property);
if (*msg_data_len == 0 || *msg_data == NULL) {
if (*msg_data == NULL) {
ESP_LOGE(TAG, "%s: mqtt5_get_publish_property_payload() failed", __func__);
return ESP_FAIL;
}
@ -131,6 +133,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
client->event.property->correlation_data_len = property.correlation_data_len;
client->event.property->content_type = property.content_type;
client->event.property->content_type_len = property.content_type_len;
client->event.property->subscribe_id = property.subscribe_id;
return ESP_OK;
}
@ -291,7 +294,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
}
/* Flow control to check PUBLISH(No PUBACK or PUBCOMP received) packet sent count(Only record QoS1 and QoS2)*/
if (client->send_publish_packet_count >= client->mqtt5_config->server_resp_property_info.receive_maximum) {
if (client->send_publish_packet_count > client->mqtt5_config->server_resp_property_info.receive_maximum) {
ESP_LOGE(TAG, "Client send more than %d QoS1 and QoS2 PUBLISH packet without no ack", client->mqtt5_config->server_resp_property_info.receive_maximum);
return ESP_FAIL;
}

View File

@ -244,7 +244,7 @@ static esp_err_t esp_mqtt_check_cfg_conflict(const mqtt_config_storage_t *cfg, c
bool ssl_cfg_enabled = cfg->use_global_ca_store || cfg->cacert_buf || cfg->clientcert_buf || cfg->psk_hint_key || cfg->alpn_protos;
bool is_ssl_scheme = false;
if (cfg->scheme) {
is_ssl_scheme = (strcasecmp(cfg->scheme, MQTT_OVER_SSL_SCHEME) == 0) || (strcasecmp(cfg->scheme, MQTT_OVER_WSS_SCHEME) == 0);
is_ssl_scheme = (strncasecmp(cfg->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(cfg->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0);
}
if (!is_ssl_scheme && ssl_cfg_enabled) {
@ -287,12 +287,12 @@ static esp_err_t esp_mqtt_client_create_transport(esp_mqtt_client_handle_t clien
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, return ESP_ERR_NO_MEM);
if ((strcasecmp(client->config->scheme, MQTT_OVER_TCP_SCHEME) == 0) || (strcasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME) == 0)) {
if ((strncasecmp(client->config->scheme, MQTT_OVER_TCP_SCHEME, sizeof(MQTT_OVER_TCP_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0)) {
esp_transport_handle_t tcp = esp_transport_tcp_init();
ESP_MEM_CHECK(TAG, tcp, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, tcp, MQTT_OVER_TCP_SCHEME);
if (strcasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME) == 0) {
if (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
ESP_MEM_CHECK(TAG, ws, return ESP_ERR_NO_MEM);
@ -309,13 +309,13 @@ static esp_err_t esp_mqtt_client_create_transport(esp_mqtt_client_handle_t clien
ret = ESP_FAIL;
#endif
}
} else if ((strcasecmp(client->config->scheme, MQTT_OVER_SSL_SCHEME) == 0) || (strcasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME) == 0)) {
} else if ((strncasecmp(client->config->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0)) {
#if MQTT_ENABLE_SSL
esp_transport_handle_t ssl = esp_transport_ssl_init();
ESP_MEM_CHECK(TAG, ssl, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, ssl, MQTT_OVER_SSL_SCHEME);
if (strcasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME) == 0) {
if (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t wss = esp_transport_ws_init(ssl);
ESP_MEM_CHECK(TAG, wss, return ESP_ERR_NO_MEM);
@ -489,7 +489,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
client->config->num_alpn_protos++;
}
// mbedTLS expects the list to be null-terminated
client->config->alpn_protos = calloc(client->config->num_alpn_protos + 1, sizeof(config->broker.verification.alpn_protos));
client->config->alpn_protos = calloc(client->config->num_alpn_protos + 1, sizeof(*config->broker.verification.alpn_protos));
ESP_MEM_CHECK(TAG, client->config->alpn_protos, goto _mqtt_set_config_failed);
for (int i = 0; i < client->config->num_alpn_protos; i++) {
@ -641,8 +641,11 @@ static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
client->config->network_timeout_ms);
if (wlen < 0) {
ESP_LOGE(TAG, "Writing failed: errno=%d", errno);
esp_mqtt_client_dispatch_transport_error(client);
return ESP_FAIL;
} else if (wlen == 0) {
}
if (wlen == 0) {
ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno);
return ESP_ERR_TIMEOUT;
}
@ -709,6 +712,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
client->send_publish_packet_count = 0;
return ESP_OK;
}
#endif
@ -936,17 +940,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
return ESP_OK;
}
static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
{
if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_client_dispatch_transport_error(client);
return ESP_FAIL;
}
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_flow_control(client);
#endif
return ESP_OK;
}
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{
@ -1019,7 +1012,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
return ESP_FAIL;
}
ESP_LOGD(TAG, "%s: msg_topic_len=%zu", __func__, msg_topic_len);
ESP_LOGD(TAG, "%s: msg_topic_len=%"PRIu32, __func__, msg_topic_len);
// get payload
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
@ -1041,7 +1034,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
ESP_LOGD(TAG, "Get data len= %zu, topic len=%zu, total_data: %d offset: %zu", msg_data_len, msg_topic_len,
ESP_LOGD(TAG, "Get data len= %"PRIu32", topic len=%"PRIu32", total_data: %d offset: %"PRIu32, msg_data_len, msg_topic_len,
client->event.total_data_len, msg_data_offset);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = msg_data_len > 0 ? msg_data : NULL;
@ -1081,6 +1074,9 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property);
#else
// SUBACK Using MQTT5 received but MQTT5 is disabled, This is unlikely to happen.
return ESP_FAIL;
#endif
} else {
msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len);
@ -1095,8 +1091,11 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE;
client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED;
// post data event
if ((uint8_t)*msg_data == 0x80) {
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
for (int topic = 0; topic < msg_data_len; ++topic) {
if ((uint8_t)msg_data[topic] == 0x80) {
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
break;
}
}
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
@ -1108,53 +1107,33 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
return ESP_OK;
}
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
// Deletes the initial message in MQTT communication protocol
// Return false when message is not found, making the received counterpart invalid.
static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
{
ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count);
if (client->mqtt_state.pending_msg_count == 0) {
return false;
}
if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) {
client->mqtt_state.pending_msg_count --;
ESP_LOGD(TAG, "Removed pending_id=%d", client->mqtt_state.pending_msg_id);
return true;
}
ESP_LOGD(TAG, "Failed to remove pending_id=%d", client->mqtt_state.pending_msg_id);
return false;
}
static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
//unlock
}
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client)
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
if (client->mqtt_state.pending_msg_count > 0) {
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
return NULL;
}
@ -1214,7 +1193,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
}
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
ESP_LOGD(TAG, "%s: total message length: %d (already read: %zu)", __func__, total_len, client->mqtt_state.in_buffer_read_len);
ESP_LOGD(TAG, "%s: total message length: %d (already read: %"PRIu32")", __func__, total_len, client->mqtt_state.in_buffer_read_len);
client->mqtt_state.message_length = total_len;
if (client->mqtt_state.in_buffer_length < total_len) {
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
@ -1232,7 +1211,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu)",
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"PRIu32")",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
@ -1262,12 +1241,12 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu)",
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"PRIu32")",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
}
ESP_LOGD(TAG, "%s: transport_read():%zu %zu", __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
ESP_LOGD(TAG, "%s: transport_read():%"PRIu32" %"PRIu32, __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
return 1;
err:
esp_mqtt_client_dispatch_transport_error(client);
@ -1305,11 +1284,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
if (remove_initiator_message(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_suback(client);
#endif
ESP_LOGD(TAG, "deliver_suback, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
ESP_LOGD(TAG, "deliver_suback, message_length_read=%"PRIu32", message_length=%"PRIu32, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
if (deliver_suback(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id);
return ESP_FAIL;
@ -1317,7 +1296,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
if (remove_initiator_message(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_unsuback(client);
#endif
@ -1327,7 +1306,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
}
break;
case MQTT_MSG_TYPE_PUBLISH:
ESP_LOGD(TAG, "deliver_publish, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
ESP_LOGD(TAG, "deliver_publish, message_length_read=%"PRIu32", message_length=%"PRIu32, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
if (deliver_publish(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
return ESP_FAIL;
@ -1357,14 +1336,19 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (mqtt_write_data(client) != ESP_OK) {
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
return ESP_FAIL;
}
}
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_puback(client);
@ -1389,7 +1373,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
}
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
mqtt_write_data(client);
esp_mqtt_write(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
@ -1406,11 +1390,16 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
mqtt_write_data(client);
esp_mqtt_write(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_pubcomp(client);
@ -1447,17 +1436,25 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
}
// try to resend the data
if (mqtt_write_data(client) != ESP_OK) {
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to resend data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
// check if it was QoS-0 publish message
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
// delete all qos0 publish messages once we process them
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
if (client->mqtt_state.pending_publish_qos == 0) {
// delete all qos0 publish messages once we process them
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
}
} else if (client->mqtt_state.pending_publish_qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
}
}
return ESP_OK;
@ -1468,7 +1465,6 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
#if MQTT_REPORT_DELETED_MESSAGES
// also report the deleted items as MQTT_EVENT_DELETED events if enabled
int deleted_items = 0;
int msg_id = 0;
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) {
client->event.event_id = MQTT_EVENT_DELETED;
@ -1476,16 +1472,10 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
if (esp_mqtt_dispatch_event(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
}
deleted_items ++;
}
#else
int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
#endif
client->mqtt_state.pending_msg_count -= deleted_items;
if (client->mqtt_state.pending_msg_count < 0) {
client->mqtt_state.pending_msg_count = 0;
}
}
/**
@ -1746,7 +1736,7 @@ static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
ESP_LOGE(TAG, "Disconnect message cannot be created");
return ESP_FAIL;
}
if (mqtt_write_data(client) != ESP_OK) {
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending disconnect message");
}
return ESP_OK;
@ -1796,7 +1786,7 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
if (mqtt_write_data(client) != ESP_OK) {
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending ping");
return ESP_FAIL;
}
@ -1804,7 +1794,8 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
return ESP_OK;
}
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
@ -1818,13 +1809,19 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
}
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_client_subscribe_check(client, qos) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 subscribe check fail");
int max_qos = topic_list[0].qos;
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].qos > max_qos) {
max_qos = topic_list[topic_number].qos;
}
}
if (esp_mqtt5_client_subscribe_check(client, max_qos) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 subscribe check fail: QoS %d not accepted by broker ", max_qos);
MQTT_API_UNLOCK(client);
return -1;
}
client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
if (client->mqtt_state.outbound_message->length) {
client->mqtt5_config->subscribe_property_info = NULL;
@ -1832,7 +1829,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
#endif
} else {
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
topic_list, size,
&client->mqtt_state.pending_msg_id);
}
if (client->mqtt_state.outbound_message->length == 0) {
@ -1842,23 +1839,28 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
//move pending msg to outbox (if have)
if (!mqtt_enqueue(client)) {
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos);
MQTT_API_UNLOCK(client);
return -1;
}
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
ESP_LOGD(TAG, "Sent subscribe, first topic=%s, id: %d", topic_list[0].filter, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id;
}
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos)
{
esp_mqtt_topic_t user_topic = {.filter = topic, .qos = qos};
return esp_mqtt_client_subscribe_multiple(client, &user_topic, 1);
}
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
@ -1895,14 +1897,13 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
if (!mqtt_enqueue(client)) {
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error
if (mqtt_write_data(client) != ESP_OK) {
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK(client);
return -1;
@ -1944,15 +1945,14 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
client->mqtt_state.pending_msg_count ++;
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
if (!mqtt_enqueue(client)) {
if (!mqtt_enqueue(client, NULL, 0)) {
return -1;
}
} else {
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
return -1;
}
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
@ -2022,7 +2022,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
while (sending) {
if (mqtt_write_data(client) != ESP_OK) {
if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_abort_connection(client);
ret = -1;
goto cannot_publish;
@ -2056,6 +2056,11 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
}
if (qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
//Tick is set after transmit to avoid retransmitting too early due slow network speed / big messages
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);