Merge branch 'feature/multiple_subscribe' into 'master'

Feature:  Enable SUBSCRIBE to multiple topics

See merge request espressif/esp-mqtt!156
This commit is contained in:
Rocha Euripedes
2023-02-08 15:35:31 +08:00
6 changed files with 251 additions and 177 deletions

View File

@ -32,7 +32,7 @@ typedef struct esp_mqtt_client *esp_mqtt_client_handle_t;
* @brief *MQTT* event types. * @brief *MQTT* event types.
* *
* User event handler receives context data in `esp_mqtt_event_t` structure with * User event handler receives context data in `esp_mqtt_event_t` structure with
* - `client` - *MQTT* client handle * - client - *MQTT* client handle
* - various other data depending on event type * - various other data depending on event type
* *
*/ */
@ -223,132 +223,140 @@ typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
* character and the related len field set to 0. DER format requires a related len field set to the correct length. * character and the related len field set to 0. DER format requires a related len field set to the correct length.
*/ */
typedef struct esp_mqtt_client_config_t { typedef struct esp_mqtt_client_config_t {
/** /**
* Broker related configuration * Broker related configuration
*/ */
struct broker_t { struct broker_t {
/** /**
* Broker address * Broker address
* *
* - uri have precedence over other fields * - uri have precedence over other fields
* - If uri isn't set at least hostname, transport and port should. * - If uri isn't set at least hostname, transport and port should.
*/ */
struct address_t { struct address_t {
const char *uri; /*!< Complete *MQTT* broker URI */ const char *uri; /*!< Complete *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */ const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< Selects transport*/ esp_mqtt_transport_t transport; /*!< Selects transport*/
const char *path; /*!< Path in the URI*/ const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */ uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker address configuration */ } address; /*!< Broker address configuration */
/** /**
* Broker identity verification * Broker identity verification
* *
* If fields are not set broker's identity isn't verified. it's recommended * If fields are not set broker's identity isn't verified. it's recommended
* to set the options in this struct for security reasons. * to set the options in this struct for security reasons.
*/ */
struct verification_t { struct verification_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
documentation for details. */ documentation for details. */
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
the usage of certificate bundles. */ the usage of certificate bundles. */
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */ const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */ size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
authentication (as alternative to certificate verification). authentication (as alternative to certificate verification).
PSK is enabled only if there are no other ways to PSK is enabled only if there are no other ways to
verify broker.*/ verify broker.*/
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
security of TLS and makes the *MQTT* client susceptible to MITM attacks */ security of TLS and makes the *MQTT* client susceptible to MITM attacks */
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */ const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
} verification; /*!< Security verification of the broker */ } verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */ } broker; /*!< Broker address and security verification */
/** /**
* Client related credentials for authentication. * Client related credentials for authentication.
*/ */
struct credentials_t { struct credentials_t {
const char *username; /*!< *MQTT* username */ const char *username; /*!< *MQTT* username */
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are
last 3 bytes of MAC address in hex format */ last 3 bytes of MAC address in hex format */
bool set_null_client_id; /*!< Selects a NULL client id */ bool set_null_client_id; /*!< Selects a NULL client id */
/** /**
* Client authentication * Client authentication
* *
* Fields related to client authentication by broker * Fields related to client authentication by broker
* *
* For mutual authentication using TLS, user could select certificate and key, * For mutual authentication using TLS, user could select certificate and key,
* secure element or digital signature peripheral if available. * secure element or digital signature peripheral if available.
* *
*/ */
struct authentication_t { struct authentication_t {
const char *password; /*!< *MQTT* password */ const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
authentication is not needed. Must be provided with `key`.*/ authentication is not needed. Must be provided with `key`.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/ size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
is not needed. If it is not NULL, also `certificate` has to be provided.*/ is not needed. If it is not NULL, also `certificate` has to be provided.*/
size_t key_len; /*!< Length of the buffer pointed to by key.*/ size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
`key_password_len` must be correctly set. */ `key_password_len` must be correctly set. */
int key_password_len; /*!< Length of the password pointed to by `key_password` */ int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */ bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
available in some Espressif devices. */ available in some Espressif devices. */
} authentication; /*!< Client authentication */ } authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */ } credentials; /*!< User credentials for broker */
/** /**
* *MQTT* Session related configuration * *MQTT* Session related configuration
*/ */
struct session_t { struct session_t {
/** /**
* Last Will and Testament message configuration. * Last Will and Testament message configuration.
*/ */
struct last_will_t { struct last_will_t {
const char *topic; /*!< LWT (Last Will and Testament) message topic */ const char *topic; /*!< LWT (Last Will and Testament) message topic */
const char *msg; /*!< LWT message, may be NULL terminated*/ const char *msg; /*!< LWT message, may be NULL terminated*/
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */ int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
int qos; /*!< LWT message QoS */ int qos; /*!< LWT message QoS */
int retain; /*!< LWT retained message flag */ int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */ } last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */ bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */ int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
by default. Note: setting the config value `keepalive` to `0` doesn't disable by default. Note: setting the config value `keepalive` to `0` doesn't disable
keepalive feature, but uses a default keepalive period */ keepalive feature, but uses a default keepalive period */
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/ esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */ int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
} session; /*!< *MQTT* session configuration. */ } session; /*!< *MQTT* session configuration. */
/** /**
* Network related configuration * Network related configuration
*/ */
struct network_t { struct network_t {
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
disabled (defaults to 10s) */ disabled (defaults to 10s) */
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
(defaults to 10s). */ (defaults to 10s). */
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */ int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
`disable_auto_reconnect=true` to disable */ `disable_auto_reconnect=true` to disable */
} network; /*!< Network configuration */ } network; /*!< Network configuration */
/** /**
* Client task configuration * Client task configuration
*/ */
struct task_t { struct task_t {
int priority; /*!< *MQTT* task priority*/ int priority; /*!< *MQTT* task priority*/
int stack_size; /*!< *MQTT* task stack size*/ int stack_size; /*!< *MQTT* task stack size*/
} task; /*!< FreeRTOS task configuration.*/ } task; /*!< FreeRTOS task configuration.*/
/** /**
* Client buffer size configuration * Client buffer size configuration
* *
* Client have two buffers for input and output respectivelly. * Client have two buffers for input and output respectivelly.
*/ */
struct buffer_t { struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/ int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
``buffer_size`` */ ``buffer_size`` */
} buffer; /*!< Buffer size configuration.*/ } buffer; /*!< Buffer size configuration.*/
} esp_mqtt_client_config_t; } esp_mqtt_client_config_t;
/**
* Topic definition struct
*/
typedef struct topic_t {
const char *filter; /*!< Topic filter to subscribe */
int qos; /*!< Max QoS level of the subscription */
} esp_mqtt_topic_t;
/** /**
* @brief Creates *MQTT* client handle based on the configuration * @brief Creates *MQTT* client handle based on the configuration
* *
@ -417,6 +425,26 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
*/ */
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
/**
* @brief Convenience macro to select subscribe function to use.
*
* Notes:
* - Usage of `esp_mqtt_client_subscribe_single` is the same as previous
* esp_mqtt_client_subscribe, refer to it for details.
*
* @param client_handle *MQTT* client handle
* @param topic_type Needs to be char* for single subscription or `esp_mqtt_topic_t` for multiple topics
* @param qos_or_size It's either a qos when subscribing to a single topic or the size of the subscription array when subscribing to multiple topics.
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
char *: esp_mqtt_client_subscribe_single, \
esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \
)(client_handle, topic_type, qos_or_size)
/** /**
* @brief Subscribe the client to defined topic with defined qos * @brief Subscribe the client to defined topic with defined qos
* *
@ -426,23 +454,44 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
* from a *MQTT* event callback i.e. internal *MQTT* task * from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block * (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress. * if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
* *
* @param client *MQTT* client handle * @param client *MQTT* client handle
* @param topic * @param topic topic filter to subscribe
* @param qos // TODO describe parameters * @param qos Max qos level of the subscription
* *
* @return message_id of the subscribe message on success * @return message_id of the subscribe message on success
* -1 on failure * -1 on failure
*/ */
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
const char *topic, int qos); const char *topic, int qos);
/**
* @brief Subscribe the client to a list of defined topics with defined qos
*
* Notes:
* - Client must be connected to send subscribe message
* - This API is could be executed from a user task or
* from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
*
* @param client *MQTT* client handle
* @param topic_list List of topics to subscribe
* @param size size of topic_list
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size);
/** /**
* @brief Unsubscribe the client from defined topic * @brief Unsubscribe the client from defined topic
* *
* Notes: * Notes:
* - Client must be connected to send unsubscribe message * - Client must be connected to send unsubscribe message
* - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details * - It is thread safe, please refer to `esp_mqtt_client_subscribe_single` for details
* *
* @param client *MQTT* client handle * @param client *MQTT* client handle
* @param topic * @param topic

View File

@ -126,7 +126,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info); mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info);
esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property); esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property);
int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length); int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property); mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property); mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info); mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info);
mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id); mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);

View File

@ -138,7 +138,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id); mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id); mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id); mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id); mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull));
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id); mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id);
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);

View File

@ -1,5 +1,6 @@
#include <string.h> #include <string.h>
#include "mqtt5_msg.h" #include "mqtt5_msg.h"
#include "mqtt_client.h"
#include "mqtt_config.h" #include "mqtt_config.h"
#include "platform.h" #include "platform.h"
#include "esp_log.h" #include "esp_log.h"
@ -764,7 +765,7 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
} }
snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info); snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info);
if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) { if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__); ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(response_topic); free(response_topic);
return fail_message(connection); return fail_message(connection);
} }
@ -849,14 +850,10 @@ int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length)
return -1; return -1;
} }
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property) mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic_list, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
{ {
init_message(connection); init_message(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
if ((*message_id = append_message_id(connection, 0)) == 0) { if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection); return fail_message(connection);
} }
@ -877,41 +874,47 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *t
} }
} }
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
if (property && property->is_share_subscribe) {
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name); for (int topic_number = 0; topic_number < size; ++topic_number) {
char *shared_topic = calloc(1, shared_topic_size); if (topic_list[topic_number].filter[0] == '\0') {
if (!shared_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
fail_message(connection);
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
free(shared_topic);
return fail_message(connection); return fail_message(connection);
} }
free(shared_topic); if (property && property->is_share_subscribe) {
} else { uint16_t shared_topic_size = strlen(topic_list[topic_number].filter) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
APPEND_CHECK(append_property(connection, 0, 2, topic, strlen(topic)), fail_message(connection)); char *shared_topic = calloc(1, shared_topic_size);
} if (!shared_topic) {
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
fail_message(connection);
}
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic_list[topic_number].filter);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(shared_topic);
return fail_message(connection);
}
free(shared_topic);
} else {
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
}
if (connection->message.length + 1 > connection->buffer_length) { if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection); return fail_message(connection);
}
connection->buffer[connection->message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3);
connection->message.length ++;
} }
connection->buffer[connection->message.length] = 0;
if (property) {
if (property->retain_handle > 0 && property->retain_handle < 3) {
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
}
if (property->no_local_flag) {
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
}
if (property->retain_as_published_flag) {
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
}
}
connection->buffer[connection->message.length] |= (qos & 3);
connection->message.length ++;
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
} }
@ -975,7 +978,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
} }
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic); snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) { if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__); ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
free(shared_topic); free(shared_topic);
return fail_message(connection); return fail_message(connection);
} }

View File

@ -29,6 +29,7 @@
* *
*/ */
#include <string.h> #include <string.h>
#include "mqtt_client.h"
#include "mqtt_msg.h" #include "mqtt_msg.h"
#include "mqtt_config.h" #include "mqtt_config.h"
#include "platform.h" #include "platform.h"
@ -518,26 +519,29 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
} }
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id) mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
{ {
init_message(connection); init_message(connection);
if (topic == NULL || topic[0] == '\0') {
return fail_message(connection);
}
if ((*message_id = append_message_id(connection, 0)) == 0) { if ((*message_id = append_message_id(connection, 0)) == 0) {
return fail_message(connection); return fail_message(connection);
} }
if (append_string(connection, topic, strlen(topic)) < 0) { for (int topic_number = 0; topic_number < size; ++topic_number) {
return fail_message(connection); if (topic_list[topic_number].filter[0] == '\0') {
} return fail_message(connection);
}
if (connection->message.length + 1 > connection->buffer_length) { if (append_string(connection, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)) < 0) {
return fail_message(connection); return fail_message(connection);
}
if (connection->message.length + 1 > connection->buffer_length) {
return fail_message(connection);
}
connection->buffer[connection->message.length] = topic_list[topic_number].qos;
connection->message.length ++;
} }
connection->buffer[connection->message.length++] = qos;
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
} }

