diff --git a/Kconfig b/Kconfig index c17f83c..323cc39 100644 --- a/Kconfig +++ b/Kconfig @@ -1,4 +1,4 @@ -menu "ESPMQTT Configurations" +menu "ESP-MQTT Configurations" config MQTT_PROTOCOL_311 bool "Enable MQTT protocol 3.1.1" diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 01a2fe1..6728ab4 100755 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -32,10 +32,10 @@ typedef enum { typedef enum { MQTT_TRANSPORT_UNKNOWN = 0x0, - MQTT_TRANSPORT_OVER_TCP, - MQTT_TRANSPORT_OVER_SSL, - MQTT_TRANSPORT_OVER_WS, - MQTT_TRANSPORT_OVER_WSS + MQTT_TRANSPORT_OVER_TCP, /*!< MQTT over TCP, using scheme: ``mqtt`` */ + MQTT_TRANSPORT_OVER_SSL, /*!< MQTT over SSL, using scheme: ``mqtts`` */ + MQTT_TRANSPORT_OVER_WS, /*!< MQTT over Websocket, using scheme:: ``ws`` */ + MQTT_TRANSPORT_OVER_WSS /*!< MQTT over Websocket Secure, using scheme: ``wss`` */ } esp_mqtt_transport_t; typedef struct { @@ -57,27 +57,27 @@ typedef esp_err_t (* mqtt_event_callback_t)(esp_mqtt_event_handle_t event); typedef struct { - mqtt_event_callback_t event_handle; - const char *host; + mqtt_event_callback_t event_handle; /*!< handle for MQTT events */ + const char *host; /*!< MQTT server domain (ipv4 as string) */ const char *uri; - uint32_t port; - const char *client_id; - const char *username; - const char *password; - const char *lwt_topic; - const char *lwt_msg; - int lwt_qos; - int lwt_retain; - int lwt_msg_len; - int disable_clean_session; - int keepalive; - bool disable_auto_reconnect; - void *user_context; - int task_prio; - int task_stack; - int buffer_size; - const char *cert_pem; - esp_mqtt_transport_t transport; + uint32_t port; /*!< MQTT server port */ + const char *client_id; /*!< default client id is ``ESP32_%CHIPID%`` where %CHIPID% are last 3 bytes of MAC address in hex format */ + const char *username; /*!< MQTT username */ + const char *password; /*!< MQTT password */ + const char *lwt_topic; /*!< LWT (Last Will and Testament) message topic (NULL by default) */ + const char *lwt_msg; /*!< LWT message (NULL by default) */ + int lwt_qos; /*!< LWT message qos */ + int lwt_retain; /*!< LWT retained message flag */ + int lwt_msg_len; /*!< LWT message length */ + int disable_clean_session; /*!< mqtt clean session, default clean_session is true */ + int keepalive; /*!< mqtt keepalive, default is 120 seconds */ + bool disable_auto_reconnect; /*!< this mqtt client will reconnect to server (when errors/disconnect). Set disable_auto_reconnect=true to disable */ + void *user_context; /*!< pass user context to this option, then can receive that context in ``event->user_context`` */ + int task_prio; /*!< MQTT task priority, default is 5, can be changed in ``make menuconfig`` */ + int task_stack; /*!< MQTT task stack size, default is 6144 bytes, can be changed in ``make menuconfig`` */ + int buffer_size; /*!< size of MQTT send/receive buffer, default is 1024 */ + const char *cert_pem; /*!< pointer to CERT file for server verify (with SSL), default is NULL, not required to verify the server */ + esp_mqtt_transport_t transport; /*!< overrides URI transport */ } esp_mqtt_client_config_t; esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config); diff --git a/lib/include/transport.h b/lib/include/transport.h deleted file mode 100644 index 9dbd880..0000000 --- a/lib/include/transport.h +++ /dev/null @@ -1,242 +0,0 @@ -/* - * This file is subject to the terms and conditions defined in - * file 'LICENSE', which is part of this source code package. - * Tuan PM - */ -#ifndef _TRANSPORT_H_ -#define _TRANSPORT_H_ - -#include - -#ifdef __cplusplus -extern "C" { -#endif - - -typedef struct transport_list_t* transport_list_handle_t; -typedef struct transport_item_t* transport_handle_t; - -typedef int (*connect_func)(transport_handle_t t, const char *host, int port, int timeout_ms); -typedef int (*io_func)(transport_handle_t t, const char *buffer, int len, int timeout_ms); -typedef int (*io_read_func)(transport_handle_t t, char *buffer, int len, int timeout_ms); -typedef int (*trans_func)(transport_handle_t t); -typedef int (*poll_func)(transport_handle_t t, int timeout_ms); - -/** - * @brief Create transport list - * - * @return A handle can hold all transports - */ -transport_list_handle_t transport_list_init(); - -/** - * @brief Cleanup and free all transports, include itself, - * this function will invoke transport_destroy of every transport have added this the list - * - * @param[in] list The list - * - * @return - * - ESP_OK - * - ESP_FAIL - */ -esp_err_t transport_list_destroy(transport_list_handle_t list); - -/** - * @brief Add a transport to the list, and define a scheme to indentify this transport in the list - * - * @param[in] list The list - * @param[in] t The Transport - * @param[in] scheme The scheme - * - * @return - * - ESP_OK - */ -esp_err_t transport_list_add(transport_list_handle_t list, transport_handle_t t, const char *scheme); - -/** - * @brief This function will remove all transport from the list, - * invoke transport_destroy of every transport have added this the list - * - * @param[in] list The list - * - * @return - * - ESP_OK - * - ESP_ERR_INVALID_ARG - */ -esp_err_t transport_list_clean(transport_list_handle_t list); - -/** - * @brief Get the transport by scheme, which has been defined when calling function `transport_list_add` - * - * @param[in] list The list - * @param[in] tag The tag - * - * @return The transport handle - */ -transport_handle_t transport_list_get_transport(transport_list_handle_t list, const char *scheme); - -/** - * @brief Initialize a transport handle object - * - * @return The transport handle - */ -transport_handle_t transport_init(); - -/** - * @brief Cleanup and free memory the transport - * - * @param[in] t The transport handle - * - * @return - * - ESP_OK - * - ESP_FAIL - */ -esp_err_t transport_destroy(transport_handle_t t); - -/** - * @brief Get default port number used by this transport - * - * @param[in] t The transport handle - * - * @return the port number - */ -int transport_get_default_port(transport_handle_t t); - -/** - * @brief Set default port number that can be used by this transport - * - * @param[in] t The transport handle - * @param[in] port The port number - * - * @return - * - ESP_OK - * - ESP_FAIL - */ -esp_err_t transport_set_default_port(transport_handle_t t, int port); - -/** - * @brief Transport connection function, to make a connection to server - * - * @param t The transport handle - * @param[in] host Hostname - * @param[in] port Port - * @param[in] timeout_ms The timeout milliseconds - * - * @return - * - socket for will use by this transport - * - (-1) if there are any errors, should check errno - */ -int transport_connect(transport_handle_t t, const char *host, int port, int timeout_ms); - -/** - * @brief Transport read function - * - * @param t The transport handle - * @param buffer The buffer - * @param[in] len The length - * @param[in] timeout_ms The timeout milliseconds - * - * @return - * - Number of bytes was read - * - (-1) if there are any errors, should check errno - */ -int transport_read(transport_handle_t t, char *buffer, int len, int timeout_ms); - -/** - * @brief Poll the transport until readable or timeout - * - * @param[in] t The transport handle - * @param[in] timeout_ms The timeout milliseconds - * - * @return - * - 0 Timeout - * - (-1) If there are any errors, should check errno - * - other The transport can read - */ -int transport_poll_read(transport_handle_t t, int timeout_ms); - -/** - * @brief Transport write function - * - * @param t The transport handle - * @param buffer The buffer - * @param[in] len The length - * @param[in] timeout_ms The timeout milliseconds - * - * @return - * - Number of bytes was written - * - (-1) if there are any errors, should check errno - */ -int transport_write(transport_handle_t t, const char *buffer, int len, int timeout_ms); - -/** - * @brief Poll the transport until writeable or timeout - * - * @param[in] t The transport handle - * @param[in] timeout_ms The timeout milliseconds - * - * @return - * - 0 Timeout - * - (-1) If there are any errors, should check errno - * - other The transport can write - */ -int transport_poll_write(transport_handle_t t, int timeout_ms); - -/** - * @brief Transport close - * - * @param t The transport handle - * - * @return - * - 0 if ok - * - (-1) if there are any errors, should check errno - */ -int transport_close(transport_handle_t t); - -/** - * @brief Get user data context of this transport - * - * @param[in] t The transport handle - * - * @return The user data context - */ -void *transport_get_context_data(transport_handle_t t); - -/** - * @brief Set the user context data for this transport - * - * @param[in] t The transport handle - * @param data The user data context - * - * @return - * - ESP_OK - */ -esp_err_t transport_set_context_data(transport_handle_t t, void *data); - -/** - * @brief Set transport functions for the transport handle - * - * @param[in] t The transport handle - * @param[in] _connect The connect function pointer - * @param[in] _read The read function pointer - * @param[in] _write The write function pointer - * @param[in] _close The close function pointer - * @param[in] _poll_read The poll read function pointer - * @param[in] _poll_write The poll write function pointer - * @param[in] _destroy The destroy function pointer - * - * @return - * - ESP_OK - */ -esp_err_t transport_set_func(transport_handle_t t, - connect_func _connect, - io_read_func _read, - io_func _write, - trans_func _close, - poll_func _poll_read, - poll_func _poll_write, - trans_func _destroy); -#ifdef __cplusplus -} -#endif -#endif diff --git a/lib/include/transport_ssl.h b/lib/include/transport_ssl.h deleted file mode 100644 index 2469aa5..0000000 --- a/lib/include/transport_ssl.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * This file is subject to the terms and conditions defined in - * file 'LICENSE', which is part of this source code package. - * Tuan PM - */ -#ifndef _TRANSPORT_SSL_H_ -#define _TRANSPORT_SSL_H_ - -#include "transport.h" - -#ifdef __cplusplus -extern "C" { -#endif - - -/** - * @brief Create new SSL transport, the transport handle must be release transport_destroy callback - * - * @return the allocated transport_handle_t, or NULL if the handle can not be allocated - */ -transport_handle_t transport_ssl_init(); - -/** - * @brief Set SSL certificate data (as PEM format). - * Note that, this function stores the pointer to data, rather than making a copy. - * So we need to make sure to keep the data lifetime before cleanup the connection - * - * @param t ssl transport - * @param[in] data The pem data - * @param[in] len The length - */ -void transport_ssl_set_cert_data(transport_handle_t t, const char *data, int len); - - -#ifdef __cplusplus -} -#endif -#endif - diff --git a/lib/include/transport_tcp.h b/lib/include/transport_tcp.h deleted file mode 100644 index 99160f3..0000000 --- a/lib/include/transport_tcp.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * This file is subject to the terms and conditions defined in - * file 'LICENSE', which is part of this source code package. - * Tuan PM - */ -#ifndef _TRANSPORT_TCP_H_ -#define _TRANSPORT_TCP_H_ - -#include "transport.h" - -#ifdef __cplusplus -extern "C" { -#endif - -/** - * @brief Create TCP transport, the transport handle must be release transport_destroy callback - * - * @return the allocated transport_handle_t, or NULL if the handle can not be allocated - */ -transport_handle_t transport_tcp_init(); - - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/lib/transport_ws.c b/lib/transport_ws.c index 59599ec..df880ca 100644 --- a/lib/transport_ws.c +++ b/lib/transport_ws.c @@ -19,6 +19,12 @@ typedef struct { transport_handle_t parent; } transport_ws_t; +transport_handle_t ws_transport_get_payload_transport_handle(transport_handle_t t) +{ + transport_ws_t *ws = transport_get_context_data(t); + return ws->parent; +} + static char *trimwhitespace(const char *str) { char *end; @@ -151,6 +157,7 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms) { transport_ws_t *ws = transport_get_context_data(t); int payload_len; + int payload_len_buff = len; char *data_ptr = buffer, opcode, mask, *mask_key = NULL; int rlen; int poll_read; @@ -161,7 +168,6 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms) ESP_LOGE(TAG, "Error read data"); return rlen; } - opcode = (*data_ptr & 0x0F); data_ptr ++; mask = ((*data_ptr >> 7) & 0x01); @@ -171,6 +177,7 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms) if (payload_len == 126) { // headerLen += 2; payload_len = data_ptr[0] << 8 | data_ptr[1]; + payload_len_buff = len - 4; data_ptr += 2; } else if (payload_len == 127) { // headerLen += 8; @@ -182,6 +189,11 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms) payload_len = data_ptr[4] << 24 | data_ptr[5] << 16 | data_ptr[6] << 8 | data_ptr[7]; } data_ptr += 8; + payload_len_buff = len - 10; + } + if (payload_len > payload_len_buff) { + ESP_LOGD(TAG, "Actual data received (%d) are longer than mqtt buffer (%d)", payload_len, payload_len_buff); + payload_len = payload_len_buff; } if (mask) { @@ -244,7 +256,7 @@ transport_handle_t transport_ws_init(transport_handle_t parent_handle) return NULL; }); - transport_set_func(t, ws_connect, ws_read, ws_write, ws_close, ws_poll_read, ws_poll_write, ws_destroy); + transport_set_func(t, ws_connect, ws_read, ws_write, ws_close, ws_poll_read, ws_poll_write, ws_destroy, ws_transport_get_payload_transport_handle); transport_set_context_data(t, ws); return t; } diff --git a/mqtt_client.c b/mqtt_client.c index 2b2c6c4..e73aa0c 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -461,6 +461,11 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) } +typedef struct { + char *path; + char *buffer; + transport_handle_t parent; +} transport_ws_t; static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { @@ -468,6 +473,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i uint32_t mqtt_topic_length, mqtt_data_length; uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0; int len_read; + transport_handle_t transport = client->transport; do { @@ -478,9 +484,13 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; mqtt_len = mqtt_data_length; + /* any further reading only the underlying payload */ + transport = transport_get_payload_transport_handle(transport); } else { mqtt_len = len_read; mqtt_data = (const char*)client->mqtt_state.in_buffer; + mqtt_topic = NULL; + mqtt_topic_length = 0; } ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); @@ -498,7 +508,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i break; } - len_read = transport_read(client->transport, + len_read = transport_read(transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ? client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read,