forked from espressif/esp-mqtt
Compare commits
28 Commits
esp_tls_sk
...
fix/jira_s
Author | SHA1 | Date | |
---|---|---|---|
490bde044f | |||
0e4cec9497 | |||
94defb867e | |||
14b11ad07e | |||
da6d38a17e | |||
5729048683 | |||
72833c7f8a | |||
0b315a01b1 | |||
2c71f9e69b | |||
6bcd906b8a | |||
d71dcf372a | |||
9c1826d152 | |||
5cce2c4f35 | |||
9f2db7b4b6 | |||
36f0faa80d | |||
7089fac9c0 | |||
65a4fdaff5 | |||
1011e63cbe | |||
5a156f56b4 | |||
32102558d3 | |||
5adbe11aaf | |||
2fa945d0b8 | |||
86d21e4902 | |||
e9b865eb9d | |||
acab02f2c5 | |||
ed76036744 | |||
c96f6f804c | |||
f80772b8d7 |
17
.github/workflows/new_prs.yml
vendored
17
.github/workflows/new_prs.yml
vendored
@ -1,16 +1,25 @@
|
||||
name: Sync PRs to JIRA
|
||||
name: Sync remain PRs to Jira
|
||||
|
||||
# This workflow will be triggered when a pull request is opened
|
||||
on: pull_request
|
||||
# This workflow will be triggered every hour, to sync remaining PRs (i.e. PRs with zero comment) to Jira project
|
||||
# Note that, PRs can also get synced when new PR comment is created
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 * * * *"
|
||||
|
||||
# Limit to single concurrent run for workflows which can create Jira issues.
|
||||
# Same concurrency group is used in issue_comment.yml
|
||||
concurrency: jira_issues
|
||||
|
||||
jobs:
|
||||
sync_prs_to_jira:
|
||||
name: Sync PRs to Jira
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@master
|
||||
- uses: actions/checkout@v2
|
||||
- name: Sync PRs to Jira project
|
||||
uses: espressif/github-actions/sync_issues_to_jira@master
|
||||
with:
|
||||
cron_job: true
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
JIRA_PASS: ${{ secrets.JIRA_PASS }}
|
||||
|
@ -40,6 +40,10 @@ build_idf_v5.0:
|
||||
extends: .build_template
|
||||
image: espressif/idf:release-v5.0
|
||||
|
||||
build_idf_v5.1:
|
||||
extends: .build_template
|
||||
image: espressif/idf:release-v5.1
|
||||
|
||||
build_idf_latest:
|
||||
extends: .build_template
|
||||
image: espressif/idf:latest
|
||||
@ -55,7 +59,7 @@ build_and_test_qemu:
|
||||
- export IDF_PATH=$CI_PROJECT_DIR/esp-idf
|
||||
- git clone "${IDF_REPO}"
|
||||
# switch to IDF and setup the tools
|
||||
- $MQTT_PATH/ci/set_idf.sh master
|
||||
- $MQTT_PATH/ci/set_idf.sh release/v5.1
|
||||
- $IDF_PATH/tools/idf_tools.py install-python-env
|
||||
- cd $IDF_PATH && tools/idf_tools.py --non-interactive install && eval "$(tools/idf_tools.py --non-interactive export)"
|
||||
# Remove `debug_backend` and Add `paho-mqtt` to the required packages
|
||||
@ -74,7 +78,7 @@ build_and_test_qemu:
|
||||
- export MQTT_PUBLISH_MSG_len_1=2 MQTT_PUBLISH_MSG_repeat_1=50
|
||||
- export MQTT_PUBLISH_MSG_len_2=128 MQTT_PUBLISH_MSG_repeat_2=2
|
||||
- export MQTT_PUBLISH_MSG_len_3=20 MQTT_PUBLISH_MSG_repeat_3=20
|
||||
- python Runner.py $TEST_PATH -c $TEST_PATH/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
|
||||
- python Runner.py $TEST_PATH -c $MQTT_PATH/ci/publish_connect_mqtt_qemu.yml -e $TEST_PATH/env.yml
|
||||
|
||||
push_master_to_github:
|
||||
stage: deploy
|
||||
|
14
Kconfig
14
Kconfig
@ -124,6 +124,13 @@ menu "ESP-MQTT Configurations"
|
||||
help
|
||||
MQTT task priority. Higher number denotes higher priority.
|
||||
|
||||
config MQTT_POLL_READ_TIMEOUT_MS
|
||||
int "MQTT transport poll read timeut"
|
||||
default 1000
|
||||
depends on MQTT_USE_CUSTOM_CONFIG
|
||||
help
|
||||
Timeout when polling underlying transport for read.
|
||||
|
||||
config MQTT_EVENT_QUEUE_SIZE
|
||||
int "Number of queued events."
|
||||
default 1
|
||||
@ -145,6 +152,13 @@ menu "ESP-MQTT Configurations"
|
||||
bool "Core 1"
|
||||
endchoice
|
||||
|
||||
config MQTT_OUTBOX_DATA_ON_EXTERNAL_MEMORY
|
||||
bool "Use external memory for outbox data"
|
||||
default n
|
||||
depends on MQTT_USE_CUSTOM_CONFIG
|
||||
help
|
||||
Set to true to use external memory for outbox data.
|
||||
|
||||
config MQTT_CUSTOM_OUTBOX
|
||||
bool "Enable custom outbox implementation"
|
||||
default n
|
||||
|
7
ci/publish_connect_mqtt_qemu.yml
Normal file
7
ci/publish_connect_mqtt_qemu.yml
Normal file
@ -0,0 +1,7 @@
|
||||
CaseConfig:
|
||||
- name: test_app_protocol_mqtt_publish_connect
|
||||
overwrite:
|
||||
dut:
|
||||
class: ESP32QEMUDUT
|
||||
package: ttfw_idf
|
||||
|
@ -1,3 +1,8 @@
|
||||
/*
|
||||
* SPDX-FileCopyrightText: 1991-1993 The Regents of the University of California
|
||||
*
|
||||
* SPDX-License-Identifier: BSD-3-Clause
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/* Implementation from BSD headers*/
|
||||
|
@ -138,6 +138,7 @@ typedef struct {
|
||||
uint16_t correlation_data_len; /*!< Correlation data length of the message */
|
||||
char *content_type; /*!< Content type of the message */
|
||||
int content_type_len; /*!< Content type length of the message */
|
||||
uint16_t subscribe_id; /*!< Subscription identifier of the message */
|
||||
mqtt5_user_property_handle_t user_property; /*!< The handle for user property, call function esp_mqtt5_client_delete_user_property to free the memory */
|
||||
} esp_mqtt5_event_property_t;
|
||||
|
||||
|
@ -32,7 +32,7 @@ typedef struct esp_mqtt_client *esp_mqtt_client_handle_t;
|
||||
* @brief *MQTT* event types.
|
||||
*
|
||||
* User event handler receives context data in `esp_mqtt_event_t` structure with
|
||||
* - `client` - *MQTT* client handle
|
||||
* - client - *MQTT* client handle
|
||||
* - various other data depending on event type
|
||||
*
|
||||
*/
|
||||
@ -223,132 +223,140 @@ typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
|
||||
* character and the related len field set to 0. DER format requires a related len field set to the correct length.
|
||||
*/
|
||||
typedef struct esp_mqtt_client_config_t {
|
||||
/**
|
||||
* Broker related configuration
|
||||
*/
|
||||
struct broker_t {
|
||||
/**
|
||||
* Broker address
|
||||
*
|
||||
* - uri have precedence over other fields
|
||||
* - If uri isn't set at least hostname, transport and port should.
|
||||
*/
|
||||
struct address_t {
|
||||
const char *uri; /*!< Complete *MQTT* broker URI */
|
||||
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
|
||||
esp_mqtt_transport_t transport; /*!< Selects transport*/
|
||||
const char *path; /*!< Path in the URI*/
|
||||
uint32_t port; /*!< *MQTT* server port */
|
||||
} address; /*!< Broker address configuration */
|
||||
/**
|
||||
* Broker identity verification
|
||||
*
|
||||
* If fields are not set broker's identity isn't verified. it's recommended
|
||||
* to set the options in this struct for security reasons.
|
||||
*/
|
||||
struct verification_t {
|
||||
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
|
||||
/**
|
||||
* Broker related configuration
|
||||
*/
|
||||
struct broker_t {
|
||||
/**
|
||||
* Broker address
|
||||
*
|
||||
* - uri have precedence over other fields
|
||||
* - If uri isn't set at least hostname, transport and port should.
|
||||
*/
|
||||
struct address_t {
|
||||
const char *uri; /*!< Complete *MQTT* broker URI */
|
||||
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
|
||||
esp_mqtt_transport_t transport; /*!< Selects transport*/
|
||||
const char *path; /*!< Path in the URI*/
|
||||
uint32_t port; /*!< *MQTT* server port */
|
||||
} address; /*!< Broker address configuration */
|
||||
/**
|
||||
* Broker identity verification
|
||||
*
|
||||
* If fields are not set broker's identity isn't verified. it's recommended
|
||||
* to set the options in this struct for security reasons.
|
||||
*/
|
||||
struct verification_t {
|
||||
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
|
||||
documentation for details. */
|
||||
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
|
||||
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
|
||||
the usage of certificate bundles. */
|
||||
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
|
||||
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
|
||||
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
|
||||
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
|
||||
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
|
||||
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
|
||||
authentication (as alternative to certificate verification).
|
||||
PSK is enabled only if there are no other ways to
|
||||
verify broker.*/
|
||||
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
|
||||
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
|
||||
security of TLS and makes the *MQTT* client susceptible to MITM attacks */
|
||||
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
|
||||
} verification; /*!< Security verification of the broker */
|
||||
} broker; /*!< Broker address and security verification */
|
||||
/**
|
||||
* Client related credentials for authentication.
|
||||
*/
|
||||
struct credentials_t {
|
||||
const char *username; /*!< *MQTT* username */
|
||||
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
|
||||
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
|
||||
} verification; /*!< Security verification of the broker */
|
||||
} broker; /*!< Broker address and security verification */
|
||||
/**
|
||||
* Client related credentials for authentication.
|
||||
*/
|
||||
struct credentials_t {
|
||||
const char *username; /*!< *MQTT* username */
|
||||
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
|
||||
the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are
|
||||
last 3 bytes of MAC address in hex format */
|
||||
bool set_null_client_id; /*!< Selects a NULL client id */
|
||||
/**
|
||||
* Client authentication
|
||||
*
|
||||
* Fields related to client authentication by broker
|
||||
*
|
||||
* For mutual authentication using TLS, user could select certificate and key,
|
||||
* secure element or digital signature peripheral if available.
|
||||
*
|
||||
*/
|
||||
struct authentication_t {
|
||||
const char *password; /*!< *MQTT* password */
|
||||
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
|
||||
bool set_null_client_id; /*!< Selects a NULL client id */
|
||||
/**
|
||||
* Client authentication
|
||||
*
|
||||
* Fields related to client authentication by broker
|
||||
*
|
||||
* For mutual authentication using TLS, user could select certificate and key,
|
||||
* secure element or digital signature peripheral if available.
|
||||
*
|
||||
*/
|
||||
struct authentication_t {
|
||||
const char *password; /*!< *MQTT* password */
|
||||
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
|
||||
authentication is not needed. Must be provided with `key`.*/
|
||||
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
|
||||
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
|
||||
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
|
||||
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
|
||||
is not needed. If it is not NULL, also `certificate` has to be provided.*/
|
||||
size_t key_len; /*!< Length of the buffer pointed to by key.*/
|
||||
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
|
||||
size_t key_len; /*!< Length of the buffer pointed to by key.*/
|
||||
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
|
||||
`key_password_len` must be correctly set. */
|
||||
int key_password_len; /*!< Length of the password pointed to by `key_password` */
|
||||
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
|
||||
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
|
||||
int key_password_len; /*!< Length of the password pointed to by `key_password` */
|
||||
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
|
||||
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
|
||||
available in some Espressif devices. */
|
||||
} authentication; /*!< Client authentication */
|
||||
} credentials; /*!< User credentials for broker */
|
||||
/**
|
||||
* *MQTT* Session related configuration
|
||||
*/
|
||||
struct session_t {
|
||||
/**
|
||||
* Last Will and Testament message configuration.
|
||||
*/
|
||||
struct last_will_t {
|
||||
const char *topic; /*!< LWT (Last Will and Testament) message topic */
|
||||
const char *msg; /*!< LWT message, may be NULL terminated*/
|
||||
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
|
||||
int qos; /*!< LWT message QoS */
|
||||
int retain; /*!< LWT retained message flag */
|
||||
} last_will; /*!< Last will configuration */
|
||||
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
|
||||
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
|
||||
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
|
||||
} authentication; /*!< Client authentication */
|
||||
} credentials; /*!< User credentials for broker */
|
||||
/**
|
||||
* *MQTT* Session related configuration
|
||||
*/
|
||||
struct session_t {
|
||||
/**
|
||||
* Last Will and Testament message configuration.
|
||||
*/
|
||||
struct last_will_t {
|
||||
const char *topic; /*!< LWT (Last Will and Testament) message topic */
|
||||
const char *msg; /*!< LWT message, may be NULL terminated*/
|
||||
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
|
||||
int qos; /*!< LWT message QoS */
|
||||
int retain; /*!< LWT retained message flag */
|
||||
} last_will; /*!< Last will configuration */
|
||||
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
|
||||
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
|
||||
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
|
||||
by default. Note: setting the config value `keepalive` to `0` doesn't disable
|
||||
keepalive feature, but uses a default keepalive period */
|
||||
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
|
||||
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
|
||||
} session; /*!< *MQTT* session configuration. */
|
||||
/**
|
||||
* Network related configuration
|
||||
*/
|
||||
struct network_t {
|
||||
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
|
||||
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
|
||||
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
|
||||
} session; /*!< *MQTT* session configuration. */
|
||||
/**
|
||||
* Network related configuration
|
||||
*/
|
||||
struct network_t {
|
||||
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
|
||||
disabled (defaults to 10s) */
|
||||
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
|
||||
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
|
||||
(defaults to 10s). */
|
||||
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
|
||||
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
|
||||
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
|
||||
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
|
||||
`disable_auto_reconnect=true` to disable */
|
||||
} network; /*!< Network configuration */
|
||||
/**
|
||||
* Client task configuration
|
||||
*/
|
||||
struct task_t {
|
||||
int priority; /*!< *MQTT* task priority*/
|
||||
int stack_size; /*!< *MQTT* task stack size*/
|
||||
} task; /*!< FreeRTOS task configuration.*/
|
||||
/**
|
||||
* Client buffer size configuration
|
||||
*
|
||||
* Client have two buffers for input and output respectivelly.
|
||||
*/
|
||||
struct buffer_t {
|
||||
int size; /*!< size of *MQTT* send/receive buffer*/
|
||||
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
|
||||
} network; /*!< Network configuration */
|
||||
/**
|
||||
* Client task configuration
|
||||
*/
|
||||
struct task_t {
|
||||
int priority; /*!< *MQTT* task priority*/
|
||||
int stack_size; /*!< *MQTT* task stack size*/
|
||||
} task; /*!< FreeRTOS task configuration.*/
|
||||
/**
|
||||
* Client buffer size configuration
|
||||
*
|
||||
* Client have two buffers for input and output respectivelly.
|
||||
*/
|
||||
struct buffer_t {
|
||||
int size; /*!< size of *MQTT* send/receive buffer*/
|
||||
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
|
||||
``buffer_size`` */
|
||||
} buffer; /*!< Buffer size configuration.*/
|
||||
} buffer; /*!< Buffer size configuration.*/
|
||||
} esp_mqtt_client_config_t;
|
||||
|
||||
/**
|
||||
* Topic definition struct
|
||||
*/
|
||||
typedef struct topic_t {
|
||||
const char *filter; /*!< Topic filter to subscribe */
|
||||
int qos; /*!< Max QoS level of the subscription */
|
||||
} esp_mqtt_topic_t;
|
||||
|
||||
/**
|
||||
* @brief Creates *MQTT* client handle based on the configuration
|
||||
*
|
||||
@ -417,6 +425,26 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
|
||||
*/
|
||||
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
|
||||
|
||||
|
||||
/**
|
||||
* @brief Convenience macro to select subscribe function to use.
|
||||
*
|
||||
* Notes:
|
||||
* - Usage of `esp_mqtt_client_subscribe_single` is the same as previous
|
||||
* esp_mqtt_client_subscribe, refer to it for details.
|
||||
*
|
||||
* @param client_handle *MQTT* client handle
|
||||
* @param topic_type Needs to be char* for single subscription or `esp_mqtt_topic_t` for multiple topics
|
||||
* @param qos_or_size It's either a qos when subscribing to a single topic or the size of the subscription array when subscribing to multiple topics.
|
||||
*
|
||||
* @return message_id of the subscribe message on success
|
||||
* -1 on failure
|
||||
*/
|
||||
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
|
||||
char *: esp_mqtt_client_subscribe_single, \
|
||||
esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \
|
||||
)(client_handle, topic_type, qos_or_size)
|
||||
|
||||
/**
|
||||
* @brief Subscribe the client to defined topic with defined qos
|
||||
*
|
||||
@ -426,23 +454,44 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
|
||||
* from a *MQTT* event callback i.e. internal *MQTT* task
|
||||
* (API is protected by internal mutex, so it might block
|
||||
* if a longer data receive operation is in progress.
|
||||
* - `esp_mqtt_client_subscribe` could be used to call this function.
|
||||
*
|
||||
* @param client *MQTT* client handle
|
||||
* @param topic
|
||||
* @param qos // TODO describe parameters
|
||||
* @param topic topic filter to subscribe
|
||||
* @param qos Max qos level of the subscription
|
||||
*
|
||||
* @return message_id of the subscribe message on success
|
||||
* -1 on failure
|
||||
*/
|
||||
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client,
|
||||
const char *topic, int qos);
|
||||
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
|
||||
const char *topic, int qos);
|
||||
/**
|
||||
* @brief Subscribe the client to a list of defined topics with defined qos
|
||||
*
|
||||
* Notes:
|
||||
* - Client must be connected to send subscribe message
|
||||
* - This API is could be executed from a user task or
|
||||
* from a *MQTT* event callback i.e. internal *MQTT* task
|
||||
* (API is protected by internal mutex, so it might block
|
||||
* if a longer data receive operation is in progress.
|
||||
* - `esp_mqtt_client_subscribe` could be used to call this function.
|
||||
*
|
||||
* @param client *MQTT* client handle
|
||||
* @param topic_list List of topics to subscribe
|
||||
* @param size size of topic_list
|
||||
*
|
||||
* @return message_id of the subscribe message on success
|
||||
* -1 on failure
|
||||
*/
|
||||
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
|
||||
const esp_mqtt_topic_t *topic_list, int size);
|
||||
|
||||
/**
|
||||
* @brief Unsubscribe the client from defined topic
|
||||
*
|
||||
* Notes:
|
||||
* - Client must be connected to send unsubscribe message
|
||||
* - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details
|
||||
* - It is thread safe, please refer to `esp_mqtt_client_subscribe_single` for details
|
||||
*
|
||||
* @param client *MQTT* client handle
|
||||
* @param topic
|
||||
|
@ -36,7 +36,8 @@ typedef struct {
|
||||
mqtt5_topic_alias_handle_t peer_topic_alias;
|
||||
} mqtt5_config_storage_t;
|
||||
|
||||
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client);
|
||||
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client);
|
||||
|
@ -126,7 +126,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
|
||||
mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info);
|
||||
esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property);
|
||||
int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length);
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
|
||||
mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property);
|
||||
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info);
|
||||
mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
|
||||
|
@ -58,7 +58,6 @@ typedef struct mqtt_state {
|
||||
uint16_t pending_msg_id;
|
||||
int pending_msg_type;
|
||||
int pending_publish_qos;
|
||||
int pending_msg_count;
|
||||
} mqtt_state_t;
|
||||
|
||||
typedef struct {
|
||||
|
@ -17,7 +17,12 @@
|
||||
#endif
|
||||
|
||||
#define MQTT_RECON_DEFAULT_MS (10*1000)
|
||||
|
||||
#ifdef CONFIG_MQTT_POLL_READ_TIMEOUT_MS
|
||||
#define MQTT_POLL_READ_TIMEOUT_MS CONFIG_MQTT_POLL_READ_TIMEOUT_MS
|
||||
#else
|
||||
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
|
||||
#endif
|
||||
|
||||
#define MQTT_MSG_ID_INCREMENTAL CONFIG_MQTT_MSG_ID_INCREMENTAL
|
||||
|
||||
@ -102,5 +107,11 @@
|
||||
#define MQTT_EVENT_QUEUE_SIZE 1
|
||||
#endif
|
||||
|
||||
#ifdef CONFIG_MQTT_OUTBOX_DATA_ON_EXTERNAL_MEMORY
|
||||
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_SPIRAM
|
||||
#else
|
||||
#define MQTT_OUTBOX_MEMORY MALLOC_CAP_DEFAULT
|
||||
#endif
|
||||
|
||||
#define OUTBOX_MAX_SIZE (4*1024)
|
||||
#endif
|
||||
|
@ -138,7 +138,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
|
||||
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
|
||||
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
|
||||
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id);
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull));
|
||||
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id);
|
||||
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
|
||||
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <string.h>
|
||||
#include "mqtt5_msg.h"
|
||||
#include "mqtt_client.h"
|
||||
#include "mqtt_config.h"
|
||||
#include "platform.h"
|
||||
#include "esp_log.h"
|
||||
@ -764,7 +765,7 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
|
||||
}
|
||||
snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info);
|
||||
if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) {
|
||||
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
|
||||
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
|
||||
free(response_topic);
|
||||
return fail_message(connection);
|
||||
}
|
||||
@ -849,14 +850,10 @@ int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length)
|
||||
return -1;
|
||||
}
|
||||
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic_list, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
|
||||
{
|
||||
init_message(connection);
|
||||
|
||||
if (topic == NULL || topic[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if ((*message_id = append_message_id(connection, 0)) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
@ -877,41 +874,47 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *t
|
||||
}
|
||||
}
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
if (property && property->is_share_subscribe) {
|
||||
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
|
||||
char *shared_topic = calloc(1, shared_topic_size);
|
||||
if (!shared_topic) {
|
||||
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
|
||||
fail_message(connection);
|
||||
}
|
||||
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
|
||||
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
|
||||
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
|
||||
free(shared_topic);
|
||||
|
||||
for (int topic_number = 0; topic_number < size; ++topic_number) {
|
||||
if (topic_list[topic_number].filter[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
free(shared_topic);
|
||||
} else {
|
||||
APPEND_CHECK(append_property(connection, 0, 2, topic, strlen(topic)), fail_message(connection));
|
||||
}
|
||||
if (property && property->is_share_subscribe) {
|
||||
uint16_t shared_topic_size = strlen(topic_list[topic_number].filter) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
|
||||
char *shared_topic = calloc(1, shared_topic_size);
|
||||
if (!shared_topic) {
|
||||
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
|
||||
fail_message(connection);
|
||||
}
|
||||
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic_list[topic_number].filter);
|
||||
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
|
||||
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
|
||||
free(shared_topic);
|
||||
return fail_message(connection);
|
||||
}
|
||||
free(shared_topic);
|
||||
} else {
|
||||
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length] = 0;
|
||||
if (property) {
|
||||
if (property->retain_handle > 0 && property->retain_handle < 3) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
|
||||
}
|
||||
if (property->no_local_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
|
||||
}
|
||||
if (property->retain_as_published_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
|
||||
}
|
||||
}
|
||||
connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3);
|
||||
connection->message.length ++;
|
||||
}
|
||||
connection->buffer[connection->message.length] = 0;
|
||||
if (property) {
|
||||
if (property->retain_handle > 0 && property->retain_handle < 3) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
|
||||
}
|
||||
if (property->no_local_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
|
||||
}
|
||||
if (property->retain_as_published_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
|
||||
}
|
||||
}
|
||||
connection->buffer[connection->message.length] |= (qos & 3);
|
||||
connection->message.length ++;
|
||||
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
|
||||
}
|
||||
|
||||
@ -975,7 +978,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
|
||||
}
|
||||
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
|
||||
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
|
||||
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
|
||||
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
|
||||
free(shared_topic);
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
@ -29,6 +29,7 @@
|
||||
*
|
||||
*/
|
||||
#include <string.h>
|
||||
#include "mqtt_client.h"
|
||||
#include "mqtt_msg.h"
|
||||
#include "mqtt_config.h"
|
||||
#include "platform.h"
|
||||
@ -466,18 +467,18 @@ mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topi
|
||||
*message_id = 0;
|
||||
}
|
||||
|
||||
if (connection->message.length + data_length > connection->buffer_length) {
|
||||
// Not enough size in buffer -> fragment this message
|
||||
connection->message.fragmented_msg_data_offset = connection->message.length;
|
||||
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
|
||||
connection->message.length = connection->buffer_length;
|
||||
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
|
||||
} else {
|
||||
if (data != NULL) {
|
||||
if (data != NULL) {
|
||||
if (connection->message.length + data_length > connection->buffer_length) {
|
||||
// Not enough size in buffer -> fragment this message
|
||||
connection->message.fragmented_msg_data_offset = connection->message.length;
|
||||
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
|
||||
connection->message.length = connection->buffer_length;
|
||||
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
|
||||
} else {
|
||||
memcpy(connection->buffer + connection->message.length, data, data_length);
|
||||
connection->message.length += data_length;
|
||||
connection->message.fragmented_msg_total_length = 0;
|
||||
}
|
||||
connection->message.fragmented_msg_total_length = 0;
|
||||
}
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
|
||||
}
|
||||
@ -518,26 +519,29 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
|
||||
if (topic == NULL || topic[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if ((*message_id = append_message_id(connection, 0)) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (append_string(connection, topic, strlen(topic)) < 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
for (int topic_number = 0; topic_number < size; ++topic_number) {
|
||||
if (topic_list[topic_number].filter[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
if (append_string(connection, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)) < 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length] = topic_list[topic_number].qos;
|
||||
connection->message.length ++;
|
||||
}
|
||||
connection->buffer[connection->message.length++] = qos;
|
||||
|
||||
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
|
||||
}
|
||||
|
@ -1,7 +1,9 @@
|
||||
#include "mqtt_outbox.h"
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "mqtt_config.h"
|
||||
#include "sys/queue.h"
|
||||
#include "esp_heap_caps.h"
|
||||
#include "esp_log.h"
|
||||
|
||||
#ifndef CONFIG_MQTT_CUSTOM_OUTBOX
|
||||
@ -31,7 +33,7 @@ outbox_handle_t outbox_init(void)
|
||||
|
||||
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
|
||||
{
|
||||
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
|
||||
outbox_item_handle_t item = heap_caps_calloc(1, sizeof(outbox_item_t), MQTT_OUTBOX_MEMORY);
|
||||
ESP_MEM_CHECK(TAG, item, return NULL);
|
||||
item->msg_id = message->msg_id;
|
||||
item->msg_type = message->msg_type;
|
||||
|
@ -16,17 +16,20 @@ static char *esp_mqtt5_client_get_topic_alias(mqtt5_topic_alias_handle_t topic_a
|
||||
static void esp_mqtt5_client_delete_topic_alias(mqtt5_topic_alias_handle_t topic_alias_handle);
|
||||
static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_property_new, const mqtt5_user_property_handle_t user_property_old);
|
||||
|
||||
void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client)
|
||||
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
int msg_type = mqtt5_get_type(client->mqtt_state.outbound_message->data);
|
||||
if (msg_type == MQTT_MSG_TYPE_PUBLISH) {
|
||||
int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data);
|
||||
if (msg_qos > 0) {
|
||||
client->send_publish_packet_count ++;
|
||||
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
|
||||
}
|
||||
}
|
||||
bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data);
|
||||
if (msg_dup == false) {
|
||||
client->send_publish_packet_count ++;
|
||||
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
|
||||
}
|
||||
}
|
||||
|
||||
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
|
||||
{
|
||||
if (client->send_publish_packet_count > 0) {
|
||||
client->send_publish_packet_count --;
|
||||
ESP_LOGD(TAG, "Receive (%d) qos > 0 publish packet with ack", client->send_publish_packet_count);
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,7 +54,6 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
|
||||
client->event.data_len = msg_data_len;
|
||||
client->event.total_data_len = msg_data_len;
|
||||
client->event.current_data_offset = 0;
|
||||
client->send_publish_packet_count --;
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,7 +100,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
|
||||
uint16_t property_len = 0;
|
||||
esp_mqtt5_publish_resp_property_t property = {0};
|
||||
*msg_data = mqtt5_get_publish_property_payload(msg_buf, msg_read_len, msg_topic, msg_topic_len, &property, &property_len, msg_data_len, &client->event.property->user_property);
|
||||
if (*msg_data_len == 0 || *msg_data == NULL) {
|
||||
if (*msg_data == NULL) {
|
||||
ESP_LOGE(TAG, "%s: mqtt5_get_publish_property_payload() failed", __func__);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@ -131,6 +133,7 @@ esp_err_t esp_mqtt5_get_publish_data(esp_mqtt5_client_handle_t client, uint8_t *
|
||||
client->event.property->correlation_data_len = property.correlation_data_len;
|
||||
client->event.property->content_type = property.content_type;
|
||||
client->event.property->content_type_len = property.content_type_len;
|
||||
client->event.property->subscribe_id = property.subscribe_id;
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
@ -291,7 +294,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
|
||||
}
|
||||
|
||||
/* Flow control to check PUBLISH(No PUBACK or PUBCOMP received) packet sent count(Only record QoS1 and QoS2)*/
|
||||
if (client->send_publish_packet_count >= client->mqtt5_config->server_resp_property_info.receive_maximum) {
|
||||
if (client->send_publish_packet_count > client->mqtt5_config->server_resp_property_info.receive_maximum) {
|
||||
ESP_LOGE(TAG, "Client send more than %d QoS1 and QoS2 PUBLISH packet without no ack", client->mqtt5_config->server_resp_property_info.receive_maximum);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
195
mqtt_client.c
195
mqtt_client.c
@ -244,7 +244,7 @@ static esp_err_t esp_mqtt_check_cfg_conflict(const mqtt_config_storage_t *cfg, c
|
||||
bool ssl_cfg_enabled = cfg->use_global_ca_store || cfg->cacert_buf || cfg->clientcert_buf || cfg->psk_hint_key || cfg->alpn_protos;
|
||||
bool is_ssl_scheme = false;
|
||||
if (cfg->scheme) {
|
||||
is_ssl_scheme = (strcasecmp(cfg->scheme, MQTT_OVER_SSL_SCHEME) == 0) || (strcasecmp(cfg->scheme, MQTT_OVER_WSS_SCHEME) == 0);
|
||||
is_ssl_scheme = (strncasecmp(cfg->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(cfg->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0);
|
||||
}
|
||||
|
||||
if (!is_ssl_scheme && ssl_cfg_enabled) {
|
||||
@ -287,12 +287,12 @@ static esp_err_t esp_mqtt_client_create_transport(esp_mqtt_client_handle_t clien
|
||||
client->transport_list = esp_transport_list_init();
|
||||
ESP_MEM_CHECK(TAG, client->transport_list, return ESP_ERR_NO_MEM);
|
||||
|
||||
if ((strcasecmp(client->config->scheme, MQTT_OVER_TCP_SCHEME) == 0) || (strcasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME) == 0)) {
|
||||
if ((strncasecmp(client->config->scheme, MQTT_OVER_TCP_SCHEME, sizeof(MQTT_OVER_TCP_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0)) {
|
||||
esp_transport_handle_t tcp = esp_transport_tcp_init();
|
||||
ESP_MEM_CHECK(TAG, tcp, return ESP_ERR_NO_MEM);
|
||||
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
|
||||
esp_transport_list_add(client->transport_list, tcp, MQTT_OVER_TCP_SCHEME);
|
||||
if (strcasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME) == 0) {
|
||||
if (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0) {
|
||||
#if MQTT_ENABLE_WS
|
||||
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
|
||||
ESP_MEM_CHECK(TAG, ws, return ESP_ERR_NO_MEM);
|
||||
@ -309,13 +309,13 @@ static esp_err_t esp_mqtt_client_create_transport(esp_mqtt_client_handle_t clien
|
||||
ret = ESP_FAIL;
|
||||
#endif
|
||||
}
|
||||
} else if ((strcasecmp(client->config->scheme, MQTT_OVER_SSL_SCHEME) == 0) || (strcasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME) == 0)) {
|
||||
} else if ((strncasecmp(client->config->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0)) {
|
||||
#if MQTT_ENABLE_SSL
|
||||
esp_transport_handle_t ssl = esp_transport_ssl_init();
|
||||
ESP_MEM_CHECK(TAG, ssl, return ESP_ERR_NO_MEM);
|
||||
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
|
||||
esp_transport_list_add(client->transport_list, ssl, MQTT_OVER_SSL_SCHEME);
|
||||
if (strcasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME) == 0) {
|
||||
if (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0) {
|
||||
#if MQTT_ENABLE_WS
|
||||
esp_transport_handle_t wss = esp_transport_ws_init(ssl);
|
||||
ESP_MEM_CHECK(TAG, wss, return ESP_ERR_NO_MEM);
|
||||
@ -489,7 +489,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
|
||||
client->config->num_alpn_protos++;
|
||||
}
|
||||
// mbedTLS expects the list to be null-terminated
|
||||
client->config->alpn_protos = calloc(client->config->num_alpn_protos + 1, sizeof(config->broker.verification.alpn_protos));
|
||||
client->config->alpn_protos = calloc(client->config->num_alpn_protos + 1, sizeof(*config->broker.verification.alpn_protos));
|
||||
ESP_MEM_CHECK(TAG, client->config->alpn_protos, goto _mqtt_set_config_failed);
|
||||
|
||||
for (int i = 0; i < client->config->num_alpn_protos; i++) {
|
||||
@ -641,8 +641,11 @@ static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
|
||||
client->config->network_timeout_ms);
|
||||
if (wlen < 0) {
|
||||
ESP_LOGE(TAG, "Writing failed: errno=%d", errno);
|
||||
esp_mqtt_client_dispatch_transport_error(client);
|
||||
return ESP_FAIL;
|
||||
} else if (wlen == 0) {
|
||||
}
|
||||
|
||||
if (wlen == 0) {
|
||||
ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno);
|
||||
return ESP_ERR_TIMEOUT;
|
||||
}
|
||||
@ -709,6 +712,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
|
||||
client->send_publish_packet_count = 0;
|
||||
return ESP_OK;
|
||||
}
|
||||
#endif
|
||||
@ -936,17 +940,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
esp_mqtt_client_dispatch_transport_error(client);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_flow_control(client);
|
||||
#endif
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
|
||||
{
|
||||
@ -1019,7 +1012,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
|
||||
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: msg_topic_len=%zu", __func__, msg_topic_len);
|
||||
ESP_LOGD(TAG, "%s: msg_topic_len=%"PRIu32, __func__, msg_topic_len);
|
||||
|
||||
// get payload
|
||||
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
|
||||
@ -1041,7 +1034,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
|
||||
client->event.dup = mqtt_get_dup(msg_buf);
|
||||
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
|
||||
post_data_event:
|
||||
ESP_LOGD(TAG, "Get data len= %zu, topic len=%zu, total_data: %d offset: %zu", msg_data_len, msg_topic_len,
|
||||
ESP_LOGD(TAG, "Get data len= %"PRIu32", topic len=%"PRIu32", total_data: %d offset: %"PRIu32, msg_data_len, msg_topic_len,
|
||||
client->event.total_data_len, msg_data_offset);
|
||||
client->event.event_id = MQTT_EVENT_DATA;
|
||||
client->event.data = msg_data_len > 0 ? msg_data : NULL;
|
||||
@ -1081,6 +1074,9 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property);
|
||||
#else
|
||||
// SUBACK Using MQTT5 received but MQTT5 is disabled, This is unlikely to happen.
|
||||
return ESP_FAIL;
|
||||
#endif
|
||||
} else {
|
||||
msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len);
|
||||
@ -1095,8 +1091,11 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
|
||||
client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE;
|
||||
client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED;
|
||||
// post data event
|
||||
if ((uint8_t)*msg_data == 0x80) {
|
||||
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
|
||||
for (int topic = 0; topic < msg_data_len; ++topic) {
|
||||
if ((uint8_t)msg_data[topic] == 0x80) {
|
||||
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
|
||||
break;
|
||||
}
|
||||
}
|
||||
client->event.data_len = msg_data_len;
|
||||
client->event.total_data_len = msg_data_len;
|
||||
@ -1108,53 +1107,33 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
|
||||
// Deletes the initial message in MQTT communication protocol
|
||||
// Return false when message is not found, making the received counterpart invalid.
|
||||
static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
|
||||
{
|
||||
ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count);
|
||||
if (client->mqtt_state.pending_msg_count == 0) {
|
||||
return false;
|
||||
}
|
||||
if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) {
|
||||
client->mqtt_state.pending_msg_count --;
|
||||
ESP_LOGD(TAG, "Removed pending_id=%d", client->mqtt_state.pending_msg_id);
|
||||
return true;
|
||||
}
|
||||
|
||||
ESP_LOGD(TAG, "Failed to remove pending_id=%d", client->mqtt_state.pending_msg_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
|
||||
{
|
||||
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
|
||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
//lock mutex
|
||||
outbox_message_t msg = { 0 };
|
||||
msg.data = client->mqtt_state.outbound_message->data;
|
||||
msg.len = client->mqtt_state.outbound_message->length;
|
||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
||||
msg.remaining_data = remaining_data;
|
||||
msg.remaining_len = remaining_len;
|
||||
//Copy to queue buffer
|
||||
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||
//unlock
|
||||
}
|
||||
|
||||
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client)
|
||||
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
|
||||
{
|
||||
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
if (client->mqtt_state.pending_msg_count > 0) {
|
||||
outbox_message_t msg = { 0 };
|
||||
msg.data = client->mqtt_state.outbound_message->data;
|
||||
msg.len = client->mqtt_state.outbound_message->length;
|
||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
||||
msg.remaining_data = remaining_data;
|
||||
msg.remaining_len = remaining_len;
|
||||
//Copy to queue buffer
|
||||
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
@ -1214,7 +1193,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
|
||||
}
|
||||
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
|
||||
ESP_LOGD(TAG, "%s: total message length: %d (already read: %zu)", __func__, total_len, client->mqtt_state.in_buffer_read_len);
|
||||
ESP_LOGD(TAG, "%s: total message length: %d (already read: %"PRIu32")", __func__, total_len, client->mqtt_state.in_buffer_read_len);
|
||||
client->mqtt_state.message_length = total_len;
|
||||
if (client->mqtt_state.in_buffer_length < total_len) {
|
||||
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
|
||||
@ -1232,7 +1211,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
client->mqtt_state.in_buffer_read_len += read_len;
|
||||
buf += read_len;
|
||||
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu)",
|
||||
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"PRIu32")",
|
||||
__func__, total_len, client->mqtt_state.in_buffer_read_len);
|
||||
return 0;
|
||||
}
|
||||
@ -1262,12 +1241,12 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
|
||||
}
|
||||
client->mqtt_state.in_buffer_read_len += read_len;
|
||||
if (client->mqtt_state.in_buffer_read_len < total_len) {
|
||||
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu)",
|
||||
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"PRIu32")",
|
||||
__func__, total_len, client->mqtt_state.in_buffer_read_len);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
ESP_LOGD(TAG, "%s: transport_read():%zu %zu", __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
ESP_LOGD(TAG, "%s: transport_read():%"PRIu32" %"PRIu32, __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
return 1;
|
||||
err:
|
||||
esp_mqtt_client_dispatch_transport_error(client);
|
||||
@ -1305,11 +1284,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
|
||||
switch (msg_type) {
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
if (remove_initiator_message(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_parse_suback(client);
|
||||
#endif
|
||||
ESP_LOGD(TAG, "deliver_suback, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
ESP_LOGD(TAG, "deliver_suback, message_length_read=%"PRIu32", message_length=%"PRIu32, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
if (deliver_suback(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id);
|
||||
return ESP_FAIL;
|
||||
@ -1317,7 +1296,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
if (remove_initiator_message(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_parse_unsuback(client);
|
||||
#endif
|
||||
@ -1327,7 +1306,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%"PRIu32", message_length=%"PRIu32, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
|
||||
if (deliver_publish(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
|
||||
return ESP_FAIL;
|
||||
@ -1357,14 +1336,19 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_decrement_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_parse_puback(client);
|
||||
@ -1389,7 +1373,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
|
||||
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
|
||||
mqtt_write_data(client);
|
||||
esp_mqtt_write(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
@ -1406,11 +1390,16 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
mqtt_write_data(client);
|
||||
esp_mqtt_write(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_decrement_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
esp_mqtt5_parse_pubcomp(client);
|
||||
@ -1447,17 +1436,25 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
|
||||
}
|
||||
|
||||
// try to resend the data
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error to resend data ");
|
||||
esp_mqtt_abort_connection(client);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
// check if it was QoS-0 publish message
|
||||
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
|
||||
// delete all qos0 publish messages once we process them
|
||||
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
|
||||
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
|
||||
if (client->mqtt_state.pending_publish_qos == 0) {
|
||||
// delete all qos0 publish messages once we process them
|
||||
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
|
||||
}
|
||||
} else if (client->mqtt_state.pending_publish_qos > 0) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_increment_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
return ESP_OK;
|
||||
@ -1468,7 +1465,6 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
|
||||
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
|
||||
#if MQTT_REPORT_DELETED_MESSAGES
|
||||
// also report the deleted items as MQTT_EVENT_DELETED events if enabled
|
||||
int deleted_items = 0;
|
||||
int msg_id = 0;
|
||||
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) {
|
||||
client->event.event_id = MQTT_EVENT_DELETED;
|
||||
@ -1476,16 +1472,10 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
|
||||
if (esp_mqtt_dispatch_event(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
|
||||
}
|
||||
deleted_items ++;
|
||||
}
|
||||
#else
|
||||
int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||
outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||
#endif
|
||||
client->mqtt_state.pending_msg_count -= deleted_items;
|
||||
|
||||
if (client->mqtt_state.pending_msg_count < 0) {
|
||||
client->mqtt_state.pending_msg_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1746,7 +1736,7 @@ static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
|
||||
ESP_LOGE(TAG, "Disconnect message cannot be created");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error sending disconnect message");
|
||||
}
|
||||
return ESP_OK;
|
||||
@ -1796,7 +1786,7 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error sending ping");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
@ -1804,7 +1794,8 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
|
||||
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
|
||||
const esp_mqtt_topic_t *topic_list, int size)
|
||||
{
|
||||
if (!client) {
|
||||
ESP_LOGE(TAG, "Client was not initialized");
|
||||
@ -1818,13 +1809,19 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
||||
}
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (esp_mqtt5_client_subscribe_check(client, qos) != ESP_OK) {
|
||||
ESP_LOGI(TAG, "MQTT5 subscribe check fail");
|
||||
int max_qos = topic_list[0].qos;
|
||||
for (int topic_number = 0; topic_number < size; ++topic_number) {
|
||||
if (topic_list[topic_number].qos > max_qos) {
|
||||
max_qos = topic_list[topic_number].qos;
|
||||
}
|
||||
}
|
||||
if (esp_mqtt5_client_subscribe_check(client, max_qos) != ESP_OK) {
|
||||
ESP_LOGI(TAG, "MQTT5 subscribe check fail: QoS %d not accepted by broker ", max_qos);
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||
topic, qos,
|
||||
topic_list, size,
|
||||
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
|
||||
if (client->mqtt_state.outbound_message->length) {
|
||||
client->mqtt5_config->subscribe_property_info = NULL;
|
||||
@ -1832,7 +1829,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
||||
#endif
|
||||
} else {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||
topic, qos,
|
||||
topic_list, size,
|
||||
&client->mqtt_state.pending_msg_id);
|
||||
}
|
||||
if (client->mqtt_state.outbound_message->length == 0) {
|
||||
@ -1842,23 +1839,28 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
|
||||
}
|
||||
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_count ++;
|
||||
//move pending msg to outbox (if have)
|
||||
if (!mqtt_enqueue(client)) {
|
||||
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos);
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||
ESP_LOGD(TAG, "Sent subscribe, first topic=%s, id: %d", topic_list[0].filter, client->mqtt_state.pending_msg_id);
|
||||
MQTT_API_UNLOCK(client);
|
||||
return client->mqtt_state.pending_msg_id;
|
||||
|
||||
}
|
||||
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos)
|
||||
{
|
||||
esp_mqtt_topic_t user_topic = {.filter = topic, .qos = qos};
|
||||
return esp_mqtt_client_subscribe_multiple(client, &user_topic, 1);
|
||||
}
|
||||
|
||||
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
|
||||
@ -1895,14 +1897,13 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
||||
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
|
||||
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_count ++;
|
||||
if (!mqtt_enqueue(client)) {
|
||||
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
}
|
||||
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
|
||||
MQTT_API_UNLOCK(client);
|
||||
return -1;
|
||||
@ -1944,15 +1945,14 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_id = pending_msg_id;
|
||||
client->mqtt_state.pending_publish_qos = qos;
|
||||
client->mqtt_state.pending_msg_count ++;
|
||||
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
||||
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
||||
if (!mqtt_enqueue(client)) {
|
||||
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||
if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
|
||||
if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
|
||||
return -1;
|
||||
}
|
||||
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||
@ -2022,7 +2022,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
|
||||
while (sending) {
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
if (esp_mqtt_write(client) != ESP_OK) {
|
||||
esp_mqtt_abort_connection(client);
|
||||
ret = -1;
|
||||
goto cannot_publish;
|
||||
@ -2056,6 +2056,11 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
||||
}
|
||||
|
||||
if (qos > 0) {
|
||||
#ifdef MQTT_PROTOCOL_5
|
||||
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
|
||||
esp_mqtt5_increment_packet_counter(client);
|
||||
}
|
||||
#endif
|
||||
//Tick is set after transmit to avoid retransmitting too early due slow network speed / big messages
|
||||
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
|
||||
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
|
||||
|
Reference in New Issue
Block a user