View File

@ -1101,8 +1101,11 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE; client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE;
client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED; client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED;
// post data event // post data event
if ((uint8_t)*msg_data == 0x80) { for (int topic = 0; topic < msg_data_len; ++topic) {
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED; if ((uint8_t)msg_data[topic] == 0x80) {
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
break;
}
} }
client->event.data_len = msg_data_len; client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len; client->event.total_data_len = msg_data_len;
@ -1114,7 +1117,9 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
return ESP_OK; return ESP_OK;
} }
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) // Deletes the initial message in MQTT communication protocol
// Return false when message is not found, making the received counterpart invalid.
static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
{ {
ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count); ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count);
if (client->mqtt_state.pending_msg_count == 0) { if (client->mqtt_state.pending_msg_count == 0) {
@ -1311,7 +1316,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
switch (msg_type) { switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK: case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { if (remove_initiator_message(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
#ifdef MQTT_PROTOCOL_5 #ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_suback(client); esp_mqtt5_parse_suback(client);
#endif #endif
@ -1323,7 +1328,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
} }
break; break;
case MQTT_MSG_TYPE_UNSUBACK: case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { if (remove_initiator_message(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
#ifdef MQTT_PROTOCOL_5 #ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_unsuback(client); esp_mqtt5_parse_unsuback(client);
#endif #endif
@ -1375,7 +1380,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
esp_mqtt5_decrement_packet_counter(client); esp_mqtt5_decrement_packet_counter(client);
} }
#endif #endif
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
#ifdef MQTT_PROTOCOL_5 #ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_puback(client); esp_mqtt5_parse_puback(client);
@ -1426,7 +1431,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
esp_mqtt5_decrement_packet_counter(client); esp_mqtt5_decrement_packet_counter(client);
} }
#endif #endif
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
#ifdef MQTT_PROTOCOL_5 #ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_pubcomp(client); esp_mqtt5_parse_pubcomp(client);
@ -1820,7 +1825,8 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
return ESP_OK; return ESP_OK;
} }
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos) int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size)
{ {
if (!client) { if (!client) {
ESP_LOGE(TAG, "Client was not initialized"); ESP_LOGE(TAG, "Client was not initialized");
@ -1834,13 +1840,19 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
} }
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5 #ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_client_subscribe_check(client, qos) != ESP_OK) { int max_qos = topic_list[0].qos;
ESP_LOGI(TAG, "MQTT5 subscribe check fail"); for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].qos > max_qos) {
max_qos = topic_list[topic_number].qos;
}
}
if (esp_mqtt5_client_subscribe_check(client, max_qos) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 subscribe check fail: QoS %d not accepted by broker ", max_qos);
MQTT_API_UNLOCK(client); MQTT_API_UNLOCK(client);
return -1; return -1;
} }
client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection, client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos, topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
if (client->mqtt_state.outbound_message->length) { if (client->mqtt_state.outbound_message->length) {
client->mqtt5_config->subscribe_property_info = NULL; client->mqtt5_config->subscribe_property_info = NULL;
@ -1848,7 +1860,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
#endif #endif
} else { } else {
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos, topic_list, size,
&client->mqtt_state.pending_msg_id); &client->mqtt_state.pending_msg_id);
} }
if (client->mqtt_state.outbound_message->length == 0) { if (client->mqtt_state.outbound_message->length == 0) {
@ -1867,14 +1879,20 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
if (mqtt_write_data(client) != ESP_OK) { if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos);
MQTT_API_UNLOCK(client); MQTT_API_UNLOCK(client);
return -1; return -1;
} }
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); ESP_LOGD(TAG, "Sent subscribe, first topic=%s, id: %d", topic_list[0].filter, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK(client); MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id; return client->mqtt_state.pending_msg_id;
}
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos)
{
esp_mqtt_topic_t user_topic = {.filter = topic, .qos = qos};
return esp_mqtt_client_subscribe_multiple(client, &user_topic, 1);
} }
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic) int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)