Files
esp-mqtt/mqtt_client.c

2392 lines
98 KiB
C
Raw Normal View History

#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include "esp_err.h"
#include "esp_log.h"
#include "esp_heap_caps.h"
#include "esp_transport.h"
#include "mqtt_client.h"
#include "mqtt_client_priv.h"
#include "mqtt_msg.h"
#include "mqtt_outbox.h"
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
#ifdef ESP_EVENT_ANY_ID
_Static_assert(MQTT_EVENT_ANY == ESP_EVENT_ANY_ID, "mqtt-client event enum does not match the global EVENT_ANY_ID");
#endif
static const char *TAG = "mqtt_client";
2018-02-16 02:40:16 +07:00
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
/**
* @brief Define of MQTT Event base
*
*/
ESP_EVENT_DEFINE_BASE(MQTT_EVENTS);
#endif
const static int STOPPED_BIT = (1 << 0);
const static int RECONNECT_BIT = (1 << 1);
const static int DISCONNECT_BIT = (1 << 2);
2018-02-16 02:40:16 +07:00
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client);
2018-02-16 02:40:16 +07:00
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms);
static void esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
static char *create_string(const char *ptr, int len);
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms);
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client);
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client);
2018-02-16 02:40:16 +07:00
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
/**
* @brief Processes error reported from transport layer (considering the message read status)
*
* @param err: Error reported from TCP transport
* @param client: MQTT client handle
* @param mid_message: True if the error occured when reading incomplete message
*
* @return - 0 on Timeout
* - -1 on Timeout with incomplete message
* - -2 on Error or EOF
*/
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client, bool mid_message)
{
if (err == ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT) {
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
if (mid_message) {
// No error message, because we could've read with timeout 0 (caller will decide)
return -1;
}
// Not an error, continue
ESP_LOGV(TAG, "%s: transport_read(): call timed out before data was ready!", __func__);
return 0;
}
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
ESP_LOGE(TAG, "%s: transport_read(): EOF", __func__);
}
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
esp_mqtt_client_dispatch_transport_error(client);
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
return -2;
}
#if MQTT_ENABLE_SSL
enum esp_mqtt_ssl_cert_key_api {
MQTT_SSL_DATA_API_CA_CERT,
MQTT_SSL_DATA_API_CLIENT_CERT,
MQTT_SSL_DATA_API_CLIENT_KEY,
MQTT_SSL_DATA_API_MAX,
};
static esp_err_t esp_mqtt_set_cert_key_data(esp_transport_handle_t ssl, enum esp_mqtt_ssl_cert_key_api what, const char *cert_key_data, int cert_key_len)
{
char *data = (char *)cert_key_data;
int ssl_transport_api_id = what;
int len = cert_key_len;
if (!data) {
return ESP_OK;
}
if (len == 0) {
// if length not specified, expect 0-terminated PEM string
// and the original transport_api_id (by convention after the last api_id in the enum)
ssl_transport_api_id += MQTT_SSL_DATA_API_MAX;
len = strlen(data);
}
#ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
else {
ESP_LOGE(TAG, "Explicit cert-/key-len is not available in IDF version %s", IDF_VER);
return ESP_ERR_NOT_SUPPORTED;
}
#endif
// option to force the cert/key config to null (i.e. skip validation) when existing config updates
if (0 == strcmp(data, "NULL")) {
data = NULL;
len = 0;
}
switch (ssl_transport_api_id) {
#ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
case MQTT_SSL_DATA_API_CA_CERT:
esp_transport_ssl_set_cert_data_der(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_CERT:
esp_transport_ssl_set_client_cert_data_der(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_KEY:
esp_transport_ssl_set_client_key_data_der(ssl, data, len);
break;
#endif
case MQTT_SSL_DATA_API_CA_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_cert_data(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_cert_data(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_KEY + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_key_data(ssl, data, len);
break;
default:
return ESP_ERR_INVALID_ARG;
}
return ESP_OK;
}
static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle_t transport_list, mqtt_config_storage_t *cfg)
{
esp_transport_handle_t ssl = esp_transport_list_get_transport(transport_list, MQTT_OVER_SSL_SCHEME);
if (cfg->use_global_ca_store == true) {
esp_transport_ssl_enable_global_ca_store(ssl);
} else if (cfg->crt_bundle_attach != NULL) {
#ifdef MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
#ifdef CONFIG_MBEDTLS_CERTIFICATE_BUNDLE
esp_transport_ssl_crt_bundle_attach(ssl, cfg->crt_bundle_attach);
#else
ESP_LOGE(TAG, "Certificate bundle is not enabled for mbedTLS in menuconfig");
goto esp_mqtt_set_transport_failed;
#endif /* CONFIG_MBEDTLS_CERTIFICATE_BUNDLE */
#else
ESP_LOGE(TAG, "Certificate bundle feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif /* MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE */
} else {
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes),
goto esp_mqtt_set_transport_failed);
}
if(cfg->ciphersuites_list)
{
#if defined(MQTT_SUPPORTED_FEATURE_CIPHERSUITES_LIST)
esp_transport_ssl_set_ciphersuites_list(ssl,cfg->ciphersuites_list);
#else
ESP_LOGE(TAG, "Cipher suites list feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif /* MQTT_SUPPORTED_FEATURE_CIPHERSUITES_LIST */
}
if (cfg->psk_hint_key) {
#if defined(MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION) && MQTT_ENABLE_SSL
#ifdef CONFIG_ESP_TLS_PSK_VERIFICATION
esp_transport_ssl_set_psk_key_hint(ssl, cfg->psk_hint_key);
#else
ESP_LOGE(TAG, "PSK authentication configured but not enabled in menuconfig: Please enable ESP_TLS_PSK_VERIFICATION option");
goto esp_mqtt_set_transport_failed;
#endif
#else
ESP_LOGE(TAG, "PSK authentication is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->alpn_protos) {
#if defined(MQTT_SUPPORTED_FEATURE_ALPN) && MQTT_ENABLE_SSL
#if defined(CONFIG_MBEDTLS_SSL_ALPN) || defined(CONFIG_WOLFSSL_HAVE_ALPN)
esp_transport_ssl_set_alpn_protocol(ssl, (const char **)cfg->alpn_protos);
#else
ESP_LOGE(TAG, "APLN configured but not enabled in menuconfig: Please enable MBEDTLS_SSL_ALPN or WOLFSSL_HAVE_ALPN option");
goto esp_mqtt_set_transport_failed;
#endif
#else
ESP_LOGE(TAG, "APLN is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->skip_cert_common_name_check) {
#if defined(MQTT_SUPPORTED_FEATURE_SKIP_CRT_CMN_NAME_CHECK) && MQTT_ENABLE_SSL
esp_transport_ssl_skip_common_name_check(ssl);
#else
ESP_LOGE(TAG, "Skip certificate common name check is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->common_name) {
#if defined(MQTT_SUPPORTED_FEATURE_CRT_CMN_NAME) && MQTT_ENABLE_SSL
esp_transport_ssl_set_common_name(ssl, cfg->common_name);
#else
ESP_LOGE(TAG, "Setting expected certificate common name is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->use_secure_element) {
#ifdef MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT
#ifdef CONFIG_ESP_TLS_USE_SECURE_ELEMENT
esp_transport_ssl_use_secure_element(ssl);
#else
ESP_LOGE(TAG, "Secure element not enabled for esp-tls in menuconfig");
goto esp_mqtt_set_transport_failed;
#endif /* CONFIG_ESP_TLS_USE_SECURE_ELEMENT */
#else
ESP_LOGE(TAG, "Secure element feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif /* MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT */
}
if (cfg->ds_data != NULL) {
#ifdef MQTT_SUPPORTED_FEATURE_DIGITAL_SIGNATURE
#ifdef CONFIG_ESP_TLS_USE_DS_PERIPHERAL
esp_transport_ssl_set_ds_data(ssl, cfg->ds_data);
#else
ESP_LOGE(TAG, "Digital Signature not enabled for esp-tls in menuconfig");
goto esp_mqtt_set_transport_failed;
#endif /* CONFIG_ESP_TLS_USE_DS_PERIPHERAL */
#else
ESP_LOGE(TAG, "Digital Signature feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->use_ecdsa_peripheral) {
#ifdef MQTT_SUPPORTED_FEATURE_ECDSA_PERIPHERAL
#ifdef CONFIG_MBEDTLS_HARDWARE_ECDSA_SIGN
esp_transport_ssl_set_client_key_ecdsa_peripheral(ssl, cfg->ecdsa_key_efuse_blk);
#else
ESP_LOGE(TAG, "ECDSA peripheral not enabled for esp-tls in menuconfig");
goto esp_mqtt_set_transport_failed;
#endif /* CONFIG_MBEDTLS_HARDWARE_ECDSA_SIGN */
#else
ESP_LOGE(TAG, "ECDSA peripheral feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif /* MQTT_SUPPORTED_FEATURE_ECDSA_PERIPHERAL */
}
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_CERT, cfg->clientcert_buf, cfg->clientcert_bytes),
goto esp_mqtt_set_transport_failed);
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_KEY, cfg->clientkey_buf, cfg->clientkey_bytes),
goto esp_mqtt_set_transport_failed);
if (cfg->clientkey_password && cfg->clientkey_password_len) {
#if defined(MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD) && MQTT_ENABLE_SSL
esp_transport_ssl_set_client_key_password(ssl,
cfg->clientkey_password,
cfg->clientkey_password_len);
#else
ESP_LOGE(TAG, "Password protected keys are not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
return ESP_OK;
esp_mqtt_set_transport_failed:
return ESP_FAIL;
}
#endif // MQTT_ENABLE_SSL
/* Checks if the user supplied config values are internally consistent */
static esp_err_t esp_mqtt_check_cfg_conflict(const mqtt_config_storage_t *cfg, const esp_mqtt_client_config_t *user_cfg)
{
if (cfg == NULL || user_cfg == NULL) {
ESP_LOGE(TAG, "Invalid configuration");
return ESP_ERR_INVALID_ARG;
}
esp_err_t ret = ESP_OK;
bool ssl_cfg_enabled = cfg->use_global_ca_store || cfg->cacert_buf || cfg->clientcert_buf || cfg->psk_hint_key || cfg->alpn_protos;
bool is_ssl_scheme = false;
if (cfg->scheme) {
is_ssl_scheme = (strncasecmp(cfg->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(cfg->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0);
}
if (!is_ssl_scheme && ssl_cfg_enabled) {
if (cfg->uri) {
ESP_LOGW(TAG, "SSL related configs set, but the URI scheme specifies a non-SSL scheme, scheme = %s", cfg->scheme);
} else {
ESP_LOGW(TAG, "SSL related configs set, but the transport protocol is a non-SSL scheme, transport = %d", user_cfg->broker.address.transport);
}
ret = ESP_ERR_INVALID_ARG;
}
if (cfg->uri && user_cfg->broker.address.transport) {
ESP_LOGW(TAG, "Transport config set, but overridden by scheme from URI: transport = %d, uri scheme = %s", user_cfg->broker.address.transport, cfg->scheme);
ret = ESP_ERR_INVALID_ARG;
}
return ret;
}
2022-05-20 11:55:28 +08:00
bool esp_mqtt_set_if_config(char const *const new_config, char **old_config)
{
if (new_config) {
free(*old_config);
*old_config = strdup(new_config);
if (*old_config == NULL) {
return false;
}
}
return true;
}
static esp_err_t esp_mqtt_client_create_transport(esp_mqtt_client_handle_t client)
{
esp_err_t ret = ESP_OK;
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
client->transport_list = NULL;
}
if (client->config->scheme) {
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, return ESP_ERR_NO_MEM);
if ((strncasecmp(client->config->scheme, MQTT_OVER_TCP_SCHEME, sizeof(MQTT_OVER_TCP_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0)) {
esp_transport_handle_t tcp = esp_transport_tcp_init();
ESP_MEM_CHECK(TAG, tcp, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
if (client->config->if_name) {
esp_transport_tcp_set_interface_name(tcp, client->config->if_name);
}
esp_transport_list_add(client->transport_list, tcp, MQTT_OVER_TCP_SCHEME);
if (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
ESP_MEM_CHECK(TAG, ws, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(ws, MQTT_WS_DEFAULT_PORT);
if (client->config->path) {
esp_transport_ws_set_path(ws, client->config->path);
}
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(ws, MQTT_OVER_TCP_SCHEME);
#endif
esp_transport_list_add(client->transport_list, ws, MQTT_OVER_WS_SCHEME);
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_WS to use %s", client->config->scheme);
ret = ESP_FAIL;
#endif
}
} else if ((strncasecmp(client->config->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0)) {
#if MQTT_ENABLE_SSL
esp_transport_handle_t ssl = esp_transport_ssl_init();
ESP_MEM_CHECK(TAG, ssl, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
if (client->config->if_name) {
esp_transport_ssl_set_interface_name(ssl, client->config->if_name);
}
esp_transport_list_add(client->transport_list, ssl, MQTT_OVER_SSL_SCHEME);
if (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t wss = esp_transport_ws_init(ssl);
ESP_MEM_CHECK(TAG, wss, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT);
if (client->config->path) {
esp_transport_ws_set_path(wss, client->config->path);
}
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(wss, MQTT_OVER_TCP_SCHEME);
#endif
esp_transport_list_add(client->transport_list, wss, MQTT_OVER_WSS_SCHEME);
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_WS to use %s", client->config->scheme);
ret = ESP_FAIL;
#endif
}
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_SSL to use %s", client->config->scheme);
ret = ESP_FAIL;
#endif
} else {
ESP_LOGE(TAG, "Not support this mqtt scheme %s", client->config->scheme);
ret = ESP_FAIL;
}
} else {
ESP_LOGE(TAG, "No scheme found");
ret = ESP_FAIL;
}
return ret;
}
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
2018-02-16 02:40:16 +07:00
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK(client);
2018-02-16 02:40:16 +07:00
//Copy user configurations to client context
esp_err_t err = ESP_OK;
if (!client->config) {
client->config = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, client->config, {
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
});
}
2018-02-16 02:40:16 +07:00
mqtt_msg_buffer_destroy(&client->mqtt_state.connection);
int buffer_size = config->buffer.size;
if (buffer_size <= 0) {
buffer_size = MQTT_BUFFER_SIZE_BYTE;
}
// use separate value for output buffer size if configured
int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size;
if (mqtt_msg_buffer_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) {
goto _mqtt_set_config_failed;
}
free(client->mqtt_state.in_buffer);
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_set_config_failed);
client->mqtt_state.in_buffer_length = buffer_size;
client->config->message_retransmit_timeout = config->session.message_retransmit_timeout;
if (config->session.message_retransmit_timeout <= 0) {
client->config->message_retransmit_timeout = MQTT_DEFAULT_RETRANSMIT_TIMEOUT_MS;
}
client->config->task_prio = config->task.priority;
if (client->config->task_prio <= 0) {
client->config->task_prio = MQTT_TASK_PRIORITY;
}
client->config->task_stack = config->task.stack_size;
if (client->config->task_stack <= 0) {
client->config->task_stack = MQTT_TASK_STACK;
2018-02-16 02:40:16 +07:00
}
if (config->broker.address.port) {
client->config->port = config->broker.address.port;
2018-02-16 02:40:16 +07:00
}
2024-08-06 10:28:29 +02:00
if (config->network.tcp_keep_alive_cfg.keep_alive_enable) {
client->config->tcp_keep_alive_cfg = config->network.tcp_keep_alive_cfg;
}
err = ESP_ERR_NO_MEM;
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.hostname, &client->config->host), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.path, &client->config->path), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->mqtt_state.connection.information.username), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->mqtt_state.connection.information.password), goto _mqtt_set_config_failed);
if (!config->credentials.set_null_client_id) {
if (config->credentials.client_id) {
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->mqtt_state.connection.information.client_id), goto _mqtt_set_config_failed);
} else if (client->mqtt_state.connection.information.client_id == NULL) {
client->mqtt_state.connection.information.client_id = platform_create_id_string();
}
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.client_id, goto _mqtt_set_config_failed);
ESP_LOGD(TAG, "MQTT client_id=%s", client->mqtt_state.connection.information.client_id);
2018-02-16 02:40:16 +07:00
}
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.uri, &client->config->uri), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->mqtt_state.connection.information.will_topic), goto _mqtt_set_config_failed);
2018-02-16 02:40:16 +07:00
if (config->session.last_will.msg_len && config->session.last_will.msg) {
free(client->mqtt_state.connection.information.will_message);
client->mqtt_state.connection.information.will_message = malloc(config->session.last_will.msg_len);
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
memcpy(client->mqtt_state.connection.information.will_message, config->session.last_will.msg, config->session.last_will.msg_len);
client->mqtt_state.connection.information.will_length = config->session.last_will.msg_len;
} else if (config->session.last_will.msg) {
free(client->mqtt_state.connection.information.will_message);
client->mqtt_state.connection.information.will_message = strdup(config->session.last_will.msg);
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
client->mqtt_state.connection.information.will_length = strlen(config->session.last_will.msg);
2018-02-16 02:40:16 +07:00
}
if (config->session.last_will.qos) {
client->mqtt_state.connection.information.will_qos = config->session.last_will.qos;
}
if (config->session.last_will.retain) {
client->mqtt_state.connection.information.will_retain = config->session.last_will.retain;
}
2018-02-16 02:40:16 +07:00
if (config->session.disable_clean_session == client->mqtt_state.connection.information.clean_session) {
client->mqtt_state.connection.information.clean_session = !config->session.disable_clean_session;
if (!client->mqtt_state.connection.information.clean_session && config->credentials.set_null_client_id) {
ESP_LOGE(TAG, "Clean Session flag must be true if client has a null id");
}
}
if (config->session.keepalive) {
client->mqtt_state.connection.information.keepalive = config->session.keepalive;
2018-02-16 18:46:13 +07:00
}
if (client->mqtt_state.connection.information.keepalive == 0) {
client->mqtt_state.connection.information.keepalive = MQTT_KEEPALIVE_TICK;
2018-02-16 02:40:16 +07:00
}
if (config->session.disable_keepalive) {
// internal `keepalive` value (in connect_info) is in line with 3.1.2.10 Keep Alive from mqtt spec:
// * keepalive=0: Keep alive mechanism disabled (server not to disconnect the client on its inactivity)
// * period in seconds to send a Control packet if inactive
client->mqtt_state.connection.information.keepalive = 0;
}
if (config->session.protocol_ver) {
client->mqtt_state.connection.information.protocol_ver = config->session.protocol_ver;
}
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_UNDEFINED) {
#ifdef MQTT_PROTOCOL_311
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
#else
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1;
2022-02-25 10:12:30 +08:00
#endif
} else if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifndef MQTT_PROTOCOL_5
ESP_LOGE(TAG, "Please first enable MQTT_PROTOCOL_5 feature in menuconfig");
goto _mqtt_set_config_failed;
#endif
}
client->config->network_timeout_ms = config->network.timeout_ms;
if (client->config->network_timeout_ms <= 0) {
client->config->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS;
}
if (config->network.refresh_connection_after_ms) {
client->config->refresh_connection_after_ms = config->network.refresh_connection_after_ms;
}
client->config->auto_reconnect = true;
if (config->network.disable_auto_reconnect == client->config->auto_reconnect) {
client->config->auto_reconnect = !config->network.disable_auto_reconnect;
}
if (config->network.reconnect_timeout_ms) {
client->config->reconnect_timeout_ms = config->network.reconnect_timeout_ms;
} else {
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
}
client->config->transport = config->network.transport;
if (config->network.if_name) {
client->config->if_name = calloc(1, sizeof(struct ifreq) + 1);
ESP_MEM_CHECK(TAG, client->config->if_name, goto _mqtt_set_config_failed);
memcpy(client->config->if_name, config->network.if_name, sizeof(struct ifreq));
}
if (config->broker.verification.alpn_protos) {
for (int i = 0; i < client->config->num_alpn_protos; i++) {
free(client->config->alpn_protos[i]);
}
free(client->config->alpn_protos);
client->config->num_alpn_protos = 0;
const char **p;
for (p = config->broker.verification.alpn_protos; *p != NULL; p++ ) {
client->config->num_alpn_protos++;
}
// mbedTLS expects the list to be null-terminated
client->config->alpn_protos = calloc(client->config->num_alpn_protos + 1, sizeof(*config->broker.verification.alpn_protos));
ESP_MEM_CHECK(TAG, client->config->alpn_protos, goto _mqtt_set_config_failed);
for (int i = 0; i < client->config->num_alpn_protos; i++) {
client->config->alpn_protos[i] = strdup(config->broker.verification.alpn_protos[i]);
ESP_MEM_CHECK(TAG, client->config->alpn_protos[i], goto _mqtt_set_config_failed);
}
}
// configure ssl related parameters
client->config->use_global_ca_store = config->broker.verification.use_global_ca_store;
client->config->cacert_buf = config->broker.verification.certificate;
client->config->cacert_bytes = config->broker.verification.certificate_len;
client->config->psk_hint_key = config->broker.verification.psk_hint_key;
client->config->crt_bundle_attach = config->broker.verification.crt_bundle_attach;
client->config->ciphersuites_list = config->broker.verification.ciphersuites_list;
client->config->clientcert_buf = config->credentials.authentication.certificate;
client->config->clientcert_bytes = config->credentials.authentication.certificate_len;
client->config->clientkey_buf = config->credentials.authentication.key;
client->config->clientkey_bytes = config->credentials.authentication.key_len;
client->config->skip_cert_common_name_check = config->broker.verification.skip_cert_common_name_check;
client->config->common_name = config->broker.verification.common_name;
client->config->use_secure_element = config->credentials.authentication.use_secure_element;
client->config->ds_data = config->credentials.authentication.ds_data;
client->config->use_ecdsa_peripheral = config->credentials.authentication.use_ecdsa_peripheral;
client->config->ecdsa_key_efuse_blk = config->credentials.authentication.ecdsa_key_efuse_blk;
if (config->credentials.authentication.key_password && config->credentials.authentication.key_password_len) {
client->config->clientkey_password_len = config->credentials.authentication.key_password_len;
client->config->clientkey_password = malloc(client->config->clientkey_password_len);
ESP_MEM_CHECK(TAG, client->config->clientkey_password, goto _mqtt_set_config_failed);
memcpy(client->config->clientkey_password, config->credentials.authentication.key_password, client->config->clientkey_password_len);
}
if (config->broker.address.transport) {
free(client->config->scheme);
client->config->scheme = NULL;
if (config->broker.address.transport == MQTT_TRANSPORT_OVER_TCP) {
client->config->scheme = create_string(MQTT_OVER_TCP_SCHEME, strlen(MQTT_OVER_TCP_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#if MQTT_ENABLE_WS
else if (config->broker.address.transport == MQTT_TRANSPORT_OVER_WS) {
client->config->scheme = create_string(MQTT_OVER_WS_SCHEME, strlen(MQTT_OVER_WS_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
#if MQTT_ENABLE_SSL
else if (config->broker.address.transport == MQTT_TRANSPORT_OVER_SSL) {
client->config->scheme = create_string(MQTT_OVER_SSL_SCHEME, strlen(MQTT_OVER_SSL_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
#if MQTT_ENABLE_WSS
else if (config->broker.address.transport == MQTT_TRANSPORT_OVER_WSS) {
client->config->scheme = create_string(MQTT_OVER_WSS_SCHEME, strlen(MQTT_OVER_WSS_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
}
// Set uri at the end of config to override separately configured uri elements
if (config->broker.address.uri) {
if (esp_mqtt_client_set_uri(client, client->config->uri) != ESP_OK) {
err = ESP_FAIL;
goto _mqtt_set_config_failed;
}
}
client->config->outbox_limit = config->outbox.limit;
esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict(client->config, config);
MQTT_API_UNLOCK(client);
return config_has_conflict;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
MQTT_API_UNLOCK(client);
return err;
2018-02-16 02:40:16 +07:00
}
2022-05-20 11:55:28 +08:00
void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
2018-02-16 02:40:16 +07:00
{
if (client->config == NULL) {
return;
}
free(client->mqtt_state.in_buffer);
mqtt_msg_buffer_destroy(&client->mqtt_state.connection);
free(client->config->host);
free(client->config->uri);
free(client->config->path);
free(client->config->scheme);
for (int i = 0; i < client->config->num_alpn_protos; i++) {
free(client->config->alpn_protos[i]);
}
free(client->config->alpn_protos);
free(client->config->clientkey_password);
free(client->config->if_name);
free(client->mqtt_state.connection.information.will_topic);
free(client->mqtt_state.connection.information.will_message);
free(client->mqtt_state.connection.information.client_id);
free(client->mqtt_state.connection.information.username);
free(client->mqtt_state.connection.information.password);
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_destory(client);
#endif
memset(&client->mqtt_state.connection.information, 0, sizeof(mqtt_connect_info_t));
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if (client->config->event_loop_handle) {
esp_event_loop_delete(client->config->event_loop_handle);
}
#endif
esp_transport_destroy(client->config->transport);
memset(client->config, 0, sizeof(mqtt_config_storage_t));
2018-02-16 02:40:16 +07:00
free(client->config);
client->config = NULL;
2018-02-16 02:40:16 +07:00
}
static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout)
{
uint64_t next = last_tick + timeout;
return (int64_t)(next - platform_tick_get_ms()) <= 0;
}
static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
{
if (client->mqtt_state.connection.information.keepalive > 0) {
const uint64_t keepalive_ms = client->mqtt_state.connection.information.keepalive * 1000;
if (client->wait_for_ping_resp == true ) {
if (has_timed_out(client->keepalive_tick, keepalive_ms)) {
ESP_LOGE(TAG, "No PING_RESP, disconnected");
esp_mqtt_abort_connection(client);
client->wait_for_ping_resp = false;
return ESP_FAIL;
}
return ESP_OK;
}
if (has_timed_out(client->keepalive_tick, keepalive_ms / 2)) {
if (esp_mqtt_client_ping(client) == ESP_FAIL) {
ESP_LOGE(TAG, "Can't send ping, disconnected");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
client->wait_for_ping_resp = true;
return ESP_OK;
}
}
return ESP_OK;
}
static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
{
int wlen = 0, widx = 0, len = client->mqtt_state.connection.outbound_message.length;
while (len > 0) {
wlen = esp_transport_write(client->transport,
(char *)client->mqtt_state.connection.outbound_message.data + widx,
len,
client->config->network_timeout_ms);
if (wlen < 0) {
ESP_LOGE(TAG, "Writing failed: errno=%d", errno);
esp_mqtt_client_dispatch_transport_error(client);
return ESP_FAIL;
}
if (wlen == 0) {
ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno);
return ESP_ERR_TIMEOUT;
}
widx += wlen;
len -= wlen;
}
return ESP_OK;
}
2018-02-16 02:40:16 +07:00
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms)
{
2022-02-25 10:12:30 +08:00
int read_len, connect_rsp_code = 0;
client->wait_for_ping_resp = false;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_connect(&client->mqtt_state.connection,
&client->mqtt_state.connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);
#endif
} else {
mqtt_msg_connect(&client->mqtt_state.connection,
&client->mqtt_state.connection.information);
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Connect message cannot be created");
return ESP_FAIL;
}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.connection.outbound_message.data,
client->mqtt_state.connection.outbound_message.length);
2022-02-25 10:12:30 +08:00
#endif
} else {
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.connection.outbound_message.data,
client->mqtt_state.connection.outbound_message.length);
2022-02-25 10:12:30 +08:00
}
ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X",
2018-02-16 02:40:16 +07:00
client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
if (esp_mqtt_write(client) != ESP_OK) {
2018-02-16 02:40:16 +07:00
return ESP_FAIL;
}
client->mqtt_state.in_buffer_read_len = 0;
client->mqtt_state.message_length = 0;
/* wait configured network timeout for broker connection response */
uint64_t connack_recv_started = platform_tick_get_ms();
do {
read_len = mqtt_message_receive(client, client->config->network_timeout_ms);
} while (read_len == 0 && platform_tick_get_ms() - connack_recv_started < client->config->network_timeout_ms);
if (read_len <= 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len);
return ESP_FAIL;
2018-02-16 02:40:16 +07:00
}
if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) {
ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len);
return ESP_FAIL;
}
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
client->send_publish_packet_count = 0;
2022-02-25 10:12:30 +08:00
return ESP_OK;
}
#endif
} else {
client->mqtt_state.in_buffer_read_len = 0;
connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
if (connect_rsp_code == MQTT_CONNECTION_ACCEPTED) {
ESP_LOGD(TAG, "Connected");
return ESP_OK;
}
switch (connect_rsp_code) {
case MQTT_CONNECTION_REFUSE_PROTOCOL:
ESP_LOGW(TAG, "Connection refused, bad protocol");
break;
case MQTT_CONNECTION_REFUSE_SERVER_UNAVAILABLE:
ESP_LOGW(TAG, "Connection refused, server unavailable");
break;
case MQTT_CONNECTION_REFUSE_BAD_USERNAME:
ESP_LOGW(TAG, "Connection refused, bad username or password");
break;
case MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED:
ESP_LOGW(TAG, "Connection refused, not authorized");
break;
default:
ESP_LOGW(TAG, "Connection refused, Unknow reason");
break;
}
2018-02-16 02:40:16 +07:00
}
/* propagate event with connection refused error */
client->event.event_id = MQTT_EVENT_ERROR;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_CONNECTION_REFUSED;
client->event.error_handle->connect_return_code = connect_rsp_code;
client->event.error_handle->esp_tls_stack_err = 0;
client->event.error_handle->esp_tls_last_esp_err = 0;
client->event.error_handle->esp_tls_cert_verify_flags = 0;
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_FAIL;
2018-02-16 02:40:16 +07:00
}
static void esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
2018-02-16 02:40:16 +07:00
{
MQTT_API_LOCK(client);
esp_transport_close(client->transport);
client->wait_timeout_ms = client->config->reconnect_timeout_ms;
2018-02-16 02:46:20 +07:00
client->reconnect_tick = platform_tick_get_ms();
client->state = MQTT_STATE_WAIT_RECONNECT;
ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
2018-02-16 02:40:16 +07:00
client->event.event_id = MQTT_EVENT_DISCONNECTED;
client->wait_for_ping_resp = false;
esp_mqtt_dispatch_event_with_msgid(client);
MQTT_API_UNLOCK(client);
2018-02-16 02:40:16 +07:00
}
static bool create_client_data(esp_mqtt_client_handle_t client)
2018-02-16 02:40:16 +07:00
{
client->event.error_handle = calloc(1, sizeof(esp_mqtt_error_codes_t));
ESP_MEM_CHECK(TAG, client->event.error_handle, return false)
client->api_lock = xSemaphoreCreateRecursiveMutex();
ESP_MEM_CHECK(TAG, client->api_lock, return false);
client->outbox = outbox_init();
ESP_MEM_CHECK(TAG, client->outbox, return false);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, return false);
return true;
}
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
{
esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client),
#if MQTT_EVENT_QUEUE_SIZE > 1
// if supporting multiple queued events, we keep track of them
// using atomic variable, so need to make sure it won't get allocated in PSRAM
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
#else
MALLOC_CAP_DEFAULT);
#endif
ESP_MEM_CHECK(TAG, client, return NULL);
if (!create_client_data(client)) {
goto _mqtt_init_failed;
}
if (esp_mqtt_set_config(client, config) != ESP_OK) {
goto _mqtt_init_failed;
2018-02-16 02:40:16 +07:00
}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = MQTT_EVENT_QUEUE_SIZE,
.task_name = NULL,
};
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_init(&client->queued_events, 0);
#endif
#endif
2018-02-16 02:40:16 +07:00
client->keepalive_tick = platform_tick_get_ms();
client->reconnect_tick = platform_tick_get_ms();
client->refresh_connection_tick = platform_tick_get_ms();
client->wait_for_ping_resp = false;
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_create_default_config(client) != ESP_OK) {
goto _mqtt_init_failed;
}
#endif
2018-02-16 02:40:16 +07:00
return client;
_mqtt_init_failed:
esp_mqtt_client_destroy(client);
return NULL;
2018-02-16 02:40:16 +07:00
}
esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}
if (client->run) {
esp_mqtt_client_stop(client);
}
2018-02-16 02:40:16 +07:00
esp_mqtt_destroy_config(client);
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
}
if (client->outbox) {
outbox_destroy(client->outbox);
}
if (client->status_bits) {
vEventGroupDelete(client->status_bits);
}
if (client->api_lock) {
vSemaphoreDelete(client->api_lock);
}
free(client->event.error_handle);
2018-02-16 02:40:16 +07:00
free(client);
return ESP_OK;
}
static char *create_string(const char *ptr, int len)
{
char *ret;
if (len <= 0) {
return NULL;
}
ret = calloc(1, len + 1);
ESP_MEM_CHECK(TAG, ret, return NULL);
2018-02-16 02:40:16 +07:00
memcpy(ret, ptr, len);
return ret;
}
esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *uri)
{
struct http_parser_url puri;
http_parser_url_init(&puri);
int parser_status = http_parser_parse_url(uri, strlen(uri), 0, &puri);
if (parser_status != 0) {
ESP_LOGE(TAG, "Error parse uri = %s", uri);
return ESP_FAIL;
}
// This API could be also executed when client is active (need to protect config fields)
MQTT_API_LOCK(client);
// set uri overrides actual scheme, host, path if configured previously
2024-05-16 23:58:30 +04:00
// False-positive leak detection. TODO: GCC-366
#pragma GCC diagnostic push // TODO: IDF-10105
#if __clang__
#pragma clang diagnostic ignored "-Wunknown-warning-option"
#else
#pragma GCC diagnostic ignored "-Wpragmas"
#endif
2024-05-16 23:58:30 +04:00
#pragma GCC diagnostic ignored "-Wanalyzer-malloc-leak"
free(client->config->scheme);
free(client->config->host);
free(client->config->path);
2018-02-16 02:40:16 +07:00
client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len);
client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len);
client->config->path = NULL;
2024-05-16 23:58:30 +04:00
#pragma GCC diagnostic pop
if (puri.field_data[UF_PATH].len || puri.field_data[UF_QUERY].len) {
int asprintf_ret_value;
if (puri.field_data[UF_QUERY].len == 0) {
asprintf_ret_value = asprintf(&client->config->path,
"%.*s",
puri.field_data[UF_PATH].len, uri + puri.field_data[UF_PATH].off);
} else if (puri.field_data[UF_PATH].len == 0) {
asprintf_ret_value = asprintf(&client->config->path,
"/?%.*s",
puri.field_data[UF_QUERY].len, uri + puri.field_data[UF_QUERY].off);
} else {
asprintf_ret_value = asprintf(&client->config->path,
"%.*s?%.*s",
puri.field_data[UF_PATH].len, uri + puri.field_data[UF_PATH].off,
puri.field_data[UF_QUERY].len, uri + puri.field_data[UF_QUERY].off);
}
if (asprintf_ret_value == -1) {
ESP_LOGE(TAG,"%s(%d): %s", __FUNCTION__, __LINE__, "Memory exhausted");
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
}
}
2018-02-16 02:40:16 +07:00
if (puri.field_data[UF_PORT].len) {
client->config->port = strtol((const char *)(uri + puri.field_data[UF_PORT].off), NULL, 10);
2018-02-16 02:40:16 +07:00
}
char *user_info = create_string(uri + puri.field_data[UF_USERINFO].off, puri.field_data[UF_USERINFO].len);
if (user_info) {
char *pass = strchr(user_info, ':');
if (pass) {
pass[0] = 0; //terminal username
pass ++;
client->mqtt_state.connection.information.password = strdup(pass);
2018-02-16 02:40:16 +07:00
}
client->mqtt_state.connection.information.username = strdup(user_info);
2018-02-16 02:40:16 +07:00
free(user_info);
}
MQTT_API_UNLOCK(client);
2018-02-16 02:40:16 +07:00
return ESP_OK;
}
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
2018-02-16 02:40:16 +07:00
{
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
client->event.msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
#endif
} else {
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
}
return esp_mqtt_dispatch_event(client);
}
esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event)
{
esp_err_t ret = esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, MQTT_USER_EVENT, event, sizeof(*event), 0);
#if MQTT_EVENT_QUEUE_SIZE > 1
if (ret == ESP_OK) {
atomic_fetch_add(&client->queued_events, 1);
}
#endif
return ret;
}
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
2018-02-16 02:40:16 +07:00
client->event.client = client;
client->event.protocol_ver = client->mqtt_state.connection.information.protocol_ver;
2022-02-25 10:12:30 +08:00
esp_err_t ret = ESP_FAIL;
2018-02-16 02:40:16 +07:00
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
ret = esp_event_loop_run(client->config->event_loop_handle, 0);
#else
return ESP_FAIL;
#endif
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_delete_user_property(client->event.property->user_property);
client->event.property->user_property = NULL;
#endif
}
return ret;
}
2018-02-16 02:40:16 +07:00
static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
2018-02-16 02:40:16 +07:00
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
size_t msg_total_len = client->mqtt_state.message_length;
size_t msg_topic_len = msg_read_len;
size_t msg_data_len = msg_read_len;
size_t msg_data_offset = 0;
size_t saved_msg_topic_len = 0;
char *saved_msg_topic = NULL;
char *msg_topic = NULL;
char *msg_data = NULL;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_get_publish_data(client, msg_buf, msg_read_len, &msg_topic, &msg_topic_len, &msg_data, &msg_data_len) != ESP_OK) {
ESP_LOGE(TAG, "%s: esp_mqtt5_get_publish_data() failed", __func__);
return ESP_FAIL;
}
#endif
} else {
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
return ESP_FAIL;
}
ESP_LOGD(TAG, "%s: msg_topic_len=%"NEWLIB_NANO_COMPAT_FORMAT, __func__, NEWLIB_NANO_COMPAT_CAST(msg_topic_len));
2018-02-16 02:40:16 +07:00
2022-02-25 10:12:30 +08:00
// get payload
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
if (msg_data_len > 0 && msg_data == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__);
return ESP_FAIL;
}
}
client->event.retain = mqtt_get_retain(msg_buf);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
client->event.msg_id = mqtt5_get_id(msg_buf, msg_read_len);
#endif
} else {
client->event.msg_id = mqtt_get_id(msg_buf, msg_read_len);
}
2021-09-20 12:07:53 +02:00
client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
bool send_event = true;
while (send_event) {
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = msg_data_len > 0 ? msg_data : NULL;
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
client->event.topic = msg_topic;
client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client);
send_event = false;
if (msg_read_len < msg_total_len) {
send_event = true;
#ifdef CONFIG_MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS
if (!saved_msg_topic) {
saved_msg_topic = strndup(msg_topic, msg_topic_len);
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
saved_msg_topic_len = msg_topic_len;
}
#endif
size_t buf_len = client->mqtt_state.in_buffer_length;
msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = saved_msg_topic;
msg_topic_len = saved_msg_topic_len;
msg_data_offset += msg_data_len;
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
}
msg_data_len = ret;
msg_read_len += msg_data_len;
}
}
free(saved_msg_topic);
return ESP_OK;
2018-02-16 02:40:16 +07:00
}
static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
char *msg_data = NULL;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property);
#else
// SUBACK Using MQTT5 received but MQTT5 is disabled, This is unlikely to happen.
return ESP_FAIL;
2022-02-25 10:12:30 +08:00
#endif
} else {
msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len);
}
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Failed to acquire suback data");
return ESP_FAIL;
}
client->event.error_handle->esp_tls_stack_err = 0;
client->event.error_handle->esp_tls_last_esp_err = 0;
client->event.error_handle->esp_tls_cert_verify_flags = 0;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE;
client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED;
// post data event
for (int topic = 0; topic < msg_data_len; ++topic) {
if ((uint8_t)msg_data[topic] >= 0x80) {
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
break;
}
}
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
client->event.data = msg_data;
client->event.current_data_offset = 0;
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_OK;
}
// Deletes the initial message in MQTT communication protocol
// Return false when message is not found, making the received counterpart invalid.
static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
2018-02-16 02:40:16 +07:00
{
if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) {
ESP_LOGD(TAG, "Removed pending_id=%d", client->mqtt_state.pending_msg_id);
2018-02-16 02:40:16 +07:00
return true;
}
ESP_LOGD(TAG, "Failed to remove pending_id=%d", client->mqtt_state.pending_msg_id);
2018-02-16 02:40:16 +07:00
return false;
}
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
2018-02-16 02:40:16 +07:00
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.connection.outbound_message.data;
msg.len = client->mqtt_state.connection.outbound_message.length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
2018-02-16 02:40:16 +07:00
}
/*
* Returns:
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
* -2 in case of failure or EOF (clean connection closure)
* -1 timeout while in-the-middle of the messge
* 0 if no message has been received
* 1 if a message has been received and placed to client->mqtt_state:
* message length: client->mqtt_state.message_length
* message content: client->mqtt_state.in_buffer
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
*
*/
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
{
int read_len, total_len, fixed_header_len;
uint8_t *buf = client->mqtt_state.in_buffer + client->mqtt_state.in_buffer_read_len;
esp_transport_handle_t t = client->transport;
client->mqtt_state.message_length = 0;
if (client->mqtt_state.in_buffer_read_len == 0) {
/*
* Read first byte of the mqtt packet fixed header, it contains packet
* type and flags.
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
return esp_mqtt_handle_transport_read_error(read_len, client, false);
}
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
/*
* Verify the flags and act according to MQTT protocol: close connection
* if the flags are set incorrectly.
*/
if (!mqtt_has_valid_msg_hdr(buf, read_len)) {
ESP_LOGE(TAG, "%s: received a message with an invalid header=0x%x", __func__, *buf);
goto err;
}
buf++;
client->mqtt_state.in_buffer_read_len++;
}
if ((client->mqtt_state.in_buffer_read_len == 1) ||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
do {
/*
* Read the "remaining length" part of mqtt packet fixed header. It
* starts at second byte and spans up to 4 bytes, but we accept here
* only up to 2 bytes of remaining length, i.e. messages with
* maximal remaining length value = 16383 (maximal total message
* size of 16386 bytes).
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
buf++;
client->mqtt_state.in_buffer_read_len++;
} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
}
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
ESP_LOGD(TAG, "%s: total message length: %d (already read: %"NEWLIB_NANO_COMPAT_FORMAT")", __func__, total_len, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len));
client->mqtt_state.message_length = total_len;
if (client->mqtt_state.in_buffer_length < total_len) {
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
/*
* In case larger publish messages, we only need to read full topic, data can be split to multiple data event.
* Evaluate and correct total_len to read only publish message header, so data can be read separately
*/
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
/* read next 2 bytes - topic length to get minimum portion of publish packet */
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"NEWLIB_NANO_COMPAT_FORMAT")",
__func__, total_len, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len));
return 0;
}
}
int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8;
topic_len |= client->mqtt_state.in_buffer[fixed_header_len + 1];
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer) > 0 ? 2 : 0);
ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len);
if (client->mqtt_state.in_buffer_length < total_len) {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
} else {
total_len = client->mqtt_state.in_buffer_length;
}
/* free to continue with reading */
} else {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
}
}
if (client->mqtt_state.in_buffer_read_len < total_len) {
/* read the rest of the mqtt message */
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"NEWLIB_NANO_COMPAT_FORMAT")",
__func__, total_len, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len));
return 0;
}
}
ESP_LOGV(TAG, "%s: transport_read():%"NEWLIB_NANO_COMPAT_FORMAT" %"NEWLIB_NANO_COMPAT_FORMAT, __func__,
NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len), NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.message_length));
return 1;
err:
esp_mqtt_client_dispatch_transport_error(client);
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
return -2;
}
2018-02-16 02:40:16 +07:00
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
{
2022-02-25 10:12:30 +08:00
uint8_t msg_type = 0, msg_qos = 0;
uint16_t msg_id = 0;
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
size_t previous_in_buffer_read_len = client->mqtt_state.in_buffer_read_len;
2018-02-16 02:40:16 +07:00
/* non-blocking receive in order not to block other tasks */
int recv = mqtt_message_receive(client, 0);
client: Report failure on timeout in mid-message timeout Fixes error processing on network reads: 1) Treat EOF as an error, since the connection is closed (FIN) from the server side. If we didn't we would try to read (in the next iteration) from the same socket that has been already closed and get an error ENOTCONN. Before the fix: D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (13800) transport_base: tcp_read error, errno=Socket is not connected E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80 I (13840) esp_mqtt_demo: Last errno string (Socket is not connected) E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (13860) mqtt_client: Reconnect after 10000 ms D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128 D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008 I (12450) esp_mqtt_demo: Last errno string (Success) E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1 D (12470) mqtt_client: Reconnect after 10000 ms D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED 2) Treat timeouts in the middle of MQTT message reading as errors (if timeouted for the second time and didn't read a byte) Before the fix: D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2 D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2) D (19190) mqtt_client: mqtt_message_receive: read_len=0 D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready! E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0 E (19210) mqtt_client: MQTT connect failed D (19220) mqtt_client: Reconnect after 10000 ms D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED After the fix: D (19190) mqtt_client: mqtt_message_receive: read_len=0 E (19190) mqtt_client: Network timeout while reading MQTT message E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119 D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0 I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR I (19220) esp_mqtt_demo: Last errno string (Success) E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1 E (19240) mqtt_client: MQTT connect failed D (19240) mqtt_client: Reconnect after 10000 ms D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2 I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED (Note that the above log is from mid-message timeout of CONNECT message, which was hadled before the fix. If the mid-message timeout ocurs with for example SUBACK, the current version would repeatably resend susbscribe message) Merges https://github.com/espressif/esp-mqtt/pull/232
2023-11-10 12:57:26 +01:00
if (recv == 0) { // Timeout
return ESP_OK;
}
if (recv == -1) { // Mid-message timeout
if (previous_in_buffer_read_len == client->mqtt_state.in_buffer_read_len) {
// Report error only if didn't receive anything since previous iteration
ESP_LOGE(TAG, "%s: Network timeout while reading MQTT message", __func__);
return ESP_FAIL;
}
return ESP_OK; // Treat as standard timeout (keep reading the message)
}
if (recv < 0) { // Other error
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
2018-02-16 02:40:16 +07:00
return ESP_FAIL;
}
int read_len = client->mqtt_state.message_length;
2018-02-16 02:40:16 +07:00
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, read_len);
#endif
} else {
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, read_len);
}
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
2018-02-16 02:40:16 +07:00
switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK:
if (remove_initiator_message(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_suback(client);
#endif
ESP_LOGD(TAG, "deliver_suback, message_length_read=%"NEWLIB_NANO_COMPAT_FORMAT", message_length=%"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len), NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.message_length));
if (deliver_suback(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id);
return ESP_FAIL;
}
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (remove_initiator_message(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_unsuback(client);
#endif
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
ESP_LOGD(TAG, "deliver_publish, message_length_read=%"NEWLIB_NANO_COMPAT_FORMAT", message_length=%"NEWLIB_NANO_COMPAT_FORMAT, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len), NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.message_length));
if (deliver_publish(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
return ESP_FAIL;
}
if (msg_qos == 1 || msg_qos == 2) {
if (msg_qos == 1) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_puback(&client->mqtt_state.connection, msg_id);
2022-02-25 10:12:30 +08:00
#endif
} else {
mqtt_msg_puback(&client->mqtt_state.connection, msg_id);
}
} else if (msg_qos == 2) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_pubrec(&client->mqtt_state.connection, msg_id);
2022-02-25 10:12:30 +08:00
#endif
} else {
mqtt_msg_pubrec(&client->mqtt_state.connection, msg_id);
}
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created");
return ESP_FAIL;
2022-02-25 10:12:30 +08:00
}
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
return ESP_FAIL;
}
}
break;
case MQTT_MSG_TYPE_PUBACK:
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_puback(client);
#endif
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREC return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
mqtt5_msg_pubrel(&client->mqtt_state.connection, msg_id);
2022-02-25 10:12:30 +08:00
#endif
} else {
mqtt_msg_pubrel(&client->mqtt_state.connection, msg_id);
2022-02-25 10:12:30 +08:00
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBREL cannot be created");
return ESP_FAIL;
}
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
esp_mqtt_write(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREL return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
mqtt5_msg_pubcomp(&client->mqtt_state.connection, msg_id);
2022-02-25 10:12:30 +08:00
#endif
} else {
mqtt_msg_pubcomp(&client->mqtt_state.connection, msg_id);
2022-02-25 10:12:30 +08:00
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created");
return ESP_FAIL;
}
esp_mqtt_write(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_pubcomp(client);
#endif
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
/* It is the responsibility of the Client to ensure that the interval between Control Packets
* being sent does not exceed the Keep Alive value. In the absence of sending any other Control
* Packets, the Client MUST send a PINGREQ Packet [MQTT-3.1.2-23].
* [MQTT-3.1.2-23]
*/
client->keepalive_tick = platform_tick_get_ms();
break;
case MQTT_MSG_TYPE_DISCONNECT:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_DISCONNECT");
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
int disconnect_rsp_code;
esp_mqtt5_parse_disconnect(client, &disconnect_rsp_code);
client->event.event_id = MQTT_EVENT_DISCONNECTED;
client->event.error_handle->disconnect_return_code = disconnect_rsp_code;
esp_mqtt_dispatch_event_with_msgid(client);
}
#endif
break;
}
client->mqtt_state.in_buffer_read_len = 0;
2018-02-16 02:40:16 +07:00
return ESP_OK;
}
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
// decode queued data
client->mqtt_state.connection.outbound_message.data = outbox_item_get_data(item, &client->mqtt_state.connection.outbound_message.length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
// set duplicate flag for QoS-1 and QoS-2 messages
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) {
mqtt_set_dup(client->mqtt_state.connection.outbound_message.data);
ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id);
}
// try to resend the data
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to resend data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
return ESP_OK;
}
static esp_err_t mqtt_resend_pubrel(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
client->mqtt_state.connection.outbound_message.data = outbox_item_get_data(item, &client->mqtt_state.connection.outbound_message.length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREC return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
mqtt5_msg_pubrel(&client->mqtt_state.connection, client->mqtt_state.pending_msg_id);
#endif
} else {
mqtt_msg_pubrel(&client->mqtt_state.connection, client->mqtt_state.pending_msg_id);
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBREL cannot be created");
return ESP_FAIL;
}
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to resend data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
return ESP_OK;
}
static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
{
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
#if MQTT_REPORT_DELETED_MESSAGES
// also report the deleted items as MQTT_EVENT_DELETED events if enabled
int msg_id = 0;
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) >= 0) {
client->event.event_id = MQTT_EVENT_DELETED;
client->event.msg_id = msg_id;
if (esp_mqtt_dispatch_event(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
}
}
#else
outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
#endif
}
/**
* @brief When using multiple queued item, we'd like to reduce the poll timeout to proceed with event loop exacution
*/
static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_timeout)
{
return
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_load(&client->queued_events) > 0 ? 10 : max_timeout;
#else
max_timeout;
#endif
}
static inline void run_event_loop(esp_mqtt_client_handle_t client)
{
#if MQTT_EVENT_QUEUE_SIZE > 1
if (atomic_load(&client->queued_events) > 0) {
atomic_fetch_sub(&client->queued_events, 1);
#else
{
#endif
esp_err_t ret = esp_event_loop_run(client->config->event_loop_handle, 0);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Error in running event_loop %d", ret);
}
}
}
2018-02-16 02:40:16 +07:00
static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
uint64_t last_retransmit = 0;
outbox_tick_t msg_tick = 0;
2018-02-16 02:40:16 +07:00
client->run = true;
client->state = MQTT_STATE_INIT;
2018-02-16 22:48:22 +07:00
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
2018-02-16 02:40:16 +07:00
while (client->run) {
MQTT_API_LOCK(client);
run_event_loop(client);
// delete long pending messages
mqtt_delete_expired_messages(client);
mqtt_client_state_t state = client->state;
switch (state) {
case MQTT_STATE_DISCONNECTED:
break;
case MQTT_STATE_INIT:
xEventGroupClearBits(client->status_bits, RECONNECT_BIT | DISCONNECT_BIT);
client->transport = client->config->transport;
if (!client->transport) {
if (esp_mqtt_client_create_transport(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to create transport list");
client->run = false;
break;
}
//get transport by scheme
client->transport = esp_transport_list_get_transport(client->transport_list, client->config->scheme);
if (client->transport == NULL) {
ESP_LOGE(TAG, "There are no transports valid, stop mqtt client, config scheme = %s", client->config->scheme);
client->run = false;
break;
}
}
//default port
if (client->config->port == 0) {
client->config->port = esp_transport_get_default_port(client->transport);
}
#if MQTT_ENABLE_SSL
esp_mqtt_set_ssl_transport_properties(client->transport_list, client->config);
#endif
2018-02-16 02:40:16 +07:00
2024-08-06 10:28:29 +02:00
if(client->config->tcp_keep_alive_cfg.keep_alive_enable) {
esp_transport_tcp_set_keep_alive(client->transport, &client->config->tcp_keep_alive_cfg);
}
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
esp_mqtt_dispatch_event_with_msgid(client);
if (esp_transport_connect(client->transport,
2018-02-16 02:40:16 +07:00
client->config->host,
client->config->port,
client->config->network_timeout_ms) < 0) {
ESP_LOGE(TAG, "Error transport connect");
esp_mqtt_client_dispatch_transport_error(client);
esp_mqtt_abort_connection(client);
break;
}
ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port);
if (esp_mqtt_connect(client, client->config->network_timeout_ms) != ESP_OK) {
ESP_LOGE(TAG, "MQTT connect failed");
esp_mqtt_abort_connection(client);
break;
}
client->event.event_id = MQTT_EVENT_CONNECTED;
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
}
client->state = MQTT_STATE_CONNECTED;
esp_mqtt_dispatch_event_with_msgid(client);
client->refresh_connection_tick = platform_tick_get_ms();
client->keepalive_tick = platform_tick_get_ms();
2018-02-16 02:40:16 +07:00
break;
case MQTT_STATE_CONNECTED:
// check for disconnection request
if (xEventGroupWaitBits(client->status_bits, DISCONNECT_BIT, true, true, 0) & DISCONNECT_BIT) {
send_disconnect_msg(client); // ignore error, if clean disconnect fails, just abort the connection
esp_mqtt_abort_connection(client);
break;
}
// receive and process data
if (mqtt_process_receive(client) == ESP_FAIL) {
esp_mqtt_abort_connection(client);
2018-02-16 02:40:16 +07:00
break;
}
2018-02-16 02:40:16 +07:00
if (last_retransmit == 0) {
// connected for first time, set last_retransmit to now, avoid retransmit
last_retransmit = platform_tick_get_ms();
}
// resend all non-transmitted messages first
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
if (item) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
// delete all qos0 publish messages once we process them
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
}
}
if (client->mqtt_state.pending_publish_qos > 0) {
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
}
}
// resend other "transmitted" messages after 1s
} else if (has_timed_out(last_retransmit, client->config->message_retransmit_timeout)) {
last_retransmit = platform_tick_get_ms();
item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick);
if (item && (last_retransmit - msg_tick > client->config->message_retransmit_timeout)) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
}
}
item = outbox_dequeue(client->outbox, ACKNOWLEDGED, &msg_tick);
if (item && (last_retransmit - msg_tick > client->config->message_retransmit_timeout)) {
if (mqtt_resend_pubrel(client, item) == ESP_OK) {
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
}
2018-02-16 02:40:16 +07:00
}
}
2018-02-17 01:03:35 +07:00
if (process_keepalive(client) != ESP_OK) {
break;
}
if (client->config->refresh_connection_after_ms &&
has_timed_out(client->refresh_connection_tick, client->config->refresh_connection_after_ms)) {
ESP_LOGD(TAG, "Refreshing the connection...");
esp_mqtt_abort_connection(client);
client->state = MQTT_STATE_INIT;
}
break;
case MQTT_STATE_WAIT_RECONNECT:
if (!client->config->auto_reconnect && xEventGroupGetBits(client->status_bits)&RECONNECT_BIT) {
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
client->state = MQTT_STATE_INIT;
client->wait_timeout_ms = MQTT_RECON_DEFAULT_MS;
ESP_LOGD(TAG, "Reconnecting per user request...");
break;
} else if (client->config->auto_reconnect &&
platform_tick_get_ms() - client->reconnect_tick > client->wait_timeout_ms) {
client->state = MQTT_STATE_INIT;
client->reconnect_tick = platform_tick_get_ms();
ESP_LOGD(TAG, "Reconnecting...");
break;
}
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
max_poll_timeout(client, client->wait_timeout_ms / 2 / portTICK_PERIOD_MS));
// continue the while loop instead of break, as the mutex is unlocked
continue;
default:
ESP_LOGE(TAG, "MQTT client error, client is in an unrecoverable state.");
break;
}
MQTT_API_UNLOCK(client);
if (MQTT_STATE_CONNECTED == client->state) {
if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
esp_mqtt_abort_connection(client);
}
2018-02-16 02:40:16 +07:00
}
2018-02-16 02:40:16 +07:00
}
esp_transport_close(client->transport);
outbox_delete_all_items(client->outbox);
client->state = MQTT_STATE_DISCONNECTED;
xEventGroupSetBits(client->status_bits, STOPPED_BIT);
2018-02-16 02:40:16 +07:00
vTaskDelete(NULL);
}
esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_INIT && client->state != MQTT_STATE_DISCONNECTED) {
2018-02-16 02:40:16 +07:00
ESP_LOGE(TAG, "Client has started");
MQTT_API_UNLOCK(client);
2018-02-16 02:40:16 +07:00
return ESP_FAIL;
}
esp_err_t err = ESP_OK;
#if MQTT_CORE_SELECTION_ENABLED
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
err = ESP_FAIL;
}
#else
ESP_LOGD(TAG, "Core selection disabled");
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
err = ESP_FAIL;
}
#endif
MQTT_API_UNLOCK(client);
return err;
2018-02-16 02:40:16 +07:00
}
esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
ESP_LOGI(TAG, "Client asked to disconnect");
xEventGroupSetBits(client->status_bits, DISCONNECT_BIT);
return ESP_OK;
}
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
ESP_LOGI(TAG, "Client force reconnect requested");
if (client->state != MQTT_STATE_WAIT_RECONNECT) {
ESP_LOGD(TAG, "The client is not waiting for reconnection. Ignore the request");
return ESP_FAIL;
}
client->wait_timeout_ms = 0;
xEventGroupSetBits(client->status_bits, RECONNECT_BIT);
return ESP_OK;
}
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
{
// Notify the broker we are disconnecting
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_disconnect(&client->mqtt_state.connection, &client->mqtt5_config->disconnect_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
2022-02-25 10:12:30 +08:00
esp_mqtt5_client_delete_user_property(client->mqtt5_config->disconnect_property_info.user_property);
client->mqtt5_config->disconnect_property_info.user_property = NULL;
memset(&client->mqtt5_config->disconnect_property_info, 0, sizeof(esp_mqtt5_disconnect_property_config_t));
}
#endif
} else {
mqtt_msg_disconnect(&client->mqtt_state.connection);
2022-02-25 10:12:30 +08:00
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Disconnect message cannot be created");
return ESP_FAIL;
}
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending disconnect message");
}
return ESP_OK;
}
2018-02-16 02:40:16 +07:00
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK(client);
if (client->run) {
/* A running client cannot be stopped from the MQTT task/event handler */
TaskHandle_t running_task = xTaskGetCurrentTaskHandle();
if (running_task == client->task_handle) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Client cannot be stopped from MQTT task");
return ESP_FAIL;
}
// Only send the disconnect message if the client is connected
if (client->state == MQTT_STATE_CONNECTED) {
if (send_disconnect_msg(client) != ESP_OK) {
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
}
client->run = false;
client->state = MQTT_STATE_DISCONNECTED;
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
return ESP_OK;
} else {
ESP_LOGW(TAG, "Client asked to stop, but was not started");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
2018-02-16 02:40:16 +07:00
}
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
2018-02-16 02:40:16 +07:00
{
mqtt_msg_pingreq(&client->mqtt_state.connection);
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Ping message cannot be created");
return ESP_FAIL;
}
2018-02-16 02:40:16 +07:00
if (esp_mqtt_write(client) != ESP_OK) {
2018-02-16 02:40:16 +07:00
ESP_LOGE(TAG, "Error sending ping");
return ESP_FAIL;
2018-02-16 02:40:16 +07:00
}
ESP_LOGD(TAG, "Sent PING successful");
return ESP_OK;
2018-02-16 02:40:16 +07:00
}
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size)
2018-02-16 02:40:16 +07:00
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
if (client->config->outbox_limit > 0 && outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}
2018-02-16 02:40:16 +07:00
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
MQTT_API_LOCK(client);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
int max_qos = topic_list[0].qos;
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].qos > max_qos) {
max_qos = topic_list[topic_number].qos;
}
}
if (esp_mqtt5_client_subscribe_check(client, max_qos) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 subscribe check fail: QoS %d not accepted by broker ", max_qos);
2022-02-25 10:12:30 +08:00
MQTT_API_UNLOCK(client);
return -1;
}
mqtt5_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
2022-02-25 10:12:30 +08:00
client->mqtt5_config->subscribe_property_info = NULL;
}
#endif
} else {
mqtt_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id);
2022-02-25 10:12:30 +08:00
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Subscribe message cannot be created");
MQTT_API_UNLOCK(client);
return -1;
}
2018-02-16 02:40:16 +07:00
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
//move pending msg to outbox (if have)
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
2018-02-16 02:40:16 +07:00
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos);
MQTT_API_UNLOCK(client);
2018-02-16 02:40:16 +07:00
return -1;
}
ESP_LOGD(TAG, "Sent subscribe, first topic=%s, id: %d", topic_list[0].filter, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK(client);
2018-02-16 02:40:16 +07:00
return client->mqtt_state.pending_msg_id;
}
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos)
{
esp_mqtt_topic_t user_topic = {.filter = topic, .qos = qos};
return esp_mqtt_client_subscribe_multiple(client, &user_topic, 1);
2018-02-16 02:40:16 +07:00
}
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
2018-02-16 02:40:16 +07:00
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
MQTT_API_LOCK(client);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
2022-02-25 10:12:30 +08:00
client->mqtt5_config->unsubscribe_property_info = NULL;
}
#endif
} else {
mqtt_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id);
2022-02-25 10:12:30 +08:00
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Unubscribe message cannot be created");
return -1;
}
2018-02-16 02:40:16 +07:00
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error
2018-02-16 02:40:16 +07:00
if (esp_mqtt_write(client) != ESP_OK) {
2018-02-16 02:40:16 +07:00
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK(client);
2018-02-16 02:40:16 +07:00
return -1;
}
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK(client);
2018-02-16 02:40:16 +07:00
return client->mqtt_state.pending_msg_id;
}
static int make_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain)
2018-02-16 02:40:16 +07:00
{
2018-02-16 22:48:22 +07:00
uint16_t pending_msg_id = 0;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
if (client->mqtt_state.connection.outbound_message.length) {
2022-02-25 10:12:30 +08:00
client->mqtt5_config->publish_property_info = NULL;
}
#endif
} else {
mqtt_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id);
2022-02-25 10:12:30 +08:00
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish message cannot be created");
return -1;
}
return pending_msg_id;
}
static inline int mqtt_client_enqueue_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain, bool store)
{
int pending_msg_id = make_publish(client, topic, data, len, qos, retain);
if (pending_msg_id < 0) {
return -1;
}
/* We have to set as pending all the qos>0 messages */
//TODO: client->mqtt_state.outbound_message = publish_msg;
if (qos > 0 || store) {
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
2018-02-16 02:50:29 +07:00
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.connection.outbound_message.fragmented_msg_total_length == 0) {
if (!mqtt_enqueue(client, NULL, 0)) {
return -1;
}
} else {
int first_fragment = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
return -1;
}
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
}
2018-02-16 02:50:29 +07:00
}
return pending_msg_id;
}
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
#if MQTT_SKIP_PUBLISH_IF_DISCONNECTED
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGI(TAG, "Publishing skipped: client is not connected");
return -1;
}
#endif
MQTT_API_LOCK(client);
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 publish check fail");
MQTT_API_UNLOCK(client);
return -1;
}
}
#endif
/* Acceptable publish messages:
data == NULL, len == 0: publish null message
data valid, len == 0: publish all data, payload len is determined from string length
data valid, len > 0: publish data with defined length
*/
if (len <= 0 && data != NULL) {
len = strlen(data);
}
if (client->config->outbox_limit > 0 && qos > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
MQTT_API_UNLOCK(client);
return -2;
}
}
int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false);
if (pending_msg_id < 0) {
MQTT_API_UNLOCK(client);
return -1;
}
int ret = 0;
2018-02-16 02:50:29 +07:00
/* Skip sending if not connected (rely on resending) */
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGD(TAG, "Publish: client is not connected");
if (qos > 0) {
ret = pending_msg_id;
} else {
ret = -1;
}
goto cannot_publish;
}
/* Provide support for sending fragmented message if it doesn't fit buffer */
int remaining_len = len;
const char *current_data = data;
bool sending = true;
while (sending) {
if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_abort_connection(client);
ret = -1;
goto cannot_publish;
}
int data_sent = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset = 0;
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
remaining_len -= data_sent;
current_data += data_sent;
if (remaining_len > 0) {
mqtt_connection_t *connection = &client->mqtt_state.connection;
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
int write_len = remaining_len > connection->buffer_length ? connection->buffer_length : remaining_len;
memcpy(connection->buffer, current_data, write_len);
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = write_len;
sending = true;
} else {
// Message was sent correctly
sending = false;
}
2018-02-16 02:40:16 +07:00
}
if (qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
//Tick is set after transmit to avoid retransmitting too early due slow network speed / big messages
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
}
MQTT_API_UNLOCK(client);
2018-02-16 02:50:29 +07:00
return pending_msg_id;
cannot_publish:
// clear out possible fragmented publish if failed or skipped
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
if (qos == 0) {
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
}
MQTT_API_UNLOCK(client);
return ret;
2018-02-16 02:40:16 +07:00
}
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
/* Acceptable publish messages:
data == NULL, len == 0: publish null message
data valid, len == 0: publish all data, payload len is determined from string length
data valid, len > 0: publish data with defined length
*/
if (len <= 0 && data != NULL) {
len = strlen(data);
}
if (client->config->outbox_limit > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}
}
MQTT_API_LOCK(client);
2022-02-25 10:12:30 +08:00
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
2022-02-25 10:12:30 +08:00
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
ESP_LOGI(TAG, "esp_mqtt_client_enqueue check fail");
MQTT_API_UNLOCK(client);
return -1;
}
}
#endif
int ret = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, store);
MQTT_API_UNLOCK(client);
if (ret == 0 && store == false) {
// messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error
return -1;
}
return ret;
}
2018-02-16 02:40:16 +07:00
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void *event_handler_arg)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
return esp_event_handler_register_with(client->config->event_loop_handle, MQTT_EVENTS, event, event_handler, event_handler_arg);
#else
ESP_LOGE(TAG, "Registering event handler while event loop not available in IDF version %s", IDF_VER);
return ESP_FAIL;
#endif
}
esp_err_t esp_mqtt_client_unregister_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
return esp_event_handler_unregister_with(client->config->event_loop_handle, MQTT_EVENTS, event, event_handler);
#else
ESP_LOGE(TAG, "Unregistering event handler while event loop not available in IDF version %s", IDF_VER);
return ESP_FAIL;
#endif
}
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client)
{
client->event.event_id = MQTT_EVENT_ERROR;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_TCP_TRANSPORT;
client->event.error_handle->connect_return_code = 0;
#ifdef MQTT_PROTOCOL_5
client->event.error_handle->disconnect_return_code = 0;
#endif
#ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_ERR_REPORTING
client->event.error_handle->esp_tls_last_esp_err = esp_tls_get_and_clear_last_error(esp_transport_get_error_handle(client->transport),
&client->event.error_handle->esp_tls_stack_err,
&client->event.error_handle->esp_tls_cert_verify_flags);
#ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_SOCK_ERRNO_REPORTING
client->event.error_handle->esp_transport_sock_errno = esp_transport_get_errno(client->transport);
#endif
#endif
esp_mqtt_dispatch_event_with_msgid(client);
}
int esp_mqtt_client_get_outbox_size(esp_mqtt_client_handle_t client)
{
int outbox_size = 0;
if (client == NULL) {
return 0;
}
MQTT_API_LOCK(client);
if (client->outbox) {
outbox_size = outbox_get_size(client->outbox);
}
MQTT_API_UNLOCK(client);
return outbox_size;
}
esp_transport_handle_t esp_mqtt_client_get_transport(esp_mqtt_client_handle_t client, char *transport_scheme){
if (client == NULL || (transport_scheme == NULL && client->config->transport == NULL)) {
return NULL;
}
if (transport_scheme == NULL && client->config->transport != NULL) {
return client->config->transport;
}
return esp_transport_list_get_transport(client->transport_list, transport_scheme);
}