2022-09-08 15:03:04 +02:00
# include "mqtt_client.h"
2022-05-20 11:55:28 +08:00
# include "mqtt_client_priv.h"
2022-09-08 15:03:04 +02:00
# include "esp_log.h"
# include <stdint.h>
2022-11-03 15:26:43 +01:00
# include "esp_heap_caps.h"
2018-12-21 13:52:48 +01:00
2020-01-09 09:45:16 +01:00
_Static_assert ( sizeof ( uint64_t ) = = sizeof ( outbox_tick_t ) , " mqtt-client tick type size different from outbox tick type " ) ;
2020-02-20 12:37:16 +01:00
# ifdef ESP_EVENT_ANY_ID
_Static_assert ( MQTT_EVENT_ANY = = ESP_EVENT_ANY_ID , " mqtt-client event enum does not match the global EVENT_ANY_ID " ) ;
# endif
2020-01-09 09:45:16 +01:00
2022-02-08 15:22:48 +01:00
static const char * TAG = " mqtt_client " ;
2018-02-16 02:40:16 +07:00
2019-05-29 15:12:25 +02:00
# ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
2018-12-18 16:43:08 +01:00
/**
* @ brief Define of MQTT Event base
*
*/
ESP_EVENT_DEFINE_BASE ( MQTT_EVENTS ) ;
2019-05-29 15:12:25 +02:00
# endif
2018-12-18 16:43:08 +01:00
2022-05-23 22:19:24 +08:00
# define MQTT_OVER_TCP_SCHEME "mqtt"
# define MQTT_OVER_SSL_SCHEME "mqtts"
# define MQTT_OVER_WS_SCHEME "ws"
# define MQTT_OVER_WSS_SCHEME "wss"
2021-02-19 14:02:31 +00:00
const static int STOPPED_BIT = ( 1 < < 0 ) ;
const static int RECONNECT_BIT = ( 1 < < 1 ) ;
const static int DISCONNECT_BIT = ( 1 < < 2 ) ;
2018-02-16 02:40:16 +07:00
static esp_err_t esp_mqtt_dispatch_event ( esp_mqtt_client_handle_t client ) ;
2018-11-02 08:37:05 +01:00
static esp_err_t esp_mqtt_dispatch_event_with_msgid ( esp_mqtt_client_handle_t client ) ;
2018-02-16 02:40:16 +07:00
static esp_err_t esp_mqtt_connect ( esp_mqtt_client_handle_t client , int timeout_ms ) ;
2021-06-18 13:53:24 +01:00
static void esp_mqtt_abort_connection ( esp_mqtt_client_handle_t client ) ;
2018-03-16 12:25:34 +01:00
static esp_err_t esp_mqtt_client_ping ( esp_mqtt_client_handle_t client ) ;
2018-02-22 10:07:57 +07:00
static char * create_string ( const char * ptr , int len ) ;
2019-04-26 16:38:09 +02:00
static int mqtt_message_receive ( esp_mqtt_client_handle_t client , int read_poll_timeout_ms ) ;
2020-07-27 06:57:00 +02:00
static void esp_mqtt_client_dispatch_transport_error ( esp_mqtt_client_handle_t client ) ;
2021-10-02 10:34:45 +02:00
static esp_err_t send_disconnect_msg ( esp_mqtt_client_handle_t client ) ;
2018-02-16 02:40:16 +07:00
2021-12-15 15:11:15 +05:30
static int esp_mqtt_handle_transport_read_error ( int err , esp_mqtt_client_handle_t client )
{
if ( err = = ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN ) {
ESP_LOGD ( TAG , " %s: transport_read(): EOF " , __func__ ) ;
return 0 ;
}
if ( err = = ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT ) {
ESP_LOGD ( TAG , " %s: transport_read(): call timed out before data was ready! " , __func__ ) ;
return 0 ;
}
ESP_LOGE ( TAG , " %s: transport_read() error: errno=%d " , __func__ , errno ) ;
esp_mqtt_client_dispatch_transport_error ( client ) ;
return - 1 ;
}
2022-06-14 13:43:45 +05:30
# if MQTT_ENABLE_SSL
enum esp_mqtt_ssl_cert_key_api {
MQTT_SSL_DATA_API_CA_CERT ,
MQTT_SSL_DATA_API_CLIENT_CERT ,
MQTT_SSL_DATA_API_CLIENT_KEY ,
MQTT_SSL_DATA_API_MAX ,
} ;
2020-01-19 22:28:35 +01:00
static esp_err_t esp_mqtt_set_cert_key_data ( esp_transport_handle_t ssl , enum esp_mqtt_ssl_cert_key_api what , const char * cert_key_data , int cert_key_len )
2020-01-16 16:30:52 +01:00
{
char * data = ( char * ) cert_key_data ;
2020-01-19 22:28:35 +01:00
int ssl_transport_api_id = what ;
2020-01-16 16:30:52 +01:00
int len = cert_key_len ;
2020-01-19 22:28:35 +01:00
2020-01-16 16:30:52 +01:00
if ( ! data ) {
return ESP_OK ;
}
if ( len = = 0 ) {
2020-01-19 22:28:35 +01:00
// if length not specified, expect 0-terminated PEM string
// and the original transport_api_id (by convention after the last api_id in the enum)
ssl_transport_api_id + = MQTT_SSL_DATA_API_MAX ;
2020-01-16 16:30:52 +01:00
len = strlen ( data ) ;
}
# ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
else {
ESP_LOGE ( TAG , " Explicit cert-/key-len is not available in IDF version %s " , IDF_VER ) ;
2020-01-19 22:28:35 +01:00
return ESP_ERR_NOT_SUPPORTED ;
2020-01-16 16:30:52 +01:00
}
# endif
2020-01-19 22:28:35 +01:00
// option to force the cert/key config to null (i.e. skip validation) when existing config updates
2020-01-16 16:30:52 +01:00
if ( 0 = = strcmp ( data , " NULL " ) ) {
data = NULL ;
len = 0 ;
}
2020-01-19 22:28:35 +01:00
switch ( ssl_transport_api_id ) {
2020-01-16 16:30:52 +01:00
# ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
2020-12-31 12:24:25 +00:00
case MQTT_SSL_DATA_API_CA_CERT :
esp_transport_ssl_set_cert_data_der ( ssl , data , len ) ;
break ;
case MQTT_SSL_DATA_API_CLIENT_CERT :
esp_transport_ssl_set_client_cert_data_der ( ssl , data , len ) ;
break ;
case MQTT_SSL_DATA_API_CLIENT_KEY :
esp_transport_ssl_set_client_key_data_der ( ssl , data , len ) ;
break ;
2020-01-16 16:30:52 +01:00
# endif
2020-12-31 12:24:25 +00:00
case MQTT_SSL_DATA_API_CA_CERT + MQTT_SSL_DATA_API_MAX :
esp_transport_ssl_set_cert_data ( ssl , data , len ) ;
break ;
case MQTT_SSL_DATA_API_CLIENT_CERT + MQTT_SSL_DATA_API_MAX :
esp_transport_ssl_set_client_cert_data ( ssl , data , len ) ;
break ;
case MQTT_SSL_DATA_API_CLIENT_KEY + MQTT_SSL_DATA_API_MAX :
esp_transport_ssl_set_client_key_data ( ssl , data , len ) ;
break ;
default :
return ESP_ERR_INVALID_ARG ;
2020-01-19 22:28:35 +01:00
}
return ESP_OK ;
}
static esp_err_t esp_mqtt_set_ssl_transport_properties ( esp_transport_list_handle_t transport_list , mqtt_config_storage_t * cfg )
{
2022-05-23 22:19:24 +08:00
esp_transport_handle_t ssl = esp_transport_list_get_transport ( transport_list , MQTT_OVER_SSL_SCHEME ) ;
2020-01-19 22:28:35 +01:00
if ( cfg - > use_global_ca_store = = true ) {
esp_transport_ssl_enable_global_ca_store ( ssl ) ;
2022-06-13 17:34:00 +02:00
} else if ( cfg - > skip_server_verification = = true ) {
esp_transport_ssl_skip_server_verification ( ssl ) ;
2021-05-17 22:48:43 +02:00
} else if ( cfg - > crt_bundle_attach ! = NULL ) {
2021-09-07 15:38:20 +02:00
# ifdef MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
# ifdef CONFIG_MBEDTLS_CERTIFICATE_BUNDLE
2021-06-09 13:54:33 +01:00
esp_transport_ssl_crt_bundle_attach ( ssl , cfg - > crt_bundle_attach ) ;
2021-09-07 15:38:20 +02:00
# else
ESP_LOGE ( TAG , " Certificate bundle is not enabled for mbedTLS in menuconfig " ) ;
goto esp_mqtt_set_transport_failed ;
# endif /* CONFIG_MBEDTLS_CERTIFICATE_BUNDLE */
# else
2021-06-09 13:54:33 +01:00
ESP_LOGE ( TAG , " Certificate bundle feature is not available in IDF version %s " , IDF_VER ) ;
goto esp_mqtt_set_transport_failed ;
2021-09-07 15:38:20 +02:00
# endif /* MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE */
2020-01-19 22:28:35 +01:00
} else {
ESP_OK_CHECK ( TAG , esp_mqtt_set_cert_key_data ( ssl , MQTT_SSL_DATA_API_CA_CERT , cfg - > cacert_buf , cfg - > cacert_bytes ) ,
goto esp_mqtt_set_transport_failed ) ;
2021-06-09 13:54:33 +01:00
2020-01-19 22:28:35 +01:00
}
2022-05-03 14:13:23 -03:00
if ( cfg - > psk_hint_key ) {
# if defined(MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION) && MQTT_ENABLE_SSL
# ifdef CONFIG_ESP_TLS_PSK_VERIFICATION
esp_transport_ssl_set_psk_key_hint ( ssl , cfg - > psk_hint_key ) ;
# else
ESP_LOGE ( TAG , " PSK authentication configured but not enabled in menuconfig: Please enable ESP_TLS_PSK_VERIFICATION option " ) ;
goto esp_mqtt_set_transport_failed ;
# endif
# else
ESP_LOGE ( TAG , " PSK authentication is not available in IDF version %s " , IDF_VER ) ;
goto esp_mqtt_set_transport_failed ;
# endif
}
if ( cfg - > alpn_protos ) {
# if defined(MQTT_SUPPORTED_FEATURE_ALPN) && MQTT_ENABLE_SSL
# if defined(CONFIG_MBEDTLS_SSL_ALPN) || defined(CONFIG_WOLFSSL_HAVE_ALPN)
esp_transport_ssl_set_alpn_protocol ( ssl , ( const char * * ) cfg - > alpn_protos ) ;
# else
ESP_LOGE ( TAG , " APLN configured but not enabled in menuconfig: Please enable MBEDTLS_SSL_ALPN or WOLFSSL_HAVE_ALPN option " ) ;
goto esp_mqtt_set_transport_failed ;
# endif
# else
ESP_LOGE ( TAG , " APLN is not available in IDF version %s " , IDF_VER ) ;
goto esp_mqtt_set_transport_failed ;
# endif
}
if ( cfg - > skip_cert_common_name_check ) {
# if defined(MQTT_SUPPORTED_FEATURE_SKIP_CRT_CMN_NAME_CHECK) && MQTT_ENABLE_SSL
esp_transport_ssl_skip_common_name_check ( ssl ) ;
# else
ESP_LOGE ( TAG , " Skip certificate common name check is not available in IDF version %s " , IDF_VER ) ;
goto esp_mqtt_set_transport_failed ;
# endif
}
2020-01-19 22:28:35 +01:00
2020-06-18 09:17:45 +05:30
if ( cfg - > use_secure_element ) {
2020-07-17 21:45:28 +05:30
# ifdef MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT
# ifdef CONFIG_ESP_TLS_USE_SECURE_ELEMENT
2020-06-18 09:17:45 +05:30
esp_transport_ssl_use_secure_element ( ssl ) ;
# else
2020-07-17 21:45:28 +05:30
ESP_LOGE ( TAG , " Secure element not enabled for esp-tls in menuconfig " ) ;
goto esp_mqtt_set_transport_failed ;
# endif /* CONFIG_ESP_TLS_USE_SECURE_ELEMENT */
# else
ESP_LOGE ( TAG , " Secure element feature is not available in IDF version %s " , IDF_VER ) ;
goto esp_mqtt_set_transport_failed ;
# endif /* MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT */
2020-06-18 09:17:45 +05:30
}
2020-07-19 17:36:19 +05:30
2020-12-31 12:24:25 +00:00
if ( cfg - > ds_data ! = NULL ) {
2020-07-19 17:36:19 +05:30
# ifdef MQTT_SUPPORTED_FEATURE_DIGITAL_SIGNATURE
# ifdef CONFIG_ESP_TLS_USE_DS_PERIPHERAL
esp_transport_ssl_set_ds_data ( ssl , cfg - > ds_data ) ;
# else
ESP_LOGE ( TAG , " Digital Signature not enabled for esp-tls in menuconfig " ) ;
goto esp_mqtt_set_transport_failed ;
# endif /* CONFIG_ESP_TLS_USE_DS_PERIPHERAL */
# else
ESP_LOGE ( TAG , " Digital Signature feature is not available in IDF version %s " , IDF_VER ) ;
goto esp_mqtt_set_transport_failed ;
# endif
}
2020-01-19 22:28:35 +01:00
ESP_OK_CHECK ( TAG , esp_mqtt_set_cert_key_data ( ssl , MQTT_SSL_DATA_API_CLIENT_CERT , cfg - > clientcert_buf , cfg - > clientcert_bytes ) ,
goto esp_mqtt_set_transport_failed ) ;
ESP_OK_CHECK ( TAG , esp_mqtt_set_cert_key_data ( ssl , MQTT_SSL_DATA_API_CLIENT_KEY , cfg - > clientkey_buf , cfg - > clientkey_bytes ) ,
goto esp_mqtt_set_transport_failed ) ;
if ( cfg - > clientkey_password & & cfg - > clientkey_password_len ) {
# if defined(MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD) && MQTT_ENABLE_SSL
esp_transport_ssl_set_client_key_password ( ssl ,
2020-12-31 12:24:25 +00:00
cfg - > clientkey_password ,
cfg - > clientkey_password_len ) ;
2020-01-19 22:28:35 +01:00
# else
ESP_LOGE ( TAG , " Password protected keys are not available in IDF version %s " , IDF_VER ) ;
goto esp_mqtt_set_transport_failed ;
# endif
}
2020-06-09 11:59:07 +08:00
2020-01-16 16:30:52 +01:00
return ESP_OK ;
2020-01-19 22:28:35 +01:00
2020-12-31 12:24:25 +00:00
esp_mqtt_set_transport_failed :
2020-01-19 22:28:35 +01:00
return ESP_FAIL ;
2020-01-16 16:30:52 +01:00
}
2020-01-19 22:28:35 +01:00
# endif // MQTT_ENABLE_SSL
2020-01-16 16:30:52 +01:00
2020-07-08 11:36:38 +08:00
/* Checks if the user supplied config values are internally consistent */
static esp_err_t esp_mqtt_check_cfg_conflict ( const mqtt_config_storage_t * cfg , const esp_mqtt_client_config_t * user_cfg )
{
2022-10-17 12:21:52 +02:00
if ( cfg = = NULL | | user_cfg = = NULL ) {
ESP_LOGE ( TAG , " Invalid configuration " ) ;
return ESP_ERR_INVALID_ARG ;
}
2020-07-08 11:36:38 +08:00
esp_err_t ret = ESP_OK ;
bool ssl_cfg_enabled = cfg - > use_global_ca_store | | cfg - > cacert_buf | | cfg - > clientcert_buf | | cfg - > psk_hint_key | | cfg - > alpn_protos ;
bool is_ssl_scheme = false ;
if ( cfg - > scheme ) {
2022-05-23 22:19:24 +08:00
is_ssl_scheme = ( strcasecmp ( cfg - > scheme , MQTT_OVER_SSL_SCHEME ) = = 0 ) | | ( strcasecmp ( cfg - > scheme , MQTT_OVER_WSS_SCHEME ) = = 0 ) ;
2020-07-08 11:36:38 +08:00
}
if ( ! is_ssl_scheme & & ssl_cfg_enabled ) {
if ( cfg - > uri ) {
ESP_LOGW ( TAG , " SSL related configs set, but the URI scheme specifies a non-SSL scheme, scheme = %s " , cfg - > scheme ) ;
} else {
2022-05-03 14:13:23 -03:00
ESP_LOGW ( TAG , " SSL related configs set, but the transport protocol is a non-SSL scheme, transport = %d " , user_cfg - > broker . address . transport ) ;
2020-07-08 11:36:38 +08:00
}
ret = ESP_ERR_INVALID_ARG ;
}
2022-05-03 14:13:23 -03:00
if ( cfg - > uri & & user_cfg - > broker . address . transport ) {
ESP_LOGW ( TAG , " Transport config set, but overridden by scheme from URI: transport = %d, uri scheme = %s " , user_cfg - > broker . address . transport , cfg - > scheme ) ;
2020-07-08 11:36:38 +08:00
ret = ESP_ERR_INVALID_ARG ;
}
return ret ;
}
2022-05-20 11:55:28 +08:00
bool esp_mqtt_set_if_config ( char const * const new_config , char * * old_config )
2020-12-31 12:24:25 +00:00
{
if ( new_config ) {
free ( * old_config ) ;
* old_config = strdup ( new_config ) ;
2021-02-19 14:02:31 +00:00
if ( * old_config = = NULL ) {
2020-12-31 12:24:25 +00:00
return false ;
}
}
return true ;
}
2022-05-23 22:19:24 +08:00
static esp_err_t esp_mqtt_client_create_transport ( esp_mqtt_client_handle_t client )
{
esp_err_t ret = ESP_OK ;
if ( client - > transport_list ) {
esp_transport_list_destroy ( client - > transport_list ) ;
client - > transport_list = NULL ;
}
if ( client - > config - > scheme ) {
client - > transport_list = esp_transport_list_init ( ) ;
ESP_MEM_CHECK ( TAG , client - > transport_list , return ESP_ERR_NO_MEM ) ;
if ( ( strcasecmp ( client - > config - > scheme , MQTT_OVER_TCP_SCHEME ) = = 0 ) | | ( strcasecmp ( client - > config - > scheme , MQTT_OVER_WS_SCHEME ) = = 0 ) ) {
esp_transport_handle_t tcp = esp_transport_tcp_init ( ) ;
ESP_MEM_CHECK ( TAG , tcp , return ESP_ERR_NO_MEM ) ;
esp_transport_set_default_port ( tcp , MQTT_TCP_DEFAULT_PORT ) ;
esp_transport_list_add ( client - > transport_list , tcp , MQTT_OVER_TCP_SCHEME ) ;
if ( strcasecmp ( client - > config - > scheme , MQTT_OVER_WS_SCHEME ) = = 0 ) {
# if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init ( tcp ) ;
ESP_MEM_CHECK ( TAG , ws , return ESP_ERR_NO_MEM ) ;
esp_transport_set_default_port ( ws , MQTT_WS_DEFAULT_PORT ) ;
if ( client - > config - > path ) {
esp_transport_ws_set_path ( ws , client - > config - > path ) ;
}
# ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol ( ws , MQTT_OVER_TCP_SCHEME ) ;
# endif
esp_transport_list_add ( client - > transport_list , ws , MQTT_OVER_WS_SCHEME ) ;
# else
ESP_LOGE ( TAG , " Please enable MQTT_ENABLE_WS to use %s " , client - > config - > scheme ) ;
ret = ESP_FAIL ;
# endif
}
} else if ( ( strcasecmp ( client - > config - > scheme , MQTT_OVER_SSL_SCHEME ) = = 0 ) | | ( strcasecmp ( client - > config - > scheme , MQTT_OVER_WSS_SCHEME ) = = 0 ) ) {
# if MQTT_ENABLE_SSL
esp_transport_handle_t ssl = esp_transport_ssl_init ( ) ;
ESP_MEM_CHECK ( TAG , ssl , return ESP_ERR_NO_MEM ) ;
esp_transport_set_default_port ( ssl , MQTT_SSL_DEFAULT_PORT ) ;
esp_transport_list_add ( client - > transport_list , ssl , MQTT_OVER_SSL_SCHEME ) ;
if ( strcasecmp ( client - > config - > scheme , MQTT_OVER_WSS_SCHEME ) = = 0 ) {
# if MQTT_ENABLE_WS
esp_transport_handle_t wss = esp_transport_ws_init ( ssl ) ;
ESP_MEM_CHECK ( TAG , wss , return ESP_ERR_NO_MEM ) ;
esp_transport_set_default_port ( wss , MQTT_WSS_DEFAULT_PORT ) ;
if ( client - > config - > path ) {
esp_transport_ws_set_path ( wss , client - > config - > path ) ;
}
# ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol ( wss , MQTT_OVER_TCP_SCHEME ) ;
# endif
esp_transport_list_add ( client - > transport_list , wss , MQTT_OVER_WSS_SCHEME ) ;
# else
ESP_LOGE ( TAG , " Please enable MQTT_ENABLE_WS to use %s " , client - > config - > scheme ) ;
ret = ESP_FAIL ;
# endif
}
# else
ESP_LOGE ( TAG , " Please enable MQTT_ENABLE_SSL to use %s " , client - > config - > scheme ) ;
ret = ESP_FAIL ;
# endif
} else {
ESP_LOGE ( TAG , " Not support this mqtt scheme %s " , client - > config - > scheme ) ;
ret = ESP_FAIL ;
}
} else {
ESP_LOGE ( TAG , " No scheme found " ) ;
ret = ESP_FAIL ;
}
return ret ;
}
2018-11-09 23:26:35 +08:00
esp_err_t esp_mqtt_set_config ( esp_mqtt_client_handle_t client , const esp_mqtt_client_config_t * config )
2018-02-16 02:40:16 +07:00
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return ESP_ERR_INVALID_ARG ;
}
2020-01-17 14:06:56 +01:00
MQTT_API_LOCK ( client ) ;
2018-02-16 02:40:16 +07:00
//Copy user configurations to client context
2018-05-03 21:50:24 +07:00
esp_err_t err = ESP_OK ;
2020-12-31 12:24:25 +00:00
if ( ! client - > config ) {
client - > config = calloc ( 1 , sizeof ( mqtt_config_storage_t ) ) ;
ESP_MEM_CHECK ( TAG , client - > config , {
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-12-21 13:52:48 +01:00
return ESP_ERR_NO_MEM ;
} ) ;
2018-11-09 23:26:35 +08:00
}
2018-02-16 02:40:16 +07:00
2022-05-03 14:13:23 -03:00
client - > config - > message_retransmit_timeout = config - > session . message_retransmit_timeout ;
if ( config - > session . message_retransmit_timeout < = 0 ) {
2021-10-27 14:40:49 -03:00
client - > config - > message_retransmit_timeout = 1000 ;
2021-08-30 15:59:59 +05:00
}
2022-05-03 14:13:23 -03:00
client - > config - > task_prio = config - > task . priority ;
2020-12-31 12:24:25 +00:00
if ( client - > config - > task_prio < = 0 ) {
client - > config - > task_prio = MQTT_TASK_PRIORITY ;
2018-11-09 23:26:35 +08:00
}
2022-05-03 14:13:23 -03:00
client - > config - > task_stack = config - > task . stack_size ;
2020-12-31 12:24:25 +00:00
if ( client - > config - > task_stack < = 0 ) {
client - > config - > task_stack = MQTT_TASK_STACK ;
2018-02-16 02:40:16 +07:00
}
2022-05-03 14:13:23 -03:00
if ( config - > broker . address . port ) {
client - > config - > port = config - > broker . address . port ;
2018-02-16 02:40:16 +07:00
}
2020-12-31 12:24:25 +00:00
err = ESP_ERR_NO_MEM ;
2022-05-03 14:13:23 -03:00
ESP_MEM_CHECK ( TAG , esp_mqtt_set_if_config ( config - > broker . address . hostname , & client - > config - > host ) , goto _mqtt_set_config_failed ) ;
ESP_MEM_CHECK ( TAG , esp_mqtt_set_if_config ( config - > broker . address . path , & client - > config - > path ) , goto _mqtt_set_config_failed ) ;
ESP_MEM_CHECK ( TAG , esp_mqtt_set_if_config ( config - > credentials . username , & client - > connect_info . username ) , goto _mqtt_set_config_failed ) ;
ESP_MEM_CHECK ( TAG , esp_mqtt_set_if_config ( config - > credentials . authentication . password , & client - > connect_info . password ) , goto _mqtt_set_config_failed ) ;
if ( ! config - > credentials . set_null_client_id ) {
if ( config - > credentials . client_id ) {
ESP_MEM_CHECK ( TAG , esp_mqtt_set_if_config ( config - > credentials . client_id , & client - > connect_info . client_id ) , goto _mqtt_set_config_failed ) ;
2021-10-14 11:00:45 -03:00
} else if ( client - > connect_info . client_id = = NULL ) {
client - > connect_info . client_id = platform_create_id_string ( ) ;
}
ESP_MEM_CHECK ( TAG , client - > connect_info . client_id , goto _mqtt_set_config_failed ) ;
ESP_LOGD ( TAG , " MQTT client_id=%s " , client - > connect_info . client_id ) ;
2018-02-16 02:40:16 +07:00
}
2022-05-03 14:13:23 -03:00
ESP_MEM_CHECK ( TAG , esp_mqtt_set_if_config ( config - > broker . address . uri , & client - > config - > uri ) , goto _mqtt_set_config_failed ) ;
ESP_MEM_CHECK ( TAG , esp_mqtt_set_if_config ( config - > session . last_will . topic , & client - > connect_info . will_topic ) , goto _mqtt_set_config_failed ) ;
2018-02-16 02:40:16 +07:00
2022-05-03 14:13:23 -03:00
if ( config - > session . last_will . msg_len & & config - > session . last_will . msg ) {
2018-11-09 23:26:35 +08:00
free ( client - > connect_info . will_message ) ;
2022-05-03 14:13:23 -03:00
client - > connect_info . will_message = malloc ( config - > session . last_will . msg_len ) ;
2018-05-03 21:50:24 +07:00
ESP_MEM_CHECK ( TAG , client - > connect_info . will_message , goto _mqtt_set_config_failed ) ;
2022-05-03 14:13:23 -03:00
memcpy ( client - > connect_info . will_message , config - > session . last_will . msg , config - > session . last_will . msg_len ) ;
client - > connect_info . will_length = config - > session . last_will . msg_len ;
} else if ( config - > session . last_will . msg ) {
2018-11-09 23:26:35 +08:00
free ( client - > connect_info . will_message ) ;
2022-05-03 14:13:23 -03:00
client - > connect_info . will_message = strdup ( config - > session . last_will . msg ) ;
2018-05-03 21:50:24 +07:00
ESP_MEM_CHECK ( TAG , client - > connect_info . will_message , goto _mqtt_set_config_failed ) ;
2022-05-03 14:13:23 -03:00
client - > connect_info . will_length = strlen ( config - > session . last_will . msg ) ;
2018-02-16 02:40:16 +07:00
}
2022-05-03 14:13:23 -03:00
if ( config - > session . last_will . qos ) {
client - > connect_info . will_qos = config - > session . last_will . qos ;
2018-11-09 23:26:35 +08:00
}
2022-05-03 14:13:23 -03:00
if ( config - > session . last_will . retain ) {
client - > connect_info . will_retain = config - > session . last_will . retain ;
2018-11-09 23:26:35 +08:00
}
2018-02-16 02:40:16 +07:00
2022-05-03 14:13:23 -03:00
if ( config - > session . disable_clean_session = = client - > connect_info . clean_session ) {
client - > connect_info . clean_session = ! config - > session . disable_clean_session ;
if ( ! client - > connect_info . clean_session & & config - > credentials . set_null_client_id ) {
2021-10-14 11:00:45 -03:00
ESP_LOGE ( TAG , " Clean Session flag must be true if client has a null id " ) ;
}
2018-11-09 23:26:35 +08:00
}
2022-05-03 14:13:23 -03:00
if ( config - > session . keepalive ) {
client - > connect_info . keepalive = config - > session . keepalive ;
2018-02-16 18:46:13 +07:00
}
2018-02-16 02:40:16 +07:00
if ( client - > connect_info . keepalive = = 0 ) {
client - > connect_info . keepalive = MQTT_KEEPALIVE_TICK ;
}
2022-05-03 14:13:23 -03:00
if ( config - > session . disable_keepalive ) {
2020-11-18 14:41:21 +01:00
// internal `keepalive` value (in connect_info) is in line with 3.1.2.10 Keep Alive from mqtt spec:
// * keepalive=0: Keep alive mechanism disabled (server not to disconnect the client on its inactivity)
// * period in seconds to send a Control packet if inactive
client - > connect_info . keepalive = 0 ;
}
2020-01-09 11:50:40 +08:00
2022-05-03 14:13:23 -03:00
if ( config - > session . protocol_ver ) {
client - > connect_info . protocol_ver = config - > session . protocol_ver ;
2020-01-09 11:50:40 +08:00
}
2020-12-31 12:24:25 +00:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_UNDEFINED ) {
2020-01-09 11:50:40 +08:00
# ifdef MQTT_PROTOCOL_311
client - > connect_info . protocol_ver = MQTT_PROTOCOL_V_3_1_1 ;
# else
client - > connect_info . protocol_ver = MQTT_PROTOCOL_V_3_1 ;
2022-02-25 10:12:30 +08:00
# endif
} else if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifndef MQTT_PROTOCOL_5
ESP_LOGE ( TAG , " Please first enable MQTT_PROTOCOL_5 feature in menuconfig " ) ;
goto _mqtt_set_config_failed ;
2020-01-09 11:50:40 +08:00
# endif
}
2022-05-03 14:13:23 -03:00
client - > config - > network_timeout_ms = config - > network . timeout_ms ;
2020-12-31 12:24:25 +00:00
if ( client - > config - > network_timeout_ms < = 0 ) {
client - > config - > network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS ;
2020-08-27 16:34:22 +03:00
}
2022-05-03 14:13:23 -03:00
if ( config - > network . refresh_connection_after_ms ) {
client - > config - > refresh_connection_after_ms = config - > network . refresh_connection_after_ms ;
2018-11-09 23:26:35 +08:00
}
2020-12-31 12:24:25 +00:00
client - > config - > auto_reconnect = true ;
2022-05-03 14:13:23 -03:00
if ( config - > network . disable_auto_reconnect = = client - > config - > auto_reconnect ) {
client - > config - > auto_reconnect = ! config - > network . disable_auto_reconnect ;
2018-11-09 23:26:35 +08:00
}
2019-11-18 14:43:44 +08:00
2022-05-03 14:13:23 -03:00
if ( config - > network . reconnect_timeout_ms ) {
client - > config - > reconnect_timeout_ms = config - > network . reconnect_timeout_ms ;
2019-11-18 14:43:44 +08:00
} else {
2020-12-31 12:24:25 +00:00
client - > config - > reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS ;
2019-11-18 14:43:44 +08:00
}
2022-05-03 14:13:23 -03:00
if ( config - > broker . verification . alpn_protos ) {
2020-12-31 12:24:25 +00:00
for ( int i = 0 ; i < client - > config - > num_alpn_protos ; i + + ) {
free ( client - > config - > alpn_protos [ i ] ) ;
2019-11-13 11:02:57 +08:00
}
2020-12-31 12:24:25 +00:00
free ( client - > config - > alpn_protos ) ;
client - > config - > num_alpn_protos = 0 ;
2019-11-13 11:02:57 +08:00
const char * * p ;
2022-05-03 14:13:23 -03:00
for ( p = config - > broker . verification . alpn_protos ; * p ! = NULL ; p + + ) {
2020-12-31 12:24:25 +00:00
client - > config - > num_alpn_protos + + ;
2019-11-13 11:02:57 +08:00
}
// mbedTLS expects the list to be null-terminated
2022-05-03 14:13:23 -03:00
client - > config - > alpn_protos = calloc ( client - > config - > num_alpn_protos + 1 , sizeof ( config - > broker . verification . alpn_protos ) ) ;
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client - > config - > alpn_protos , goto _mqtt_set_config_failed ) ;
2019-11-13 11:02:57 +08:00
2020-12-31 12:24:25 +00:00
for ( int i = 0 ; i < client - > config - > num_alpn_protos ; i + + ) {
2022-05-03 14:13:23 -03:00
client - > config - > alpn_protos [ i ] = strdup ( config - > broker . verification . alpn_protos [ i ] ) ;
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client - > config - > alpn_protos [ i ] , goto _mqtt_set_config_failed ) ;
2019-11-13 11:02:57 +08:00
}
}
2020-01-19 22:28:35 +01:00
// configure ssl related parameters
2022-05-03 14:13:23 -03:00
client - > config - > use_global_ca_store = config - > broker . verification . use_global_ca_store ;
client - > config - > cacert_buf = config - > broker . verification . certificate ;
client - > config - > cacert_bytes = config - > broker . verification . certificate_len ;
client - > config - > psk_hint_key = config - > broker . verification . psk_hint_key ;
client - > config - > crt_bundle_attach = config - > broker . verification . crt_bundle_attach ;
client - > config - > clientcert_buf = config - > credentials . authentication . certificate ;
client - > config - > clientcert_bytes = config - > credentials . authentication . certificate_len ;
client - > config - > clientkey_buf = config - > credentials . authentication . key ;
client - > config - > clientkey_bytes = config - > credentials . authentication . key_len ;
client - > config - > skip_cert_common_name_check = config - > broker . verification . skip_cert_common_name_check ;
2022-06-13 17:34:00 +02:00
client - > config - > skip_server_verification = config - > broker . verification . skip_server_verification ;
2022-05-03 14:13:23 -03:00
client - > config - > use_secure_element = config - > credentials . authentication . use_secure_element ;
client - > config - > ds_data = config - > credentials . authentication . ds_data ;
if ( config - > credentials . authentication . key_password & & config - > credentials . authentication . key_password_len ) {
client - > config - > clientkey_password_len = config - > credentials . authentication . key_password_len ;
2020-12-31 12:24:25 +00:00
client - > config - > clientkey_password = malloc ( client - > config - > clientkey_password_len ) ;
2021-02-11 08:13:37 +01:00
ESP_MEM_CHECK ( TAG , client - > config - > clientkey_password , goto _mqtt_set_config_failed ) ;
2022-05-03 14:13:23 -03:00
memcpy ( client - > config - > clientkey_password , config - > credentials . authentication . key_password , client - > config - > clientkey_password_len ) ;
2020-01-16 16:30:52 +01:00
}
2022-05-03 14:13:23 -03:00
if ( config - > broker . address . transport ) {
2020-01-16 16:30:52 +01:00
free ( client - > config - > scheme ) ;
2022-10-17 12:21:52 +02:00
client - > config - > scheme = NULL ;
2022-05-03 14:13:23 -03:00
if ( config - > broker . address . transport = = MQTT_TRANSPORT_OVER_TCP ) {
2022-05-23 22:19:24 +08:00
client - > config - > scheme = create_string ( MQTT_OVER_TCP_SCHEME , strlen ( MQTT_OVER_TCP_SCHEME ) ) ;
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client - > config - > scheme , goto _mqtt_set_config_failed ) ;
2021-09-10 09:15:36 +02:00
}
# if MQTT_ENABLE_WS
2022-05-03 14:13:23 -03:00
else if ( config - > broker . address . transport = = MQTT_TRANSPORT_OVER_WS ) {
2022-05-23 22:19:24 +08:00
client - > config - > scheme = create_string ( MQTT_OVER_WS_SCHEME , strlen ( MQTT_OVER_WS_SCHEME ) ) ;
2021-09-10 09:15:36 +02:00
ESP_MEM_CHECK ( TAG , client - > config - > scheme , goto _mqtt_set_config_failed ) ;
}
# endif
# if MQTT_ENABLE_SSL
2022-05-03 14:13:23 -03:00
else if ( config - > broker . address . transport = = MQTT_TRANSPORT_OVER_SSL ) {
2022-05-23 22:19:24 +08:00
client - > config - > scheme = create_string ( MQTT_OVER_SSL_SCHEME , strlen ( MQTT_OVER_SSL_SCHEME ) ) ;
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client - > config - > scheme , goto _mqtt_set_config_failed ) ;
2021-09-10 09:15:36 +02:00
}
# endif
# if MQTT_ENABLE_WSS
2022-05-03 14:13:23 -03:00
else if ( config - > broker . address . transport = = MQTT_TRANSPORT_OVER_WSS ) {
2022-05-23 22:19:24 +08:00
client - > config - > scheme = create_string ( MQTT_OVER_WSS_SCHEME , strlen ( MQTT_OVER_WSS_SCHEME ) ) ;
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client - > config - > scheme , goto _mqtt_set_config_failed ) ;
2020-01-16 16:30:52 +01:00
}
2021-09-10 09:15:36 +02:00
# endif
2020-01-16 16:30:52 +01:00
}
// Set uri at the end of config to override separately configured uri elements
2022-05-03 14:13:23 -03:00
if ( config - > broker . address . uri ) {
2020-12-31 12:24:25 +00:00
if ( esp_mqtt_client_set_uri ( client , client - > config - > uri ) ! = ESP_OK ) {
2020-01-19 22:01:44 +01:00
err = ESP_FAIL ;
goto _mqtt_set_config_failed ;
2020-01-16 16:30:52 +01:00
}
}
2021-06-04 17:29:41 +01:00
esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict ( client - > config , config ) ;
2020-01-16 16:30:52 +01:00
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2021-06-04 17:29:41 +01:00
return config_has_conflict ;
2018-05-03 21:50:24 +07:00
_mqtt_set_config_failed :
esp_mqtt_destroy_config ( client ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-05-03 21:50:24 +07:00
return err ;
2018-02-16 02:40:16 +07:00
}
2022-05-20 11:55:28 +08:00
void esp_mqtt_destroy_config ( esp_mqtt_client_handle_t client )
2018-02-16 02:40:16 +07:00
{
2020-12-31 11:11:45 +00:00
if ( client - > config = = NULL ) {
return ;
}
free ( client - > config - > host ) ;
free ( client - > config - > uri ) ;
free ( client - > config - > path ) ;
free ( client - > config - > scheme ) ;
for ( int i = 0 ; i < client - > config - > num_alpn_protos ; i + + ) {
free ( client - > config - > alpn_protos [ i ] ) ;
}
free ( client - > config - > alpn_protos ) ;
free ( client - > config - > clientkey_password ) ;
2018-05-03 21:50:24 +07:00
free ( client - > connect_info . will_topic ) ;
free ( client - > connect_info . will_message ) ;
free ( client - > connect_info . client_id ) ;
free ( client - > connect_info . username ) ;
free ( client - > connect_info . password ) ;
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_destory ( client ) ;
# endif
2018-11-09 23:26:35 +08:00
memset ( & client - > connect_info , 0 , sizeof ( mqtt_connect_info_t ) ) ;
2019-06-18 10:19:44 +02:00
# ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if ( client - > config - > event_loop_handle ) {
esp_event_loop_delete ( client - > config - > event_loop_handle ) ;
}
# endif
2020-12-31 11:11:45 +00:00
memset ( client - > config , 0 , sizeof ( mqtt_config_storage_t ) ) ;
2018-02-16 02:40:16 +07:00
free ( client - > config ) ;
2020-01-19 22:01:44 +01:00
client - > config = NULL ;
2018-02-16 02:40:16 +07:00
}
2022-09-08 15:03:04 +02:00
static inline bool has_timed_out ( uint64_t last_tick , uint64_t timeout )
{
uint64_t next = last_tick + timeout ;
return ( int64_t ) ( next - platform_tick_get_ms ( ) ) < = 0 ;
2022-06-06 12:52:22 -03:00
}
2022-03-22 14:26:49 -03:00
static esp_err_t process_keepalive ( esp_mqtt_client_handle_t client )
{
if ( client - > connect_info . keepalive > 0 ) {
2022-06-06 12:52:22 -03:00
const uint64_t keepalive_ms = client - > connect_info . keepalive * 1000 ;
2022-03-22 14:26:49 -03:00
if ( client - > wait_for_ping_resp = = true ) {
2022-06-06 12:52:22 -03:00
if ( has_timed_out ( client - > keepalive_tick , keepalive_ms ) ) {
2022-03-22 14:26:49 -03:00
ESP_LOGE ( TAG , " No PING_RESP, disconnected " ) ;
esp_mqtt_abort_connection ( client ) ;
client - > wait_for_ping_resp = false ;
return ESP_FAIL ;
}
return ESP_OK ;
}
2022-09-08 15:03:04 +02:00
if ( has_timed_out ( client - > keepalive_tick , keepalive_ms / 2 ) ) {
2022-03-22 14:26:49 -03:00
if ( esp_mqtt_client_ping ( client ) = = ESP_FAIL ) {
ESP_LOGE ( TAG , " Can't send ping, disconnected " ) ;
esp_mqtt_abort_connection ( client ) ;
return ESP_FAIL ;
}
client - > wait_for_ping_resp = true ;
return ESP_OK ;
}
}
return ESP_OK ;
}
2021-05-23 11:50:20 +02:00
static inline esp_err_t esp_mqtt_write ( esp_mqtt_client_handle_t client )
{
int wlen = 0 , widx = 0 , len = client - > mqtt_state . outbound_message - > length ;
while ( len > 0 ) {
wlen = esp_transport_write ( client - > transport ,
( char * ) client - > mqtt_state . outbound_message - > data + widx ,
len ,
client - > config - > network_timeout_ms ) ;
if ( wlen < 0 ) {
ESP_LOGE ( TAG , " Writing failed: errno=%d " , errno ) ;
return ESP_FAIL ;
} else if ( wlen = = 0 ) {
ESP_LOGE ( TAG , " Writing didn't complete in specified timeout: errno=%d " , errno ) ;
return ESP_ERR_TIMEOUT ;
}
widx + = wlen ;
len - = wlen ;
}
return ESP_OK ;
}
2018-02-16 02:40:16 +07:00
static esp_err_t esp_mqtt_connect ( esp_mqtt_client_handle_t client , int timeout_ms )
{
2022-02-25 10:12:30 +08:00
int read_len , connect_rsp_code = 0 ;
2018-08-06 08:18:12 +02:00
client - > wait_for_ping_resp = false ;
2022-05-03 14:13:23 -03:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > mqtt_state . outbound_message = mqtt5_msg_connect ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
& client - > connect_info , & client - > mqtt5_config - > connect_property_info , & client - > mqtt5_config - > will_property_info ) ;
2022-05-03 14:13:23 -03:00
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_connect ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
& client - > connect_info ) ;
2022-05-03 14:13:23 -03:00
}
2019-11-19 16:05:47 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
ESP_LOGE ( TAG , " Connect message cannot be created " ) ;
return ESP_FAIL ;
}
2018-02-16 02:40:16 +07:00
client - > mqtt_state . pending_msg_type = mqtt_get_type ( client - > mqtt_state . outbound_message - > data ) ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > mqtt_state . pending_msg_id = mqtt5_get_id ( client - > mqtt_state . outbound_message - > data ,
2022-09-08 15:03:04 +02:00
client - > mqtt_state . outbound_message - > length ) ;
2022-02-25 10:12:30 +08:00
# endif
} else {
client - > mqtt_state . pending_msg_id = mqtt_get_id ( client - > mqtt_state . outbound_message - > data ,
2022-09-08 15:03:04 +02:00
client - > mqtt_state . outbound_message - > length ) ;
2022-02-25 10:12:30 +08:00
}
2020-04-03 17:59:05 +05:30
ESP_LOGD ( TAG , " Sending MQTT CONNECT message, type: %d, id: %04X " ,
2018-02-16 02:40:16 +07:00
client - > mqtt_state . pending_msg_type ,
client - > mqtt_state . pending_msg_id ) ;
2021-05-23 11:50:20 +02:00
if ( esp_mqtt_write ( client ) ! = ESP_OK ) {
2018-02-16 02:40:16 +07:00
return ESP_FAIL ;
}
2019-04-26 16:38:09 +02:00
client - > mqtt_state . in_buffer_read_len = 0 ;
client - > mqtt_state . message_length = 0 ;
/* wait configured network timeout for broker connection response */
2019-12-04 11:32:36 +01:00
uint64_t connack_recv_started = platform_tick_get_ms ( ) ;
do {
read_len = mqtt_message_receive ( client , client - > config - > network_timeout_ms ) ;
} while ( read_len = = 0 & & platform_tick_get_ms ( ) - connack_recv_started < client - > config - > network_timeout_ms ) ;
2019-04-26 16:38:09 +02:00
if ( read_len < = 0 ) {
ESP_LOGE ( TAG , " %s: mqtt_message_receive() returned %d " , __func__ , read_len ) ;
2019-05-06 13:45:33 +02:00
return ESP_FAIL ;
2018-02-16 02:40:16 +07:00
}
if ( mqtt_get_type ( client - > mqtt_state . in_buffer ) ! = MQTT_MSG_TYPE_CONNACK ) {
ESP_LOGE ( TAG , " Invalid MSG_TYPE response: %d, read_len: %d " , mqtt_get_type ( client - > mqtt_state . in_buffer ) , read_len ) ;
return ESP_FAIL ;
}
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
if ( esp_mqtt5_parse_connack ( client , & connect_rsp_code ) = = ESP_OK ) {
return ESP_OK ;
}
# endif
} else {
client - > mqtt_state . in_buffer_read_len = 0 ;
connect_rsp_code = mqtt_get_connect_return_code ( client - > mqtt_state . in_buffer ) ;
if ( connect_rsp_code = = MQTT_CONNECTION_ACCEPTED ) {
ESP_LOGD ( TAG , " Connected " ) ;
return ESP_OK ;
}
switch ( connect_rsp_code ) {
case MQTT_CONNECTION_REFUSE_PROTOCOL :
ESP_LOGW ( TAG , " Connection refused, bad protocol " ) ;
break ;
case MQTT_CONNECTION_REFUSE_SERVER_UNAVAILABLE :
ESP_LOGW ( TAG , " Connection refused, server unavailable " ) ;
break ;
case MQTT_CONNECTION_REFUSE_BAD_USERNAME :
ESP_LOGW ( TAG , " Connection refused, bad username or password " ) ;
break ;
case MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED :
ESP_LOGW ( TAG , " Connection refused, not authorized " ) ;
break ;
default :
ESP_LOGW ( TAG , " Connection refused, Unknow reason " ) ;
break ;
}
2018-02-16 02:40:16 +07:00
}
2019-03-01 18:20:53 +00:00
/* propagate event with connection refused error */
2019-09-30 14:19:43 +02:00
client - > event . event_id = MQTT_EVENT_ERROR ;
client - > event . error_handle - > error_type = MQTT_ERROR_TYPE_CONNECTION_REFUSED ;
client - > event . error_handle - > connect_return_code = connect_rsp_code ;
client - > event . error_handle - > esp_tls_stack_err = 0 ;
client - > event . error_handle - > esp_tls_last_esp_err = 0 ;
client - > event . error_handle - > esp_tls_cert_verify_flags = 0 ;
2019-03-01 18:20:53 +00:00
esp_mqtt_dispatch_event_with_msgid ( client ) ;
return ESP_FAIL ;
2018-02-16 02:40:16 +07:00
}
2021-06-18 13:53:24 +01:00
static void esp_mqtt_abort_connection ( esp_mqtt_client_handle_t client )
2018-02-16 02:40:16 +07:00
{
2020-03-30 19:58:43 +05:30
MQTT_API_LOCK ( client ) ;
2018-09-26 11:53:54 +02:00
esp_transport_close ( client - > transport ) ;
2019-11-18 14:43:44 +08:00
client - > wait_timeout_ms = client - > config - > reconnect_timeout_ms ;
2018-02-16 02:46:20 +07:00
client - > reconnect_tick = platform_tick_get_ms ( ) ;
2021-01-26 14:25:41 +00:00
client - > state = MQTT_STATE_WAIT_RECONNECT ;
2018-11-09 23:26:35 +08:00
ESP_LOGD ( TAG , " Reconnect after %d ms " , client - > wait_timeout_ms ) ;
2018-02-16 02:40:16 +07:00
client - > event . event_id = MQTT_EVENT_DISCONNECTED ;
2018-08-06 08:18:12 +02:00
client - > wait_for_ping_resp = false ;
2018-11-02 08:37:05 +01:00
esp_mqtt_dispatch_event_with_msgid ( client ) ;
2020-03-30 19:58:43 +05:30
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
}
2020-12-31 12:24:25 +00:00
static bool create_client_data ( esp_mqtt_client_handle_t client )
2018-02-16 02:40:16 +07:00
{
2019-09-30 14:19:43 +02:00
client - > event . error_handle = calloc ( 1 , sizeof ( esp_mqtt_error_codes_t ) ) ;
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client - > event . error_handle , return false )
2020-01-17 14:06:56 +01:00
client - > api_lock = xSemaphoreCreateRecursiveMutex ( ) ;
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client - > api_lock , return false ) ;
2020-01-16 16:30:52 +01:00
2020-12-31 12:24:25 +00:00
return true ;
}
esp_mqtt_client_handle_t esp_mqtt_client_init ( const esp_mqtt_client_config_t * config )
{
2022-07-24 21:10:38 +02:00
esp_mqtt_client_handle_t client = heap_caps_calloc ( 1 , sizeof ( struct esp_mqtt_client ) ,
# if MQTT_EVENT_QUEUE_SIZE > 1
2022-09-08 15:03:04 +02:00
// if supporting multiple queued events, we keep track of them
// using atomic variable, so need to make sure it won't get allocated in PSRAM
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT ) ;
2022-07-24 21:10:38 +02:00
# else
2022-09-08 15:03:04 +02:00
MALLOC_CAP_DEFAULT ) ;
2022-07-24 21:10:38 +02:00
# endif
2020-12-31 12:24:25 +00:00
ESP_MEM_CHECK ( TAG , client , return NULL ) ;
if ( ! create_client_data ( client ) ) {
goto _mqtt_init_failed ;
}
2020-01-16 16:30:52 +01:00
if ( esp_mqtt_set_config ( client , config ) ! = ESP_OK ) {
goto _mqtt_init_failed ;
2018-02-16 02:40:16 +07:00
}
2020-01-16 16:30:52 +01:00
# ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
2022-07-24 21:10:38 +02:00
. queue_size = MQTT_EVENT_QUEUE_SIZE ,
2020-12-31 12:24:25 +00:00
. task_name = NULL ,
2020-01-16 16:30:52 +01:00
} ;
esp_event_loop_create ( & no_task_loop , & client - > config - > event_loop_handle ) ;
2022-07-24 21:10:38 +02:00
# if MQTT_EVENT_QUEUE_SIZE > 1
atomic_init ( & client - > queued_events , 0 ) ;
# endif
2020-01-16 16:30:52 +01:00
# endif
2018-02-16 02:40:16 +07:00
2018-02-26 16:06:42 +08:00
client - > keepalive_tick = platform_tick_get_ms ( ) ;
client - > reconnect_tick = platform_tick_get_ms ( ) ;
2018-11-09 23:26:35 +08:00
client - > refresh_connection_tick = platform_tick_get_ms ( ) ;
2018-08-06 08:18:12 +02:00
client - > wait_for_ping_resp = false ;
2022-05-03 14:13:23 -03:00
int buffer_size = config - > buffer . size ;
2018-02-16 02:40:16 +07:00
if ( buffer_size < = 0 ) {
buffer_size = MQTT_BUFFER_SIZE_BYTE ;
}
2020-03-10 15:26:49 +01:00
// use separate value for output buffer size if configured
2022-05-03 14:13:23 -03:00
int out_buffer_size = config - > buffer . out_size > 0 ? config - > buffer . out_size : buffer_size ;
2018-02-16 02:40:16 +07:00
client - > mqtt_state . in_buffer = ( uint8_t * ) malloc ( buffer_size ) ;
2018-05-03 21:50:24 +07:00
ESP_MEM_CHECK ( TAG , client - > mqtt_state . in_buffer , goto _mqtt_init_failed ) ;
2018-02-16 02:40:16 +07:00
client - > mqtt_state . in_buffer_length = buffer_size ;
2020-03-10 15:26:49 +01:00
client - > mqtt_state . out_buffer = ( uint8_t * ) malloc ( out_buffer_size ) ;
2018-05-03 21:50:24 +07:00
ESP_MEM_CHECK ( TAG , client - > mqtt_state . out_buffer , goto _mqtt_init_failed ) ;
2020-03-10 15:26:49 +01:00
client - > mqtt_state . out_buffer_length = out_buffer_size ;
2018-02-16 02:40:16 +07:00
client - > outbox = outbox_init ( ) ;
2018-05-03 21:50:24 +07:00
ESP_MEM_CHECK ( TAG , client - > outbox , goto _mqtt_init_failed ) ;
2018-02-16 22:48:22 +07:00
client - > status_bits = xEventGroupCreate ( ) ;
2018-05-03 21:50:24 +07:00
ESP_MEM_CHECK ( TAG , client - > status_bits , goto _mqtt_init_failed ) ;
2019-11-19 16:05:47 +08:00
mqtt_msg_init ( & client - > mqtt_state . mqtt_connection , client - > mqtt_state . out_buffer ,
2020-12-31 12:24:25 +00:00
client - > mqtt_state . out_buffer_length ) ;
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
if ( esp_mqtt5_create_default_config ( client ) ! = ESP_OK ) {
goto _mqtt_init_failed ;
}
# endif
2018-02-16 02:40:16 +07:00
return client ;
2018-05-03 21:50:24 +07:00
_mqtt_init_failed :
esp_mqtt_client_destroy ( client ) ;
return NULL ;
2018-02-16 02:40:16 +07:00
}
esp_err_t esp_mqtt_client_destroy ( esp_mqtt_client_handle_t client )
{
2019-05-06 08:49:38 +02:00
if ( client = = NULL ) {
return ESP_ERR_INVALID_ARG ;
}
2020-12-31 12:24:25 +00:00
if ( client - > api_lock ) {
esp_mqtt_client_stop ( client ) ;
}
2018-02-16 02:40:16 +07:00
esp_mqtt_destroy_config ( client ) ;
2019-05-06 08:49:38 +02:00
if ( client - > transport_list ) {
esp_transport_list_destroy ( client - > transport_list ) ;
}
if ( client - > outbox ) {
outbox_destroy ( client - > outbox ) ;
}
if ( client - > status_bits ) {
vEventGroupDelete ( client - > status_bits ) ;
}
2018-02-16 02:40:16 +07:00
free ( client - > mqtt_state . in_buffer ) ;
free ( client - > mqtt_state . out_buffer ) ;
2020-12-31 12:24:25 +00:00
if ( client - > api_lock ) {
vSemaphoreDelete ( client - > api_lock ) ;
}
2019-09-30 14:19:43 +02:00
free ( client - > event . error_handle ) ;
2018-02-16 02:40:16 +07:00
free ( client ) ;
return ESP_OK ;
}
static char * create_string ( const char * ptr , int len )
{
char * ret ;
if ( len < = 0 ) {
return NULL ;
}
ret = calloc ( 1 , len + 1 ) ;
2018-05-03 21:50:24 +07:00
ESP_MEM_CHECK ( TAG , ret , return NULL ) ;
2018-02-16 02:40:16 +07:00
memcpy ( ret , ptr , len ) ;
return ret ;
}
esp_err_t esp_mqtt_client_set_uri ( esp_mqtt_client_handle_t client , const char * uri )
{
struct http_parser_url puri ;
http_parser_url_init ( & puri ) ;
int parser_status = http_parser_parse_url ( uri , strlen ( uri ) , 0 , & puri ) ;
if ( parser_status ! = 0 ) {
ESP_LOGE ( TAG , " Error parse uri = %s " , uri ) ;
return ESP_FAIL ;
}
2019-06-18 10:19:44 +02:00
// This API could be also executed when client is active (need to protect config fields)
2020-01-17 14:06:56 +01:00
MQTT_API_LOCK ( client ) ;
2019-02-02 16:46:31 +01:00
// set uri overrides actual scheme, host, path if configured previously
free ( client - > config - > scheme ) ;
free ( client - > config - > host ) ;
free ( client - > config - > path ) ;
2018-02-16 02:40:16 +07:00
2019-02-02 16:46:31 +01:00
client - > config - > scheme = create_string ( uri + puri . field_data [ UF_SCHEMA ] . off , puri . field_data [ UF_SCHEMA ] . len ) ;
client - > config - > host = create_string ( uri + puri . field_data [ UF_HOST ] . off , puri . field_data [ UF_HOST ] . len ) ;
2020-06-29 10:14:56 +08:00
client - > config - > path = NULL ;
if ( puri . field_data [ UF_PATH ] . len | | puri . field_data [ UF_QUERY ] . len ) {
if ( puri . field_data [ UF_QUERY ] . len = = 0 ) {
asprintf ( & client - > config - > path , " %.*s " , puri . field_data [ UF_PATH ] . len , uri + puri . field_data [ UF_PATH ] . off ) ;
} else if ( puri . field_data [ UF_PATH ] . len = = 0 ) {
asprintf ( & client - > config - > path , " /?%.*s " , puri . field_data [ UF_QUERY ] . len , uri + puri . field_data [ UF_QUERY ] . off ) ;
} else {
asprintf ( & client - > config - > path , " %.*s?%.*s " , puri . field_data [ UF_PATH ] . len , uri + puri . field_data [ UF_PATH ] . off ,
2020-12-31 12:24:25 +00:00
puri . field_data [ UF_QUERY ] . len , uri + puri . field_data [ UF_QUERY ] . off ) ;
2020-06-29 10:14:56 +08:00
}
ESP_MEM_CHECK ( TAG , client - > config - > path , {
MQTT_API_UNLOCK ( client ) ;
return ESP_ERR_NO_MEM ;
} ) ;
}
2018-02-16 02:40:16 +07:00
2018-05-03 21:50:24 +07:00
if ( puri . field_data [ UF_PORT ] . len ) {
2019-05-06 13:45:33 +02:00
client - > config - > port = strtol ( ( const char * ) ( uri + puri . field_data [ UF_PORT ] . off ) , NULL , 10 ) ;
2018-02-16 02:40:16 +07:00
}
char * user_info = create_string ( uri + puri . field_data [ UF_USERINFO ] . off , puri . field_data [ UF_USERINFO ] . len ) ;
if ( user_info ) {
char * pass = strchr ( user_info , ' : ' ) ;
if ( pass ) {
pass [ 0 ] = 0 ; //terminal username
pass + + ;
client - > connect_info . password = strdup ( pass ) ;
}
client - > connect_info . username = strdup ( user_info ) ;
free ( user_info ) ;
}
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
return ESP_OK ;
}
static esp_err_t mqtt_write_data ( esp_mqtt_client_handle_t client )
{
2021-05-23 11:50:20 +02:00
if ( esp_mqtt_write ( client ) ! = ESP_OK ) {
2020-07-27 06:57:00 +02:00
esp_mqtt_client_dispatch_transport_error ( client ) ;
2018-02-16 02:40:16 +07:00
return ESP_FAIL ;
}
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
esp_mqtt5_flow_control ( client ) ;
# endif
2018-02-16 02:40:16 +07:00
return ESP_OK ;
}
2018-11-02 08:37:05 +01:00
static esp_err_t esp_mqtt_dispatch_event_with_msgid ( esp_mqtt_client_handle_t client )
2018-02-16 02:40:16 +07:00
{
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > event . msg_id = mqtt5_get_id ( client - > mqtt_state . in_buffer , client - > mqtt_state . in_buffer_length ) ;
# endif
} else {
client - > event . msg_id = mqtt_get_id ( client - > mqtt_state . in_buffer , client - > mqtt_state . in_buffer_length ) ;
}
2018-11-02 08:37:05 +01:00
return esp_mqtt_dispatch_event ( client ) ;
}
2022-07-24 21:10:38 +02:00
esp_err_t esp_mqtt_dispatch_custom_event ( esp_mqtt_client_handle_t client , esp_mqtt_event_t * event )
{
esp_err_t ret = esp_event_post_to ( client - > config - > event_loop_handle , MQTT_EVENTS , MQTT_USER_EVENT , event , sizeof ( * event ) , 0 ) ;
# if MQTT_EVENT_QUEUE_SIZE > 1
if ( ret = = ESP_OK ) {
atomic_fetch_add ( & client - > queued_events , 1 ) ;
}
# endif
return ret ;
}
2018-11-02 08:37:05 +01:00
static esp_err_t esp_mqtt_dispatch_event ( esp_mqtt_client_handle_t client )
{
2018-02-16 02:40:16 +07:00
client - > event . client = client ;
2022-02-25 10:12:30 +08:00
client - > event . protocol_ver = client - > connect_info . protocol_ver ;
esp_err_t ret = ESP_FAIL ;
2018-02-16 02:40:16 +07:00
if ( client - > config - > event_handle ) {
2022-02-25 10:12:30 +08:00
ret = client - > config - > event_handle ( & client - > event ) ;
2018-12-18 16:43:08 +01:00
} else {
2019-05-29 15:12:25 +02:00
# ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
2018-12-18 16:43:08 +01:00
esp_event_post_to ( client - > config - > event_loop_handle , MQTT_EVENTS , client - > event . event_id , & client - > event , sizeof ( client - > event ) , portMAX_DELAY ) ;
2022-02-25 10:12:30 +08:00
ret = esp_event_loop_run ( client - > config - > event_loop_handle , 0 ) ;
2019-05-29 15:12:25 +02:00
# else
return ESP_FAIL ;
# endif
2018-02-16 02:40:16 +07:00
}
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_delete_user_property ( client - > event . property - > user_property ) ;
client - > event . property - > user_property = NULL ;
# endif
}
return ret ;
2018-05-03 21:50:24 +07:00
}
2018-02-16 02:40:16 +07:00
2019-04-26 16:38:09 +02:00
static esp_err_t deliver_publish ( esp_mqtt_client_handle_t client )
2018-02-16 02:40:16 +07:00
{
2019-04-26 16:38:09 +02:00
uint8_t * msg_buf = client - > mqtt_state . in_buffer ;
size_t msg_read_len = client - > mqtt_state . in_buffer_read_len ;
size_t msg_total_len = client - > mqtt_state . message_length ;
size_t msg_topic_len = msg_read_len , msg_data_len = msg_read_len ;
size_t msg_data_offset = 0 ;
2019-05-06 09:37:50 +02:00
char * msg_topic = NULL , * msg_data = NULL ;
2019-04-26 16:38:09 +02:00
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
if ( esp_mqtt5_get_publish_data ( client , msg_buf , msg_read_len , & msg_topic , & msg_topic_len , & msg_data , & msg_data_len ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " %s: esp_mqtt5_get_publish_data() failed " , __func__ ) ;
return ESP_FAIL ;
}
# endif
} else {
// get topic
msg_topic = mqtt_get_publish_topic ( msg_buf , & msg_topic_len ) ;
if ( msg_topic = = NULL ) {
ESP_LOGE ( TAG , " %s: mqtt_get_publish_topic() failed " , __func__ ) ;
return ESP_FAIL ;
}
ESP_LOGD ( TAG , " %s: msg_topic_len=%zu " , __func__ , msg_topic_len ) ;
2018-02-16 02:40:16 +07:00
2022-02-25 10:12:30 +08:00
// get payload
msg_data = mqtt_get_publish_data ( msg_buf , & msg_data_len ) ;
if ( msg_data_len > 0 & & msg_data = = NULL ) {
ESP_LOGE ( TAG , " %s: mqtt_get_publish_data() failed " , __func__ ) ;
return ESP_FAIL ;
}
2019-04-26 16:38:09 +02:00
}
// post data event
2021-05-23 15:43:07 +02:00
client - > event . retain = mqtt_get_retain ( msg_buf ) ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > event . msg_id = mqtt5_get_id ( msg_buf , msg_read_len ) ;
# endif
} else {
client - > event . msg_id = mqtt_get_id ( msg_buf , msg_read_len ) ;
}
2021-09-20 12:07:53 +02:00
client - > event . qos = mqtt_get_qos ( msg_buf ) ;
2021-09-20 14:51:21 +02:00
client - > event . dup = mqtt_get_dup ( msg_buf ) ;
2019-04-26 16:38:09 +02:00
client - > event . total_data_len = msg_data_len + msg_total_len - msg_read_len ;
post_data_event :
2021-02-19 14:02:31 +00:00
ESP_LOGD ( TAG , " Get data len= %zu, topic len=%zu, total_data: %d offset: %zu " , msg_data_len , msg_topic_len ,
2019-05-06 13:45:33 +02:00
client - > event . total_data_len , msg_data_offset ) ;
2019-04-26 16:38:09 +02:00
client - > event . event_id = MQTT_EVENT_DATA ;
2019-05-06 09:37:50 +02:00
client - > event . data = msg_data_len > 0 ? msg_data : NULL ;
2019-04-26 16:38:09 +02:00
client - > event . data_len = msg_data_len ;
client - > event . current_data_offset = msg_data_offset ;
2019-05-06 09:37:50 +02:00
client - > event . topic = msg_topic ;
2019-04-26 16:38:09 +02:00
client - > event . topic_len = msg_topic_len ;
esp_mqtt_dispatch_event ( client ) ;
if ( msg_read_len < msg_total_len ) {
size_t buf_len = client - > mqtt_state . in_buffer_length ;
2019-05-06 13:45:33 +02:00
msg_data = ( char * ) client - > mqtt_state . in_buffer ;
2019-04-26 16:38:09 +02:00
msg_topic = NULL ;
msg_topic_len = 0 ;
msg_data_offset + = msg_data_len ;
2022-03-24 13:05:08 +04:00
int ret = esp_transport_read ( client - > transport , ( char * ) client - > mqtt_state . in_buffer ,
2022-09-08 15:03:04 +02:00
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len ,
client - > config - > network_timeout_ms ) ;
2022-03-24 13:05:08 +04:00
if ( ret < = 0 ) {
2021-12-15 15:11:15 +05:30
return esp_mqtt_handle_transport_read_error ( ret , client ) = = 0 ? ESP_OK : ESP_FAIL ;
2018-02-16 02:40:16 +07:00
}
2021-12-15 15:11:15 +05:30
2022-03-24 13:05:08 +04:00
msg_data_len = ret ;
2019-04-26 16:38:09 +02:00
msg_read_len + = msg_data_len ;
goto post_data_event ;
}
return ESP_OK ;
2018-02-16 02:40:16 +07:00
}
2021-09-20 14:51:21 +02:00
static esp_err_t deliver_suback ( esp_mqtt_client_handle_t client )
{
uint8_t * msg_buf = client - > mqtt_state . in_buffer ;
size_t msg_data_len = client - > mqtt_state . in_buffer_read_len ;
char * msg_data = NULL ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
msg_data = mqtt5_get_suback_data ( msg_buf , & msg_data_len , & client - > event . property - > user_property ) ;
# endif
} else {
msg_data = mqtt_get_suback_data ( msg_buf , & msg_data_len ) ;
}
2021-09-20 14:51:21 +02:00
if ( msg_data_len < = 0 ) {
ESP_LOGE ( TAG , " Failed to acquire suback data " ) ;
return ESP_FAIL ;
}
2022-09-08 15:03:04 +02:00
client - > event . error_handle - > esp_tls_stack_err = 0 ;
client - > event . error_handle - > esp_tls_last_esp_err = 0 ;
client - > event . error_handle - > esp_tls_cert_verify_flags = 0 ;
client - > event . error_handle - > error_type = MQTT_ERROR_TYPE_NONE ;
client - > event . error_handle - > connect_return_code = MQTT_CONNECTION_ACCEPTED ;
2021-09-20 14:51:21 +02:00
// post data event
2022-12-05 15:22:48 +01:00
if ( ( uint8_t ) * msg_data = = 0x80 ) {
2022-09-08 15:03:04 +02:00
client - > event . error_handle - > error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED ;
}
2021-09-20 14:51:21 +02:00
client - > event . data_len = msg_data_len ;
client - > event . total_data_len = msg_data_len ;
client - > event . event_id = MQTT_EVENT_SUBSCRIBED ;
client - > event . data = msg_data ;
client - > event . current_data_offset = 0 ;
esp_mqtt_dispatch_event_with_msgid ( client ) ;
return ESP_OK ;
}
2018-02-16 02:40:16 +07:00
static bool is_valid_mqtt_msg ( esp_mqtt_client_handle_t client , int msg_type , int msg_id )
{
ESP_LOGD ( TAG , " pending_id=%d, pending_msg_count = %d " , client - > mqtt_state . pending_msg_id , client - > mqtt_state . pending_msg_count ) ;
if ( client - > mqtt_state . pending_msg_count = = 0 ) {
return false ;
}
if ( outbox_delete ( client - > outbox , msg_id , msg_type ) = = ESP_OK ) {
client - > mqtt_state . pending_msg_count - - ;
return true ;
}
return false ;
}
2021-06-09 13:54:33 +01:00
static outbox_item_handle_t mqtt_enqueue_oversized ( esp_mqtt_client_handle_t client , uint8_t * remaining_data , int remaining_len )
2018-10-25 16:38:25 +02:00
{
ESP_LOGD ( TAG , " mqtt_enqueue_oversized id: %d, type=%d successful " ,
client - > mqtt_state . pending_msg_id , client - > mqtt_state . pending_msg_type ) ;
//lock mutex
outbox_message_t msg = { 0 } ;
msg . data = client - > mqtt_state . outbound_message - > data ;
msg . len = client - > mqtt_state . outbound_message - > length ;
msg . msg_id = client - > mqtt_state . pending_msg_id ;
msg . msg_type = client - > mqtt_state . pending_msg_type ;
2018-12-20 21:46:55 +01:00
msg . msg_qos = client - > mqtt_state . pending_publish_qos ;
2018-10-25 16:38:25 +02:00
msg . remaining_data = remaining_data ;
msg . remaining_len = remaining_len ;
//Copy to queue buffer
2021-06-09 13:54:33 +01:00
return outbox_enqueue ( client - > outbox , & msg , platform_tick_get_ms ( ) ) ;
2018-10-25 16:38:25 +02:00
//unlock
}
2021-06-09 13:54:33 +01:00
static outbox_item_handle_t mqtt_enqueue ( esp_mqtt_client_handle_t client )
2018-02-16 02:40:16 +07:00
{
2018-02-17 12:53:17 +07:00
ESP_LOGD ( TAG , " mqtt_enqueue id: %d, type=%d successful " ,
2018-05-03 21:50:24 +07:00
client - > mqtt_state . pending_msg_id , client - > mqtt_state . pending_msg_type ) ;
2018-02-16 02:40:16 +07:00
if ( client - > mqtt_state . pending_msg_count > 0 ) {
2018-10-25 16:38:25 +02:00
outbox_message_t msg = { 0 } ;
msg . data = client - > mqtt_state . outbound_message - > data ;
msg . len = client - > mqtt_state . outbound_message - > length ;
msg . msg_id = client - > mqtt_state . pending_msg_id ;
msg . msg_type = client - > mqtt_state . pending_msg_type ;
2018-12-20 21:46:55 +01:00
msg . msg_qos = client - > mqtt_state . pending_publish_qos ;
2018-02-16 02:40:16 +07:00
//Copy to queue buffer
2021-06-09 13:54:33 +01:00
return outbox_enqueue ( client - > outbox , & msg , platform_tick_get_ms ( ) ) ;
2018-02-16 02:40:16 +07:00
}
2021-06-09 13:54:33 +01:00
return NULL ;
2018-02-16 02:40:16 +07:00
}
2019-04-26 16:38:09 +02:00
/*
* Returns :
* - 1 in case of failure
* 0 if no message has been received
* 1 if a message has been received and placed to client - > mqtt_state :
* message length : client - > mqtt_state . message_length
* message content : client - > mqtt_state . in_buffer
*/
static int mqtt_message_receive ( esp_mqtt_client_handle_t client , int read_poll_timeout_ms )
{
int read_len , total_len , fixed_header_len ;
uint8_t * buf = client - > mqtt_state . in_buffer + client - > mqtt_state . in_buffer_read_len ;
esp_transport_handle_t t = client - > transport ;
client - > mqtt_state . message_length = 0 ;
if ( client - > mqtt_state . in_buffer_read_len = = 0 ) {
/*
* Read first byte of the mqtt packet fixed header , it contains packet
* type and flags .
*/
read_len = esp_transport_read ( t , ( char * ) buf , 1 , read_poll_timeout_ms ) ;
2021-12-15 15:11:15 +05:30
if ( read_len < = 0 ) {
return esp_mqtt_handle_transport_read_error ( read_len , client ) ;
2019-04-26 16:38:09 +02:00
}
ESP_LOGD ( TAG , " %s: first byte: 0x%x " , __func__ , * buf ) ;
/*
* Verify the flags and act according to MQTT protocol : close connection
* if the flags are set incorrectly .
*/
if ( ! mqtt_has_valid_msg_hdr ( buf , read_len ) ) {
ESP_LOGE ( TAG , " %s: received a message with an invalid header=0x%x " , __func__ , * buf ) ;
goto err ;
}
buf + + ;
client - > mqtt_state . in_buffer_read_len + + ;
}
if ( ( client - > mqtt_state . in_buffer_read_len = = 1 ) | |
2019-05-06 13:45:33 +02:00
( ( client - > mqtt_state . in_buffer_read_len < 6 ) & & ( * ( buf - 1 ) & 0x80 ) ) ) {
2019-04-26 16:38:09 +02:00
do {
/*
* Read the " remaining length " part of mqtt packet fixed header . It
* starts at second byte and spans up to 4 bytes , but we accept here
* only up to 2 bytes of remaining length , i . e . messages with
* maximal remaining length value = 16383 ( maximal total message
* size of 16386 bytes ) .
*/
read_len = esp_transport_read ( t , ( char * ) buf , 1 , read_poll_timeout_ms ) ;
2021-12-15 15:11:15 +05:30
if ( read_len < = 0 ) {
return esp_mqtt_handle_transport_read_error ( read_len , client ) ;
2019-04-26 16:38:09 +02:00
}
ESP_LOGD ( TAG , " %s: read \" remaining length \" byte: 0x%x " , __func__ , * buf ) ;
buf + + ;
client - > mqtt_state . in_buffer_read_len + + ;
} while ( ( client - > mqtt_state . in_buffer_read_len < 6 ) & & ( * ( buf - 1 ) & 0x80 ) ) ;
}
total_len = mqtt_get_total_length ( client - > mqtt_state . in_buffer , client - > mqtt_state . in_buffer_read_len , & fixed_header_len ) ;
2021-02-19 14:02:31 +00:00
ESP_LOGD ( TAG , " %s: total message length: %d (already read: %zu) " , __func__ , total_len , client - > mqtt_state . in_buffer_read_len ) ;
2019-04-26 16:38:09 +02:00
client - > mqtt_state . message_length = total_len ;
if ( client - > mqtt_state . in_buffer_length < total_len ) {
if ( mqtt_get_type ( client - > mqtt_state . in_buffer ) = = MQTT_MSG_TYPE_PUBLISH ) {
/*
* In case larger publish messages , we only need to read full topic , data can be split to multiple data event .
* Evaluate and correct total_len to read only publish message header , so data can be read separately
*/
if ( client - > mqtt_state . in_buffer_read_len < fixed_header_len + 2 ) {
/* read next 2 bytes - topic length to get minimum portion of publish packet */
read_len = esp_transport_read ( t , ( char * ) buf , client - > mqtt_state . in_buffer_read_len - fixed_header_len + 2 , read_poll_timeout_ms ) ;
ESP_LOGD ( TAG , " %s: read_len=%d " , __func__ , read_len ) ;
2021-12-15 15:11:15 +05:30
if ( read_len < = 0 ) {
return esp_mqtt_handle_transport_read_error ( read_len , client ) ;
2019-04-26 16:38:09 +02:00
}
client - > mqtt_state . in_buffer_read_len + = read_len ;
buf + = read_len ;
if ( client - > mqtt_state . in_buffer_read_len < fixed_header_len + 2 ) {
2021-02-19 14:02:31 +00:00
ESP_LOGD ( TAG , " %s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu) " ,
2019-04-26 16:38:09 +02:00
__func__ , total_len , client - > mqtt_state . in_buffer_read_len ) ;
return 0 ;
}
}
int topic_len = client - > mqtt_state . in_buffer [ fixed_header_len ] < < 8 ;
2019-05-06 13:45:33 +02:00
topic_len | = client - > mqtt_state . in_buffer [ fixed_header_len + 1 ] ;
total_len = fixed_header_len + topic_len + ( mqtt_get_qos ( client - > mqtt_state . in_buffer ) > 0 ? 2 : 0 ) ;
2019-04-26 16:38:09 +02:00
ESP_LOGD ( TAG , " %s: total len modified to %d as message longer than input buffer " , __func__ , total_len ) ;
if ( client - > mqtt_state . in_buffer_length < total_len ) {
ESP_LOGE ( TAG , " %s: message is too big, insufficient buffer size " , __func__ ) ;
goto err ;
} else {
total_len = client - > mqtt_state . in_buffer_length ;
}
/* free to continue with reading */
} else {
ESP_LOGE ( TAG , " %s: message is too big, insufficient buffer size " , __func__ ) ;
goto err ;
}
}
if ( client - > mqtt_state . in_buffer_read_len < total_len ) {
/* read the rest of the mqtt message */
read_len = esp_transport_read ( t , ( char * ) buf , total_len - client - > mqtt_state . in_buffer_read_len , read_poll_timeout_ms ) ;
ESP_LOGD ( TAG , " %s: read_len=%d " , __func__ , read_len ) ;
2021-12-15 15:11:15 +05:30
if ( read_len < = 0 ) {
return esp_mqtt_handle_transport_read_error ( read_len , client ) ;
2019-04-26 16:38:09 +02:00
}
client - > mqtt_state . in_buffer_read_len + = read_len ;
if ( client - > mqtt_state . in_buffer_read_len < total_len ) {
2021-02-19 14:02:31 +00:00
ESP_LOGD ( TAG , " %s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu) " ,
2019-04-26 16:38:09 +02:00
__func__ , total_len , client - > mqtt_state . in_buffer_read_len ) ;
return 0 ;
}
}
2021-02-19 14:02:31 +00:00
ESP_LOGD ( TAG , " %s: transport_read():%zu %zu " , __func__ , client - > mqtt_state . in_buffer_read_len , client - > mqtt_state . message_length ) ;
2019-04-26 16:38:09 +02:00
return 1 ;
err :
2020-07-27 06:57:00 +02:00
esp_mqtt_client_dispatch_transport_error ( client ) ;
2019-04-26 16:38:09 +02:00
return - 1 ;
}
2018-02-16 02:40:16 +07:00
static esp_err_t mqtt_process_receive ( esp_mqtt_client_handle_t client )
{
2022-02-25 10:12:30 +08:00
uint8_t msg_type = 0 , msg_qos = 0 ;
uint16_t msg_id = 0 ;
2018-02-16 02:40:16 +07:00
2019-04-26 16:38:09 +02:00
/* non-blocking receive in order not to block other tasks */
int recv = mqtt_message_receive ( client , 0 ) ;
if ( recv < 0 ) {
ESP_LOGE ( TAG , " %s: mqtt_message_receive() returned %d " , __func__ , recv ) ;
2018-02-16 02:40:16 +07:00
return ESP_FAIL ;
}
2019-04-26 16:38:09 +02:00
if ( recv = = 0 ) {
2018-02-16 02:40:16 +07:00
return ESP_OK ;
}
2019-04-26 16:38:09 +02:00
int read_len = client - > mqtt_state . message_length ;
2018-02-16 02:40:16 +07:00
2019-04-26 16:38:09 +02:00
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type ( client - > mqtt_state . in_buffer ) ;
msg_qos = mqtt_get_qos ( client - > mqtt_state . in_buffer ) ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
msg_id = mqtt5_get_id ( client - > mqtt_state . in_buffer , read_len ) ;
# endif
} else {
msg_id = mqtt_get_id ( client - > mqtt_state . in_buffer , read_len ) ;
}
2018-10-02 17:23:14 +02:00
2019-04-26 16:38:09 +02:00
ESP_LOGD ( TAG , " msg_type=%d, msg_id=%d " , msg_type , msg_id ) ;
2018-02-16 02:40:16 +07:00
2019-05-06 13:45:33 +02:00
switch ( msg_type ) {
case MQTT_MSG_TYPE_SUBACK :
if ( is_valid_mqtt_msg ( client , MQTT_MSG_TYPE_SUBSCRIBE , msg_id ) ) {
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_suback ( client ) ;
# endif
2021-09-20 14:51:21 +02:00
ESP_LOGD ( TAG , " deliver_suback, message_length_read=%zu, message_length=%zu " , client - > mqtt_state . in_buffer_read_len , client - > mqtt_state . message_length ) ;
if ( deliver_suback ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Failed to deliver suback message id=%d " , msg_id ) ;
return ESP_FAIL ;
}
2019-05-06 13:45:33 +02:00
}
break ;
case MQTT_MSG_TYPE_UNSUBACK :
if ( is_valid_mqtt_msg ( client , MQTT_MSG_TYPE_UNSUBSCRIBE , msg_id ) ) {
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_unsuback ( client ) ;
# endif
2019-05-06 13:45:33 +02:00
ESP_LOGD ( TAG , " UnSubscribe successful " ) ;
client - > event . event_id = MQTT_EVENT_UNSUBSCRIBED ;
esp_mqtt_dispatch_event_with_msgid ( client ) ;
}
break ;
case MQTT_MSG_TYPE_PUBLISH :
2021-02-19 14:02:31 +00:00
ESP_LOGD ( TAG , " deliver_publish, message_length_read=%zu, message_length=%zu " , client - > mqtt_state . in_buffer_read_len , client - > mqtt_state . message_length ) ;
2019-05-06 13:45:33 +02:00
if ( deliver_publish ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Failed to deliver publish message id=%d " , msg_id ) ;
return ESP_FAIL ;
}
if ( msg_qos = = 1 ) {
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > mqtt_state . outbound_message = mqtt5_msg_puback ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_puback ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
}
2019-05-06 13:45:33 +02:00
} else if ( msg_qos = = 2 ) {
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > mqtt_state . outbound_message = mqtt5_msg_pubrec ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_pubrec ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
}
2019-05-06 13:45:33 +02:00
}
2019-11-19 16:05:47 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
ESP_LOGE ( TAG , " Publish response message PUBACK or PUBREC cannot be created " ) ;
return ESP_FAIL ;
}
2019-04-26 16:38:09 +02:00
2019-05-06 13:45:33 +02:00
if ( msg_qos = = 1 | | msg_qos = = 2 ) {
ESP_LOGD ( TAG , " Queue response QoS: %d " , msg_qos ) ;
2019-04-26 16:38:09 +02:00
2019-05-06 13:45:33 +02:00
if ( mqtt_write_data ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Error write qos msg repsonse, qos = %d " , msg_qos ) ;
return ESP_FAIL ;
2019-04-26 16:38:09 +02:00
}
2019-05-06 13:45:33 +02:00
}
break ;
case MQTT_MSG_TYPE_PUBACK :
if ( is_valid_mqtt_msg ( client , MQTT_MSG_TYPE_PUBLISH , msg_id ) ) {
ESP_LOGD ( TAG , " received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish " ) ;
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_puback ( client ) ;
# endif
2019-05-06 13:45:33 +02:00
client - > event . event_id = MQTT_EVENT_PUBLISHED ;
esp_mqtt_dispatch_event_with_msgid ( client ) ;
}
break ;
case MQTT_MSG_TYPE_PUBREC :
ESP_LOGD ( TAG , " received MQTT_MSG_TYPE_PUBREC " ) ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
ESP_LOGI ( TAG , " MQTT_MSG_TYPE_PUBREC return code is %d " , mqtt5_msg_get_reason_code ( client - > mqtt_state . in_buffer , client - > mqtt_state . in_buffer_read_len ) ) ;
client - > mqtt_state . outbound_message = mqtt5_msg_pubrel ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_pubrel ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
}
2019-11-19 16:05:47 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
ESP_LOGE ( TAG , " Publish response message PUBREL cannot be created " ) ;
return ESP_FAIL ;
}
2019-09-27 19:58:09 +08:00
outbox_set_pending ( client - > outbox , msg_id , ACKNOWLEDGED ) ;
2019-05-06 13:45:33 +02:00
mqtt_write_data ( client ) ;
break ;
case MQTT_MSG_TYPE_PUBREL :
ESP_LOGD ( TAG , " received MQTT_MSG_TYPE_PUBREL " ) ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
ESP_LOGI ( TAG , " MQTT_MSG_TYPE_PUBREL return code is %d " , mqtt5_msg_get_reason_code ( client - > mqtt_state . in_buffer , client - > mqtt_state . in_buffer_read_len ) ) ;
client - > mqtt_state . outbound_message = mqtt5_msg_pubcomp ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_pubcomp ( & client - > mqtt_state . mqtt_connection , msg_id ) ;
}
2019-11-19 16:05:47 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
ESP_LOGE ( TAG , " Publish response message PUBCOMP cannot be created " ) ;
return ESP_FAIL ;
}
2019-05-06 13:45:33 +02:00
mqtt_write_data ( client ) ;
break ;
case MQTT_MSG_TYPE_PUBCOMP :
ESP_LOGD ( TAG , " received MQTT_MSG_TYPE_PUBCOMP " ) ;
if ( is_valid_mqtt_msg ( client , MQTT_MSG_TYPE_PUBLISH , msg_id ) ) {
ESP_LOGD ( TAG , " Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish " ) ;
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_pubcomp ( client ) ;
# endif
2019-05-06 13:45:33 +02:00
client - > event . event_id = MQTT_EVENT_PUBLISHED ;
esp_mqtt_dispatch_event_with_msgid ( client ) ;
}
break ;
case MQTT_MSG_TYPE_PINGRESP :
ESP_LOGD ( TAG , " MQTT_MSG_TYPE_PINGRESP " ) ;
client - > wait_for_ping_resp = false ;
2022-03-22 14:26:49 -03:00
/* It is the responsibility of the Client to ensure that the interval between Control Packets
* being sent does not exceed the Keep Alive value . In the absence of sending any other Control
* Packets , the Client MUST send a PINGREQ Packet [ MQTT - 3.1 .2 - 23 ] .
* [ MQTT - 3.1 .2 - 23 ]
*/
client - > keepalive_tick = platform_tick_get_ms ( ) ;
2019-05-06 13:45:33 +02:00
break ;
2018-10-02 17:23:14 +02:00
}
2019-04-26 16:38:09 +02:00
client - > mqtt_state . in_buffer_read_len = 0 ;
2018-02-16 02:40:16 +07:00
return ESP_OK ;
}
2018-12-20 21:46:55 +01:00
static esp_err_t mqtt_resend_queued ( esp_mqtt_client_handle_t client , outbox_item_handle_t item )
{
// decode queued data
client - > mqtt_state . outbound_message - > data = outbox_item_get_data ( item , & client - > mqtt_state . outbound_message - > length , & client - > mqtt_state . pending_msg_id ,
2019-05-06 13:45:33 +02:00
& client - > mqtt_state . pending_msg_type , & client - > mqtt_state . pending_publish_qos ) ;
2020-02-20 16:00:15 +01:00
// set duplicate flag for QoS-1 and QoS-2 messages
2021-11-25 16:38:26 +08:00
if ( client - > mqtt_state . pending_msg_type = = MQTT_MSG_TYPE_PUBLISH & & client - > mqtt_state . pending_publish_qos > 0 & & ( outbox_item_get_pending ( item ) = = TRANSMITTED ) ) {
2018-12-20 21:46:55 +01:00
mqtt_set_dup ( client - > mqtt_state . outbound_message - > data ) ;
2022-09-08 15:03:04 +02:00
ESP_LOGD ( TAG , " Sending Duplicated QoS%d message with id=%d " , client - > mqtt_state . pending_publish_qos , client - > mqtt_state . pending_msg_id ) ;
2018-12-20 21:46:55 +01:00
}
// try to resend the data
if ( mqtt_write_data ( client ) ! = ESP_OK ) {
2019-09-30 14:19:43 +02:00
ESP_LOGE ( TAG , " Error to resend data " ) ;
2018-12-20 21:46:55 +01:00
esp_mqtt_abort_connection ( client ) ;
return ESP_FAIL ;
}
2020-12-08 20:57:00 +01:00
// check if it was QoS-0 publish message
if ( client - > mqtt_state . pending_msg_type = = MQTT_MSG_TYPE_PUBLISH & & client - > mqtt_state . pending_publish_qos = = 0 ) {
2021-02-19 14:02:31 +00:00
// delete all qos0 publish messages once we process them
if ( outbox_delete_item ( client - > outbox , item ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Failed to remove queued qos0 message from the outbox " ) ;
}
2020-12-08 20:57:00 +01:00
}
2018-12-20 21:46:55 +01:00
return ESP_OK ;
}
2020-12-06 15:23:08 +01:00
static void mqtt_delete_expired_messages ( esp_mqtt_client_handle_t client )
{
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
# if MQTT_REPORT_DELETED_MESSAGES
// also report the deleted items as MQTT_EVENT_DELETED events if enabled
int deleted_items = 0 ;
int msg_id = 0 ;
while ( ( msg_id = outbox_delete_single_expired ( client - > outbox , platform_tick_get_ms ( ) , OUTBOX_EXPIRED_TIMEOUT_MS ) ) > 0 ) {
client - > event . event_id = MQTT_EVENT_DELETED ;
client - > event . msg_id = msg_id ;
if ( esp_mqtt_dispatch_event ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Failed to post event on deleting message id=%d " , msg_id ) ;
}
deleted_items + + ;
}
# else
int deleted_items = outbox_delete_expired ( client - > outbox , platform_tick_get_ms ( ) , OUTBOX_EXPIRED_TIMEOUT_MS ) ;
# endif
client - > mqtt_state . pending_msg_count - = deleted_items ;
if ( client - > mqtt_state . pending_msg_count < 0 ) {
client - > mqtt_state . pending_msg_count = 0 ;
}
}
2022-07-24 21:10:38 +02:00
/**
* @ brief When using multiple queued item , we ' d like to reduce the poll timeout to proceed with event loop exacution
*/
static inline int max_poll_timeout ( esp_mqtt_client_handle_t client , int max_timeout )
{
return
# if MQTT_EVENT_QUEUE_SIZE > 1
2022-09-08 15:03:04 +02:00
atomic_load ( & client - > queued_events ) > 0 ? 10 : max_timeout ;
2022-07-24 21:10:38 +02:00
# else
2022-09-08 15:03:04 +02:00
max_timeout ;
2022-07-24 21:10:38 +02:00
# endif
}
static inline void run_event_loop ( esp_mqtt_client_handle_t client )
{
# if MQTT_EVENT_QUEUE_SIZE > 1
if ( atomic_load ( & client - > queued_events ) > 0 ) {
2022-09-08 15:03:04 +02:00
atomic_fetch_sub ( & client - > queued_events , 1 ) ;
2022-07-24 21:10:38 +02:00
# else
{
# endif
esp_err_t ret = esp_event_loop_run ( client - > config - > event_loop_handle , 0 ) ;
if ( ret ! = ESP_OK ) {
ESP_LOGE ( TAG , " Error in running event_loop %d " , ret ) ;
}
}
}
2018-02-16 02:40:16 +07:00
static void esp_mqtt_task ( void * pv )
{
esp_mqtt_client_handle_t client = ( esp_mqtt_client_handle_t ) pv ;
2020-01-09 09:45:16 +01:00
uint64_t last_retransmit = 0 ;
outbox_tick_t msg_tick = 0 ;
2018-02-16 02:40:16 +07:00
client - > run = true ;
//get transport by scheme
2018-09-26 11:53:54 +02:00
client - > transport = esp_transport_list_get_transport ( client - > transport_list , client - > config - > scheme ) ;
2018-02-16 02:40:16 +07:00
if ( client - > transport = = NULL ) {
2018-05-03 21:50:24 +07:00
ESP_LOGE ( TAG , " There are no transports valid, stop mqtt client, config scheme = %s " , client - > config - > scheme ) ;
2018-02-16 02:40:16 +07:00
client - > run = false ;
}
//default port
if ( client - > config - > port = = 0 ) {
2018-09-26 11:53:54 +02:00
client - > config - > port = esp_transport_get_default_port ( client - > transport ) ;
2018-02-16 02:40:16 +07:00
}
client - > state = MQTT_STATE_INIT ;
2018-02-16 22:48:22 +07:00
xEventGroupClearBits ( client - > status_bits , STOPPED_BIT ) ;
2018-02-16 02:40:16 +07:00
while ( client - > run ) {
2018-12-21 13:52:48 +01:00
MQTT_API_LOCK ( client ) ;
2022-07-24 21:10:38 +02:00
run_event_loop ( client ) ;
2021-01-26 14:25:41 +00:00
switch ( client - > state ) {
case MQTT_STATE_DISCONNECTED :
break ;
2019-05-06 13:45:33 +02:00
case MQTT_STATE_INIT :
2020-01-16 16:30:52 +01:00
xEventGroupClearBits ( client - > status_bits , RECONNECT_BIT | DISCONNECT_BIT ) ;
2019-05-06 13:45:33 +02:00
client - > event . event_id = MQTT_EVENT_BEFORE_CONNECT ;
esp_mqtt_dispatch_event_with_msgid ( client ) ;
if ( client - > transport = = NULL ) {
2019-09-30 14:19:43 +02:00
ESP_LOGE ( TAG , " There is no transport " ) ;
2019-05-06 13:45:33 +02:00
client - > run = false ;
}
2020-01-19 22:28:35 +01:00
# if MQTT_ENABLE_SSL
esp_mqtt_set_ssl_transport_properties ( client - > transport_list , client - > config ) ;
# endif
2018-02-16 02:40:16 +07:00
2019-05-06 13:45:33 +02:00
if ( esp_transport_connect ( client - > transport ,
2018-02-16 02:40:16 +07:00
client - > config - > host ,
client - > config - > port ,
client - > config - > network_timeout_ms ) < 0 ) {
2019-05-06 13:45:33 +02:00
ESP_LOGE ( TAG , " Error transport connect " ) ;
2020-07-27 06:57:00 +02:00
esp_mqtt_client_dispatch_transport_error ( client ) ;
2019-05-06 13:45:33 +02:00
esp_mqtt_abort_connection ( client ) ;
break ;
}
ESP_LOGD ( TAG , " Transport connected to %s://%s:%d " , client - > config - > scheme , client - > config - > host , client - > config - > port ) ;
if ( esp_mqtt_connect ( client , client - > config - > network_timeout_ms ) ! = ESP_OK ) {
2020-04-24 23:05:47 +05:30
ESP_LOGE ( TAG , " MQTT connect failed " ) ;
2019-05-06 13:45:33 +02:00
esp_mqtt_abort_connection ( client ) ;
break ;
}
client - > event . event_id = MQTT_EVENT_CONNECTED ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver ! = MQTT_PROTOCOL_V_5 ) {
client - > event . session_present = mqtt_get_connect_session_present ( client - > mqtt_state . in_buffer ) ;
}
2019-05-06 13:45:33 +02:00
client - > state = MQTT_STATE_CONNECTED ;
esp_mqtt_dispatch_event_with_msgid ( client ) ;
client - > refresh_connection_tick = platform_tick_get_ms ( ) ;
2022-06-06 12:52:22 -03:00
client - > keepalive_tick = platform_tick_get_ms ( ) ;
2018-02-16 02:40:16 +07:00
2019-05-06 13:45:33 +02:00
break ;
case MQTT_STATE_CONNECTED :
2020-01-16 16:30:52 +01:00
// check for disconnection request
if ( xEventGroupWaitBits ( client - > status_bits , DISCONNECT_BIT , true , true , 0 ) & DISCONNECT_BIT ) {
2021-10-02 10:34:45 +02:00
send_disconnect_msg ( client ) ; // ignore error, if clean disconnect fails, just abort the connection
2020-01-16 16:30:52 +01:00
esp_mqtt_abort_connection ( client ) ;
break ;
}
2019-05-06 13:45:33 +02:00
// receive and process data
if ( mqtt_process_receive ( client ) = = ESP_FAIL ) {
esp_mqtt_abort_connection ( client ) ;
2018-02-16 02:40:16 +07:00
break ;
2019-05-06 13:45:33 +02:00
}
2018-02-16 02:40:16 +07:00
2020-12-06 15:23:08 +01:00
// delete long pending messages
mqtt_delete_expired_messages ( client ) ;
2020-08-10 15:47:46 +08:00
2019-05-06 13:45:33 +02:00
// resend all non-transmitted messages first
outbox_item_handle_t item = outbox_dequeue ( client - > outbox , QUEUED , NULL ) ;
if ( item ) {
if ( mqtt_resend_queued ( client , item ) = = ESP_OK ) {
outbox_set_pending ( client - > outbox , client - > mqtt_state . pending_msg_id , TRANSMITTED ) ;
2018-12-20 21:46:55 +01:00
}
2019-05-06 13:45:33 +02:00
// resend other "transmitted" messages after 1s
2022-06-06 12:52:22 -03:00
} else if ( has_timed_out ( last_retransmit , client - > config - > message_retransmit_timeout ) ) {
2019-05-06 13:45:33 +02:00
last_retransmit = platform_tick_get_ms ( ) ;
item = outbox_dequeue ( client - > outbox , TRANSMITTED , & msg_tick ) ;
2021-08-30 15:59:59 +05:00
if ( item & & ( last_retransmit - msg_tick > client - > config - > message_retransmit_timeout ) ) {
2019-05-06 13:45:33 +02:00
mqtt_resend_queued ( client , item ) ;
2018-02-16 02:40:16 +07:00
}
2019-05-06 13:45:33 +02:00
}
2018-02-17 01:03:35 +07:00
2022-03-22 14:26:49 -03:00
if ( process_keepalive ( client ) ! = ESP_OK ) {
break ;
2019-05-06 13:45:33 +02:00
}
if ( client - > config - > refresh_connection_after_ms & &
2022-09-08 15:03:04 +02:00
has_timed_out ( client - > refresh_connection_tick , client - > config - > refresh_connection_after_ms ) ) {
2019-05-06 13:45:33 +02:00
ESP_LOGD ( TAG , " Refreshing the connection... " ) ;
esp_mqtt_abort_connection ( client ) ;
client - > state = MQTT_STATE_INIT ;
}
break ;
2021-01-26 14:25:41 +00:00
case MQTT_STATE_WAIT_RECONNECT :
2019-05-06 13:45:33 +02:00
2021-10-03 09:05:49 +02:00
if ( ! client - > config - > auto_reconnect & & xEventGroupGetBits ( client - > status_bits ) & RECONNECT_BIT ) {
xEventGroupClearBits ( client - > status_bits , RECONNECT_BIT ) ;
client - > state = MQTT_STATE_INIT ;
client - > wait_timeout_ms = MQTT_RECON_DEFAULT_MS ;
ESP_LOGD ( TAG , " Reconnecting per user request... " ) ;
2019-05-06 13:45:33 +02:00
break ;
2021-10-03 09:05:49 +02:00
} else if ( client - > config - > auto_reconnect & &
platform_tick_get_ms ( ) - client - > reconnect_tick > client - > wait_timeout_ms ) {
2019-05-06 13:45:33 +02:00
client - > state = MQTT_STATE_INIT ;
client - > reconnect_tick = platform_tick_get_ms ( ) ;
ESP_LOGD ( TAG , " Reconnecting... " ) ;
break ;
}
MQTT_API_UNLOCK ( client ) ;
xEventGroupWaitBits ( client - > status_bits , RECONNECT_BIT , false , true ,
2022-07-24 21:10:38 +02:00
max_poll_timeout ( client , client - > wait_timeout_ms / 2 / portTICK_PERIOD_MS ) ) ;
2019-09-30 14:19:43 +02:00
// continue the while loop instead of break, as the mutex is unlocked
2019-05-06 13:45:33 +02:00
continue ;
2021-01-26 14:25:41 +00:00
default :
ESP_LOGE ( TAG , " MQTT client error, client is in an unrecoverable state. " ) ;
break ;
2018-12-21 13:52:48 +01:00
}
MQTT_API_UNLOCK ( client ) ;
if ( MQTT_STATE_CONNECTED = = client - > state ) {
2022-07-24 21:10:38 +02:00
if ( esp_transport_poll_read ( client - > transport , max_poll_timeout ( client , MQTT_POLL_READ_TIMEOUT_MS ) ) < 0 ) {
2018-12-21 13:52:48 +01:00
ESP_LOGE ( TAG , " Poll read error: %d, aborting connection " , errno ) ;
esp_mqtt_abort_connection ( client ) ;
}
2018-02-16 02:40:16 +07:00
}
2018-12-21 13:52:48 +01:00
2018-02-16 02:40:16 +07:00
}
2018-09-26 11:53:54 +02:00
esp_transport_close ( client - > transport ) ;
2021-06-20 21:25:35 +02:00
outbox_delete_all_items ( client - > outbox ) ;
2018-02-16 22:48:22 +07:00
xEventGroupSetBits ( client - > status_bits , STOPPED_BIT ) ;
2022-10-26 13:09:01 +02:00
client - > state = MQTT_STATE_DISCONNECTED ;
2018-02-16 02:40:16 +07:00
vTaskDelete ( NULL ) ;
}
esp_err_t esp_mqtt_client_start ( esp_mqtt_client_handle_t client )
{
2019-05-06 08:49:38 +02:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return ESP_ERR_INVALID_ARG ;
}
2020-01-17 14:06:56 +01:00
MQTT_API_LOCK ( client ) ;
2021-01-26 14:25:41 +00:00
if ( client - > state ! = MQTT_STATE_INIT & & client - > state ! = MQTT_STATE_DISCONNECTED ) {
2018-02-16 02:40:16 +07:00
ESP_LOGE ( TAG , " Client has started " ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
return ESP_FAIL ;
}
2022-05-23 22:19:24 +08:00
if ( esp_mqtt_client_create_transport ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Failed to create transport list " ) ;
MQTT_API_UNLOCK ( client ) ;
return ESP_FAIL ;
}
2019-06-18 10:19:44 +02:00
esp_err_t err = ESP_OK ;
2018-07-25 08:58:38 +02:00
# if MQTT_CORE_SELECTION_ENABLED
2019-05-06 13:45:33 +02:00
ESP_LOGD ( TAG , " Core selection enabled on %u " , MQTT_TASK_CORE ) ;
if ( xTaskCreatePinnedToCore ( esp_mqtt_task , " mqtt_task " , client - > config - > task_stack , client , client - > config - > task_prio , & client - > task_handle , MQTT_TASK_CORE ) ! = pdTRUE ) {
ESP_LOGE ( TAG , " Error create mqtt task " ) ;
2019-06-18 10:19:44 +02:00
err = ESP_FAIL ;
2019-05-06 13:45:33 +02:00
}
2018-07-25 08:58:38 +02:00
# else
2019-05-06 13:45:33 +02:00
ESP_LOGD ( TAG , " Core selection disabled " ) ;
if ( xTaskCreate ( esp_mqtt_task , " mqtt_task " , client - > config - > task_stack , client , client - > config - > task_prio , & client - > task_handle ) ! = pdTRUE ) {
ESP_LOGE ( TAG , " Error create mqtt task " ) ;
2019-06-18 10:19:44 +02:00
err = ESP_FAIL ;
2019-05-06 13:45:33 +02:00
}
2018-07-25 08:58:38 +02:00
# endif
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2019-06-18 10:19:44 +02:00
return err ;
2018-02-16 02:40:16 +07:00
}
2020-01-16 16:30:52 +01:00
esp_err_t esp_mqtt_client_disconnect ( esp_mqtt_client_handle_t client )
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return ESP_ERR_INVALID_ARG ;
}
2020-01-16 16:30:52 +01:00
ESP_LOGI ( TAG , " Client asked to disconnect " ) ;
xEventGroupSetBits ( client - > status_bits , DISCONNECT_BIT ) ;
return ESP_OK ;
}
2018-10-24 10:57:45 +02:00
esp_err_t esp_mqtt_client_reconnect ( esp_mqtt_client_handle_t client )
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return ESP_ERR_INVALID_ARG ;
}
2018-10-24 10:57:45 +02:00
ESP_LOGI ( TAG , " Client force reconnect requested " ) ;
2021-01-26 14:25:41 +00:00
if ( client - > state ! = MQTT_STATE_WAIT_RECONNECT ) {
2018-10-24 10:57:45 +02:00
ESP_LOGD ( TAG , " The client is not waiting for reconnection. Ignore the request " ) ;
return ESP_FAIL ;
}
client - > wait_timeout_ms = 0 ;
xEventGroupSetBits ( client - > status_bits , RECONNECT_BIT ) ;
return ESP_OK ;
}
2021-10-02 10:34:45 +02:00
static esp_err_t send_disconnect_msg ( esp_mqtt_client_handle_t client )
{
// Notify the broker we are disconnecting
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > mqtt_state . outbound_message = mqtt5_msg_disconnect ( & client - > mqtt_state . mqtt_connection , & client - > mqtt5_config - > disconnect_property_info ) ;
if ( client - > mqtt_state . outbound_message - > length ) {
esp_mqtt5_client_delete_user_property ( client - > mqtt5_config - > disconnect_property_info . user_property ) ;
client - > mqtt5_config - > disconnect_property_info . user_property = NULL ;
memset ( & client - > mqtt5_config - > disconnect_property_info , 0 , sizeof ( esp_mqtt5_disconnect_property_config_t ) ) ;
}
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_disconnect ( & client - > mqtt_state . mqtt_connection ) ;
}
2021-10-02 10:34:45 +02:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
ESP_LOGE ( TAG , " Disconnect message cannot be created " ) ;
return ESP_FAIL ;
}
if ( mqtt_write_data ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Error sending disconnect message " ) ;
}
return ESP_OK ;
}
2018-02-16 02:40:16 +07:00
esp_err_t esp_mqtt_client_stop ( esp_mqtt_client_handle_t client )
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return ESP_ERR_INVALID_ARG ;
}
2020-01-17 14:06:56 +01:00
MQTT_API_LOCK ( client ) ;
2018-10-04 13:50:37 +02:00
if ( client - > run ) {
2020-08-04 19:16:13 +08:00
/* A running client cannot be stopped from the MQTT task/event handler */
TaskHandle_t running_task = xTaskGetCurrentTaskHandle ( ) ;
if ( running_task = = client - > task_handle ) {
2020-11-09 16:04:27 +08:00
MQTT_API_UNLOCK ( client ) ;
2020-08-04 19:16:13 +08:00
ESP_LOGE ( TAG , " Client cannot be stopped from MQTT task " ) ;
return ESP_FAIL ;
}
2019-06-11 19:57:58 +02:00
// Only send the disconnect message if the client is connected
2020-12-31 12:24:25 +00:00
if ( client - > state = = MQTT_STATE_CONNECTED ) {
2021-10-02 10:34:45 +02:00
if ( send_disconnect_msg ( client ) ! = ESP_OK ) {
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2019-11-19 16:05:47 +08:00
return ESP_FAIL ;
}
2019-03-22 21:10:10 +01:00
}
2019-05-06 13:45:33 +02:00
2018-10-04 13:50:37 +02:00
client - > run = false ;
2021-01-26 14:25:41 +00:00
client - > state = MQTT_STATE_DISCONNECTED ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2019-08-01 19:01:54 -04:00
xEventGroupWaitBits ( client - > status_bits , STOPPED_BIT , false , true , portMAX_DELAY ) ;
2018-10-04 13:50:37 +02:00
return ESP_OK ;
} else {
ESP_LOGW ( TAG , " Client asked to stop, but was not started " ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-10-04 13:50:37 +02:00
return ESP_FAIL ;
}
2018-02-16 02:40:16 +07:00
}
2018-03-16 12:25:34 +01:00
static esp_err_t esp_mqtt_client_ping ( esp_mqtt_client_handle_t client )
2018-02-16 02:40:16 +07:00
{
client - > mqtt_state . outbound_message = mqtt_msg_pingreq ( & client - > mqtt_state . mqtt_connection ) ;
2019-11-19 16:05:47 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
ESP_LOGE ( TAG , " Ping message cannot be created " ) ;
return ESP_FAIL ;
}
2018-02-16 02:40:16 +07:00
if ( mqtt_write_data ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Error sending ping " ) ;
2018-03-16 12:25:34 +01:00
return ESP_FAIL ;
2018-02-16 02:40:16 +07:00
}
ESP_LOGD ( TAG , " Sent PING successful " ) ;
2018-03-16 12:25:34 +01:00
return ESP_OK ;
2018-02-16 02:40:16 +07:00
}
int esp_mqtt_client_subscribe ( esp_mqtt_client_handle_t client , const char * topic , int qos )
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return - 1 ;
}
2020-01-17 14:06:56 +01:00
MQTT_API_LOCK ( client ) ;
2018-02-16 02:40:16 +07:00
if ( client - > state ! = MQTT_STATE_CONNECTED ) {
ESP_LOGE ( TAG , " Client has not connected " ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
return - 1 ;
}
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
if ( esp_mqtt5_client_subscribe_check ( client , qos ) ! = ESP_OK ) {
ESP_LOGI ( TAG , " MQTT5 subscribe check fail " ) ;
MQTT_API_UNLOCK ( client ) ;
return - 1 ;
}
client - > mqtt_state . outbound_message = mqtt5_msg_subscribe ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
topic , qos ,
& client - > mqtt_state . pending_msg_id , client - > mqtt5_config - > subscribe_property_info ) ;
2022-02-25 10:12:30 +08:00
if ( client - > mqtt_state . outbound_message - > length ) {
client - > mqtt5_config - > subscribe_property_info = NULL ;
}
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_subscribe ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
topic , qos ,
& client - > mqtt_state . pending_msg_id ) ;
2022-02-25 10:12:30 +08:00
}
2019-11-19 16:05:47 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
ESP_LOGE ( TAG , " Subscribe message cannot be created " ) ;
2020-02-06 19:37:47 +05:00
MQTT_API_UNLOCK ( client ) ;
2019-11-19 16:05:47 +08:00
return - 1 ;
}
2018-02-16 02:40:16 +07:00
client - > mqtt_state . pending_msg_type = mqtt_get_type ( client - > mqtt_state . outbound_message - > data ) ;
client - > mqtt_state . pending_msg_count + + ;
2021-06-09 13:54:33 +01:00
//move pending msg to outbox (if have)
if ( ! mqtt_enqueue ( client ) ) {
MQTT_API_UNLOCK ( client ) ;
return - 1 ;
}
outbox_set_pending ( client - > outbox , client - > mqtt_state . pending_msg_id , TRANSMITTED ) ; // handle error
2018-02-16 02:40:16 +07:00
if ( mqtt_write_data ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Error to subscribe topic=%s, qos=%d " , topic , qos ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
return - 1 ;
}
2018-02-17 12:53:17 +07:00
ESP_LOGD ( TAG , " Sent subscribe topic=%s, id: %d, type=%d successful " , topic , client - > mqtt_state . pending_msg_id , client - > mqtt_state . pending_msg_type ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
return client - > mqtt_state . pending_msg_id ;
}
int esp_mqtt_client_unsubscribe ( esp_mqtt_client_handle_t client , const char * topic )
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return - 1 ;
}
2020-01-17 14:06:56 +01:00
MQTT_API_LOCK ( client ) ;
2018-02-16 02:40:16 +07:00
if ( client - > state ! = MQTT_STATE_CONNECTED ) {
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
ESP_LOGE ( TAG , " Client has not connected " ) ;
return - 1 ;
}
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > mqtt_state . outbound_message = mqtt5_msg_unsubscribe ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
topic ,
& client - > mqtt_state . pending_msg_id , client - > mqtt5_config - > unsubscribe_property_info ) ;
2022-02-25 10:12:30 +08:00
if ( client - > mqtt_state . outbound_message - > length ) {
client - > mqtt5_config - > unsubscribe_property_info = NULL ;
}
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_unsubscribe ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
topic ,
& client - > mqtt_state . pending_msg_id ) ;
2022-02-25 10:12:30 +08:00
}
2019-11-19 16:05:47 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
2020-02-10 07:58:36 +01:00
MQTT_API_UNLOCK ( client ) ;
2019-11-19 16:05:47 +08:00
ESP_LOGE ( TAG , " Unubscribe message cannot be created " ) ;
return - 1 ;
}
2018-02-16 02:40:16 +07:00
ESP_LOGD ( TAG , " unsubscribe, topic \" %s \" , id: %d " , topic , client - > mqtt_state . pending_msg_id ) ;
client - > mqtt_state . pending_msg_type = mqtt_get_type ( client - > mqtt_state . outbound_message - > data ) ;
client - > mqtt_state . pending_msg_count + + ;
2021-06-09 13:54:33 +01:00
if ( ! mqtt_enqueue ( client ) ) {
MQTT_API_UNLOCK ( client ) ;
return - 1 ;
}
outbox_set_pending ( client - > outbox , client - > mqtt_state . pending_msg_id , TRANSMITTED ) ; //handle error
2018-02-16 02:40:16 +07:00
if ( mqtt_write_data ( client ) ! = ESP_OK ) {
ESP_LOGE ( TAG , " Error to unsubscribe topic=%s " , topic ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
return - 1 ;
}
ESP_LOGD ( TAG , " Sent Unsubscribe topic=%s, id: %d, successful " , topic , client - > mqtt_state . pending_msg_id ) ;
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:40:16 +07:00
return client - > mqtt_state . pending_msg_id ;
}
2020-12-08 20:57:00 +01:00
static inline int mqtt_client_enqueue_priv ( esp_mqtt_client_handle_t client , const char * topic , const char * data ,
2021-02-19 14:02:31 +00:00
int len , int qos , int retain , bool store )
2018-02-16 02:40:16 +07:00
{
2018-02-16 22:48:22 +07:00
uint16_t pending_msg_id = 0 ;
2022-02-25 10:12:30 +08:00
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
# ifdef MQTT_PROTOCOL_5
client - > mqtt_state . outbound_message = mqtt5_msg_publish ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
topic , data , len ,
qos , retain ,
& pending_msg_id , client - > mqtt5_config - > publish_property_info , client - > mqtt5_config - > server_resp_property_info . response_info ) ;
2022-02-25 10:12:30 +08:00
if ( client - > mqtt_state . outbound_message - > length ) {
client - > mqtt5_config - > publish_property_info = NULL ;
}
# endif
} else {
client - > mqtt_state . outbound_message = mqtt_msg_publish ( & client - > mqtt_state . mqtt_connection ,
2022-09-08 15:03:04 +02:00
topic , data , len ,
qos , retain ,
& pending_msg_id ) ;
2022-02-25 10:12:30 +08:00
}
2018-10-05 17:30:31 +02:00
2022-02-25 10:12:30 +08:00
if ( client - > mqtt_state . outbound_message - > length = = 0 ) {
2019-05-06 08:49:38 +02:00
ESP_LOGE ( TAG , " Publish message cannot be created " ) ;
2019-11-19 16:05:47 +08:00
return - 1 ;
2019-05-06 08:49:38 +02:00
}
2019-02-05 17:01:54 +01:00
/* We have to set as pending all the qos>0 messages */
2022-05-03 14:13:23 -03:00
//TODO: client->mqtt_state.outbound_message = publish_msg;
2020-12-08 20:57:00 +01:00
if ( qos > 0 | | store ) {
2018-02-16 02:50:29 +07:00
client - > mqtt_state . pending_msg_type = mqtt_get_type ( client - > mqtt_state . outbound_message - > data ) ;
client - > mqtt_state . pending_msg_id = pending_msg_id ;
2018-12-20 21:46:55 +01:00
client - > mqtt_state . pending_publish_qos = qos ;
2018-02-16 02:50:29 +07:00
client - > mqtt_state . pending_msg_count + + ;
2019-02-05 17:01:54 +01:00
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if ( client - > mqtt_state . mqtt_connection . message . fragmented_msg_total_length = = 0 ) {
2021-06-09 13:54:33 +01:00
if ( ! mqtt_enqueue ( client ) ) {
return - 1 ;
}
2020-01-17 20:32:32 +01:00
} else {
int first_fragment = client - > mqtt_state . outbound_message - > length - client - > mqtt_state . outbound_message - > fragmented_msg_data_offset ;
2021-06-09 13:54:33 +01:00
if ( ! mqtt_enqueue_oversized ( client , ( ( uint8_t * ) data ) + first_fragment , len - first_fragment ) ) {
return - 1 ;
}
2020-07-13 18:52:34 +05:00
client - > mqtt_state . outbound_message - > fragmented_msg_total_length = 0 ;
2019-02-05 17:01:54 +01:00
}
2018-02-16 02:50:29 +07:00
}
2020-07-13 18:52:34 +05:00
return pending_msg_id ;
}
int esp_mqtt_client_publish ( esp_mqtt_client_handle_t client , const char * topic , const char * data , int len , int qos , int retain )
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return - 1 ;
}
2020-07-13 18:52:34 +05:00
MQTT_API_LOCK ( client ) ;
2020-12-06 14:08:45 +01:00
# if MQTT_SKIP_PUBLISH_IF_DISCONNECTED
if ( client - > state ! = MQTT_STATE_CONNECTED ) {
2020-12-08 20:57:00 +01:00
ESP_LOGI ( TAG , " Publishing skipped: client is not connected " ) ;
2020-12-06 14:08:45 +01:00
MQTT_API_UNLOCK ( client ) ;
return - 1 ;
}
# endif
2021-12-21 15:31:56 +08:00
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
if ( esp_mqtt5_client_publish_check ( client , qos , retain ) ! = ESP_OK ) {
ESP_LOGI ( TAG , " MQTT5 publish check fail " ) ;
MQTT_API_UNLOCK ( client ) ;
return - 1 ;
}
}
# endif
2021-12-21 15:31:56 +08:00
/* Acceptable publish messages:
data = = NULL , len = = 0 : publish null message
data valid , len = = 0 : publish all data , payload len is determined from string length
data valid , len > 0 : publish data with defined length
*/
if ( len < = 0 & & data ! = NULL ) {
len = strlen ( data ) ;
}
2020-12-08 20:57:00 +01:00
int pending_msg_id = mqtt_client_enqueue_priv ( client , topic , data , len , qos , retain , false ) ;
2020-07-13 18:52:34 +05:00
if ( pending_msg_id < 0 ) {
MQTT_API_UNLOCK ( client ) ;
return - 1 ;
}
int ret = 0 ;
2018-02-16 02:50:29 +07:00
2018-12-20 21:46:55 +01:00
/* Skip sending if not connected (rely on resending) */
if ( client - > state ! = MQTT_STATE_CONNECTED ) {
ESP_LOGD ( TAG , " Publish: client is not connected " ) ;
2020-08-10 14:47:05 +08:00
if ( qos > 0 ) {
ret = pending_msg_id ;
}
2020-08-10 15:47:46 +08:00
2020-12-06 15:23:08 +01:00
// delete long pending messages
mqtt_delete_expired_messages ( client ) ;
2020-08-10 15:47:46 +08:00
2018-12-20 21:46:55 +01:00
goto cannot_publish ;
}
2018-10-25 16:38:25 +02:00
/* Provide support for sending fragmented message if it doesn't fit buffer */
int remaining_len = len ;
const char * current_data = data ;
bool sending = true ;
while ( sending ) {
if ( mqtt_write_data ( client ) ! = ESP_OK ) {
2018-12-20 21:46:55 +01:00
esp_mqtt_abort_connection ( client ) ;
2019-11-19 16:05:47 +08:00
ret = - 1 ;
2018-12-20 21:46:55 +01:00
goto cannot_publish ;
2018-10-25 16:38:25 +02:00
}
int data_sent = client - > mqtt_state . outbound_message - > length - client - > mqtt_state . outbound_message - > fragmented_msg_data_offset ;
2020-01-17 20:32:32 +01:00
client - > mqtt_state . outbound_message - > fragmented_msg_data_offset = 0 ;
client - > mqtt_state . outbound_message - > fragmented_msg_total_length = 0 ;
2018-10-25 16:38:25 +02:00
remaining_len - = data_sent ;
current_data + = data_sent ;
if ( remaining_len > 0 ) {
2019-05-06 13:45:33 +02:00
mqtt_connection_t * connection = & client - > mqtt_state . mqtt_connection ;
2018-10-25 16:38:25 +02:00
ESP_LOGD ( TAG , " Sending fragmented message, remains to send %d bytes of %d " , remaining_len , len ) ;
if ( remaining_len > connection - > buffer_length ) {
// Continue with sending
memcpy ( connection - > buffer , current_data , connection - > buffer_length ) ;
connection - > message . length = connection - > buffer_length ;
sending = true ;
} else {
memcpy ( connection - > buffer , current_data , remaining_len ) ;
connection - > message . length = remaining_len ;
sending = true ;
}
connection - > message . data = connection - > buffer ;
client - > mqtt_state . outbound_message = & connection - > message ;
} else {
// Message was sent correctly
sending = false ;
}
2018-02-16 02:40:16 +07:00
}
2018-12-20 21:46:55 +01:00
2019-11-18 14:43:44 +08:00
if ( qos > 0 ) {
2019-09-30 14:57:24 +08:00
//Tick is set after transmit to avoid retransmitting too early due slow network speed / big messages
outbox_set_tick ( client - > outbox , pending_msg_id , platform_tick_get_ms ( ) ) ;
2018-12-20 21:46:55 +01:00
outbox_set_pending ( client - > outbox , pending_msg_id , TRANSMITTED ) ;
}
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2018-02-16 02:50:29 +07:00
return pending_msg_id ;
2018-12-20 21:46:55 +01:00
cannot_publish :
2020-01-17 20:32:32 +01:00
// clear out possible fragmented publish if failed or skipped
client - > mqtt_state . outbound_message - > fragmented_msg_total_length = 0 ;
2018-12-20 21:46:55 +01:00
if ( qos = = 0 ) {
2019-09-30 14:19:43 +02:00
ESP_LOGW ( TAG , " Publish: Losing qos0 data when client not connected " ) ;
2018-12-20 21:46:55 +01:00
}
2020-01-17 14:06:56 +01:00
MQTT_API_UNLOCK ( client ) ;
2020-01-17 20:32:32 +01:00
2019-11-19 16:05:47 +08:00
return ret ;
2018-02-16 02:40:16 +07:00
}
2020-12-08 20:57:00 +01:00
int esp_mqtt_client_enqueue ( esp_mqtt_client_handle_t client , const char * topic , const char * data , int len , int qos , int retain , bool store )
2020-07-13 18:52:34 +05:00
{
2021-03-04 18:30:27 +04:00
if ( ! client ) {
ESP_LOGE ( TAG , " Client was not initialized " ) ;
return - 1 ;
}
2022-05-30 21:23:27 +01:00
/* Acceptable publish messages:
data = = NULL , len = = 0 : publish null message
data valid , len = = 0 : publish all data , payload len is determined from string length
data valid , len > 0 : publish data with defined length
*/
if ( len < = 0 & & data ! = NULL ) {
len = strlen ( data ) ;
}
2020-07-13 18:52:34 +05:00
MQTT_API_LOCK ( client ) ;
2022-02-25 10:12:30 +08:00
# ifdef MQTT_PROTOCOL_5
if ( client - > connect_info . protocol_ver = = MQTT_PROTOCOL_V_5 ) {
if ( esp_mqtt5_client_publish_check ( client , qos , retain ) ! = ESP_OK ) {
ESP_LOGI ( TAG , " esp_mqtt_client_enqueue check fail " ) ;
MQTT_API_UNLOCK ( client ) ;
return - 1 ;
}
}
# endif
2020-12-08 20:57:00 +01:00
int ret = mqtt_client_enqueue_priv ( client , topic , data , len , qos , retain , store ) ;
2020-07-13 18:52:34 +05:00
MQTT_API_UNLOCK ( client ) ;
2020-12-08 20:57:00 +01:00
if ( ret = = 0 & & store = = false ) {
// messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error
2020-07-13 18:52:34 +05:00
return - 1 ;
}
return ret ;
}
2018-02-16 02:40:16 +07:00
2020-12-31 12:24:25 +00:00
esp_err_t esp_mqtt_client_register_event ( esp_mqtt_client_handle_t client , esp_mqtt_event_id_t event , esp_event_handler_t event_handler , void * event_handler_arg )
2018-12-18 16:43:08 +01:00
{
if ( client = = NULL ) {
return ESP_ERR_INVALID_ARG ;
}
2019-05-29 15:12:25 +02:00
# ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
2018-12-18 16:43:08 +01:00
if ( client - > config - > event_handle ) {
ESP_LOGW ( TAG , " Registering event loop while event callback is not null, clearing callback " ) ;
client - > config - > event_handle = NULL ;
}
return esp_event_handler_register_with ( client - > config - > event_loop_handle , MQTT_EVENTS , event , event_handler , event_handler_arg ) ;
2019-05-29 15:12:25 +02:00
# else
ESP_LOGE ( TAG , " Registering event handler while event loop not available in IDF version %s " , IDF_VER ) ;
return ESP_FAIL ;
# endif
2018-12-18 16:43:08 +01:00
}
2020-08-07 17:54:40 +08:00
2022-07-19 06:42:13 -03:00
esp_err_t esp_mqtt_client_unregister_event ( esp_mqtt_client_handle_t client , esp_mqtt_event_id_t event , esp_event_handler_t event_handler )
{
if ( client = = NULL ) {
return ESP_ERR_INVALID_ARG ;
}
# ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if ( client - > config - > event_handle ) {
ESP_LOGW ( TAG , " Unregistering event loop while event callback is not null, clearing callback " ) ;
client - > config - > event_handle = NULL ;
}
return esp_event_handler_unregister_with ( client - > config - > event_loop_handle , MQTT_EVENTS , event , event_handler ) ;
# else
ESP_LOGE ( TAG , " Unregistering event handler while event loop not available in IDF version %s " , IDF_VER ) ;
return ESP_FAIL ;
# endif
}
2020-08-07 17:54:40 +08:00
2020-07-27 06:57:00 +02:00
static void esp_mqtt_client_dispatch_transport_error ( esp_mqtt_client_handle_t client )
2020-08-07 17:54:40 +08:00
{
2020-12-31 12:24:25 +00:00
client - > event . event_id = MQTT_EVENT_ERROR ;
client - > event . error_handle - > error_type = MQTT_ERROR_TYPE_TCP_TRANSPORT ;
client - > event . error_handle - > connect_return_code = 0 ;
2020-08-07 17:54:40 +08:00
# ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_ERR_REPORTING
2020-12-31 12:24:25 +00:00
client - > event . error_handle - > esp_tls_last_esp_err = esp_tls_get_and_clear_last_error ( esp_transport_get_error_handle ( client - > transport ) ,
& client - > event . error_handle - > esp_tls_stack_err ,
& client - > event . error_handle - > esp_tls_cert_verify_flags ) ;
2020-07-27 06:57:00 +02:00
# ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_SOCK_ERRNO_REPORTING
2020-12-31 12:24:25 +00:00
client - > event . error_handle - > esp_transport_sock_errno = esp_transport_get_errno ( client - > transport ) ;
2020-07-27 06:57:00 +02:00
# endif
2020-08-07 17:54:40 +08:00
# endif
2020-12-31 12:24:25 +00:00
esp_mqtt_dispatch_event_with_msgid ( client ) ;
2020-08-07 17:54:40 +08:00
}
2020-11-11 17:12:50 +08:00
int esp_mqtt_client_get_outbox_size ( esp_mqtt_client_handle_t client )
{
int outbox_size = 0 ;
if ( client = = NULL ) {
return 0 ;
}
MQTT_API_LOCK ( client ) ;
if ( client - > outbox ) {
outbox_size = outbox_get_size ( client - > outbox ) ;
}
MQTT_API_UNLOCK ( client ) ;
return outbox_size ;
}