bugfix for longer data on ws transport

This commit is contained in:
David Cermak
2018-08-16 18:35:08 +02:00
parent 0c25441fdd
commit 3bbdebcca9
7 changed files with 50 additions and 336 deletions

View File

@ -1,4 +1,4 @@
menu "ESPMQTT Configurations" menu "ESP-MQTT Configurations"
config MQTT_PROTOCOL_311 config MQTT_PROTOCOL_311
bool "Enable MQTT protocol 3.1.1" bool "Enable MQTT protocol 3.1.1"

View File

@ -32,10 +32,10 @@ typedef enum {
typedef enum { typedef enum {
MQTT_TRANSPORT_UNKNOWN = 0x0, MQTT_TRANSPORT_UNKNOWN = 0x0,
MQTT_TRANSPORT_OVER_TCP, MQTT_TRANSPORT_OVER_TCP, /*!< MQTT over TCP, using scheme: ``mqtt`` */
MQTT_TRANSPORT_OVER_SSL, MQTT_TRANSPORT_OVER_SSL, /*!< MQTT over SSL, using scheme: ``mqtts`` */
MQTT_TRANSPORT_OVER_WS, MQTT_TRANSPORT_OVER_WS, /*!< MQTT over Websocket, using scheme:: ``ws`` */
MQTT_TRANSPORT_OVER_WSS MQTT_TRANSPORT_OVER_WSS /*!< MQTT over Websocket Secure, using scheme: ``wss`` */
} esp_mqtt_transport_t; } esp_mqtt_transport_t;
typedef struct { typedef struct {
@ -57,27 +57,27 @@ typedef esp_err_t (* mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
typedef struct { typedef struct {
mqtt_event_callback_t event_handle; mqtt_event_callback_t event_handle; /*!< handle for MQTT events */
const char *host; const char *host; /*!< MQTT server domain (ipv4 as string) */
const char *uri; const char *uri;
uint32_t port; uint32_t port; /*!< MQTT server port */
const char *client_id; const char *client_id; /*!< default client id is ``ESP32_%CHIPID%`` where %CHIPID% are last 3 bytes of MAC address in hex format */
const char *username; const char *username; /*!< MQTT username */
const char *password; const char *password; /*!< MQTT password */
const char *lwt_topic; const char *lwt_topic; /*!< LWT (Last Will and Testament) message topic (NULL by default) */
const char *lwt_msg; const char *lwt_msg; /*!< LWT message (NULL by default) */
int lwt_qos; int lwt_qos; /*!< LWT message qos */
int lwt_retain; int lwt_retain; /*!< LWT retained message flag */
int lwt_msg_len; int lwt_msg_len; /*!< LWT message length */
int disable_clean_session; int disable_clean_session; /*!< mqtt clean session, default clean_session is true */
int keepalive; int keepalive; /*!< mqtt keepalive, default is 120 seconds */
bool disable_auto_reconnect; bool disable_auto_reconnect; /*!< this mqtt client will reconnect to server (when errors/disconnect). Set disable_auto_reconnect=true to disable */
void *user_context; void *user_context; /*!< pass user context to this option, then can receive that context in ``event->user_context`` */
int task_prio; int task_prio; /*!< MQTT task priority, default is 5, can be changed in ``make menuconfig`` */
int task_stack; int task_stack; /*!< MQTT task stack size, default is 6144 bytes, can be changed in ``make menuconfig`` */
int buffer_size; int buffer_size; /*!< size of MQTT send/receive buffer, default is 1024 */
const char *cert_pem; const char *cert_pem; /*!< pointer to CERT file for server verify (with SSL), default is NULL, not required to verify the server */
esp_mqtt_transport_t transport; esp_mqtt_transport_t transport; /*!< overrides URI transport */
} esp_mqtt_client_config_t; } esp_mqtt_client_config_t;
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config); esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config);

View File

@ -1,242 +0,0 @@
/*
* This file is subject to the terms and conditions defined in
* file 'LICENSE', which is part of this source code package.
* Tuan PM <tuanpm at live dot com>
*/
#ifndef _TRANSPORT_H_
#define _TRANSPORT_H_
#include <esp_err.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct transport_list_t* transport_list_handle_t;
typedef struct transport_item_t* transport_handle_t;
typedef int (*connect_func)(transport_handle_t t, const char *host, int port, int timeout_ms);
typedef int (*io_func)(transport_handle_t t, const char *buffer, int len, int timeout_ms);
typedef int (*io_read_func)(transport_handle_t t, char *buffer, int len, int timeout_ms);
typedef int (*trans_func)(transport_handle_t t);
typedef int (*poll_func)(transport_handle_t t, int timeout_ms);
/**
* @brief Create transport list
*
* @return A handle can hold all transports
*/
transport_list_handle_t transport_list_init();
/**
* @brief Cleanup and free all transports, include itself,
* this function will invoke transport_destroy of every transport have added this the list
*
* @param[in] list The list
*
* @return
* - ESP_OK
* - ESP_FAIL
*/
esp_err_t transport_list_destroy(transport_list_handle_t list);
/**
* @brief Add a transport to the list, and define a scheme to indentify this transport in the list
*
* @param[in] list The list
* @param[in] t The Transport
* @param[in] scheme The scheme
*
* @return
* - ESP_OK
*/
esp_err_t transport_list_add(transport_list_handle_t list, transport_handle_t t, const char *scheme);
/**
* @brief This function will remove all transport from the list,
* invoke transport_destroy of every transport have added this the list
*
* @param[in] list The list
*
* @return
* - ESP_OK
* - ESP_ERR_INVALID_ARG
*/
esp_err_t transport_list_clean(transport_list_handle_t list);
/**
* @brief Get the transport by scheme, which has been defined when calling function `transport_list_add`
*
* @param[in] list The list
* @param[in] tag The tag
*
* @return The transport handle
*/
transport_handle_t transport_list_get_transport(transport_list_handle_t list, const char *scheme);
/**
* @brief Initialize a transport handle object
*
* @return The transport handle
*/
transport_handle_t transport_init();
/**
* @brief Cleanup and free memory the transport
*
* @param[in] t The transport handle
*
* @return
* - ESP_OK
* - ESP_FAIL
*/
esp_err_t transport_destroy(transport_handle_t t);
/**
* @brief Get default port number used by this transport
*
* @param[in] t The transport handle
*
* @return the port number
*/
int transport_get_default_port(transport_handle_t t);
/**
* @brief Set default port number that can be used by this transport
*
* @param[in] t The transport handle
* @param[in] port The port number
*
* @return
* - ESP_OK
* - ESP_FAIL
*/
esp_err_t transport_set_default_port(transport_handle_t t, int port);
/**
* @brief Transport connection function, to make a connection to server
*
* @param t The transport handle
* @param[in] host Hostname
* @param[in] port Port
* @param[in] timeout_ms The timeout milliseconds
*
* @return
* - socket for will use by this transport
* - (-1) if there are any errors, should check errno
*/
int transport_connect(transport_handle_t t, const char *host, int port, int timeout_ms);
/**
* @brief Transport read function
*
* @param t The transport handle
* @param buffer The buffer
* @param[in] len The length
* @param[in] timeout_ms The timeout milliseconds
*
* @return
* - Number of bytes was read
* - (-1) if there are any errors, should check errno
*/
int transport_read(transport_handle_t t, char *buffer, int len, int timeout_ms);
/**
* @brief Poll the transport until readable or timeout
*
* @param[in] t The transport handle
* @param[in] timeout_ms The timeout milliseconds
*
* @return
* - 0 Timeout
* - (-1) If there are any errors, should check errno
* - other The transport can read
*/
int transport_poll_read(transport_handle_t t, int timeout_ms);
/**
* @brief Transport write function
*
* @param t The transport handle
* @param buffer The buffer
* @param[in] len The length
* @param[in] timeout_ms The timeout milliseconds
*
* @return
* - Number of bytes was written
* - (-1) if there are any errors, should check errno
*/
int transport_write(transport_handle_t t, const char *buffer, int len, int timeout_ms);
/**
* @brief Poll the transport until writeable or timeout
*
* @param[in] t The transport handle
* @param[in] timeout_ms The timeout milliseconds
*
* @return
* - 0 Timeout
* - (-1) If there are any errors, should check errno
* - other The transport can write
*/
int transport_poll_write(transport_handle_t t, int timeout_ms);
/**
* @brief Transport close
*
* @param t The transport handle
*
* @return
* - 0 if ok
* - (-1) if there are any errors, should check errno
*/
int transport_close(transport_handle_t t);
/**
* @brief Get user data context of this transport
*
* @param[in] t The transport handle
*
* @return The user data context
*/
void *transport_get_context_data(transport_handle_t t);
/**
* @brief Set the user context data for this transport
*
* @param[in] t The transport handle
* @param data The user data context
*
* @return
* - ESP_OK
*/
esp_err_t transport_set_context_data(transport_handle_t t, void *data);
/**
* @brief Set transport functions for the transport handle
*
* @param[in] t The transport handle
* @param[in] _connect The connect function pointer
* @param[in] _read The read function pointer
* @param[in] _write The write function pointer
* @param[in] _close The close function pointer
* @param[in] _poll_read The poll read function pointer
* @param[in] _poll_write The poll write function pointer
* @param[in] _destroy The destroy function pointer
*
* @return
* - ESP_OK
*/
esp_err_t transport_set_func(transport_handle_t t,
connect_func _connect,
io_read_func _read,
io_func _write,
trans_func _close,
poll_func _poll_read,
poll_func _poll_write,
trans_func _destroy);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,39 +0,0 @@
/*
* This file is subject to the terms and conditions defined in
* file 'LICENSE', which is part of this source code package.
* Tuan PM <tuanpm at live dot com>
*/
#ifndef _TRANSPORT_SSL_H_
#define _TRANSPORT_SSL_H_
#include "transport.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief Create new SSL transport, the transport handle must be release transport_destroy callback
*
* @return the allocated transport_handle_t, or NULL if the handle can not be allocated
*/
transport_handle_t transport_ssl_init();
/**
* @brief Set SSL certificate data (as PEM format).
* Note that, this function stores the pointer to data, rather than making a copy.
* So we need to make sure to keep the data lifetime before cleanup the connection
*
* @param t ssl transport
* @param[in] data The pem data
* @param[in] len The length
*/
void transport_ssl_set_cert_data(transport_handle_t t, const char *data, int len);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,27 +0,0 @@
/*
* This file is subject to the terms and conditions defined in
* file 'LICENSE', which is part of this source code package.
* Tuan PM <tuanpm at live dot com>
*/
#ifndef _TRANSPORT_TCP_H_
#define _TRANSPORT_TCP_H_
#include "transport.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief Create TCP transport, the transport handle must be release transport_destroy callback
*
* @return the allocated transport_handle_t, or NULL if the handle can not be allocated
*/
transport_handle_t transport_tcp_init();
#ifdef __cplusplus
}
#endif
#endif

View File

@ -19,6 +19,12 @@ typedef struct {
transport_handle_t parent; transport_handle_t parent;
} transport_ws_t; } transport_ws_t;
transport_handle_t ws_transport_get_payload_transport_handle(transport_handle_t t)
{
transport_ws_t *ws = transport_get_context_data(t);
return ws->parent;
}
static char *trimwhitespace(const char *str) static char *trimwhitespace(const char *str)
{ {
char *end; char *end;
@ -151,6 +157,7 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms)
{ {
transport_ws_t *ws = transport_get_context_data(t); transport_ws_t *ws = transport_get_context_data(t);
int payload_len; int payload_len;
int payload_len_buff = len;
char *data_ptr = buffer, opcode, mask, *mask_key = NULL; char *data_ptr = buffer, opcode, mask, *mask_key = NULL;
int rlen; int rlen;
int poll_read; int poll_read;
@ -161,7 +168,6 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms)
ESP_LOGE(TAG, "Error read data"); ESP_LOGE(TAG, "Error read data");
return rlen; return rlen;
} }
opcode = (*data_ptr & 0x0F); opcode = (*data_ptr & 0x0F);
data_ptr ++; data_ptr ++;
mask = ((*data_ptr >> 7) & 0x01); mask = ((*data_ptr >> 7) & 0x01);
@ -171,6 +177,7 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms)
if (payload_len == 126) { if (payload_len == 126) {
// headerLen += 2; // headerLen += 2;
payload_len = data_ptr[0] << 8 | data_ptr[1]; payload_len = data_ptr[0] << 8 | data_ptr[1];
payload_len_buff = len - 4;
data_ptr += 2; data_ptr += 2;
} else if (payload_len == 127) { } else if (payload_len == 127) {
// headerLen += 8; // headerLen += 8;
@ -182,6 +189,11 @@ static int ws_read(transport_handle_t t, char *buffer, int len, int timeout_ms)
payload_len = data_ptr[4] << 24 | data_ptr[5] << 16 | data_ptr[6] << 8 | data_ptr[7]; payload_len = data_ptr[4] << 24 | data_ptr[5] << 16 | data_ptr[6] << 8 | data_ptr[7];
} }
data_ptr += 8; data_ptr += 8;
payload_len_buff = len - 10;
}
if (payload_len > payload_len_buff) {
ESP_LOGD(TAG, "Actual data received (%d) are longer than mqtt buffer (%d)", payload_len, payload_len_buff);
payload_len = payload_len_buff;
} }
if (mask) { if (mask) {
@ -244,7 +256,7 @@ transport_handle_t transport_ws_init(transport_handle_t parent_handle)
return NULL; return NULL;
}); });
transport_set_func(t, ws_connect, ws_read, ws_write, ws_close, ws_poll_read, ws_poll_write, ws_destroy); transport_set_func(t, ws_connect, ws_read, ws_write, ws_close, ws_poll_read, ws_poll_write, ws_destroy, ws_transport_get_payload_transport_handle);
transport_set_context_data(t, ws); transport_set_context_data(t, ws);
return t; return t;
} }

View File

@ -461,6 +461,11 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
} }
typedef struct {
char *path;
char *buffer;
transport_handle_t parent;
} transport_ws_t;
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
{ {
@ -468,6 +473,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
uint32_t mqtt_topic_length, mqtt_data_length; uint32_t mqtt_topic_length, mqtt_data_length;
uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0; uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0;
int len_read; int len_read;
transport_handle_t transport = client->transport;
do do
{ {
@ -478,9 +484,13 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length; mqtt_len = mqtt_data_length;
/* any further reading only the underlying payload */
transport = transport_get_payload_transport_handle(transport);
} else { } else {
mqtt_len = len_read; mqtt_len = len_read;
mqtt_data = (const char*)client->mqtt_state.in_buffer; mqtt_data = (const char*)client->mqtt_state.in_buffer;
mqtt_topic = NULL;
mqtt_topic_length = 0;
} }
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
@ -498,7 +508,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
break; break;
} }
len_read = transport_read(client->transport, len_read = transport_read(transport,
(char *)client->mqtt_state.in_buffer, (char *)client->mqtt_state.in_buffer,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ? client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ?
client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read, client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read,