forked from espressif/esp-mqtt
fix handling of large messages (#40)
This commit is contained in:
@@ -71,7 +71,7 @@ enum mqtt_connect_return_code
|
|||||||
typedef struct mqtt_message
|
typedef struct mqtt_message
|
||||||
{
|
{
|
||||||
uint8_t* data;
|
uint8_t* data;
|
||||||
uint16_t length;
|
uint32_t length;
|
||||||
|
|
||||||
} mqtt_message_t;
|
} mqtt_message_t;
|
||||||
|
|
||||||
@@ -108,9 +108,9 @@ static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1
|
|||||||
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
|
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
|
||||||
|
|
||||||
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
|
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
|
||||||
int mqtt_get_total_length(uint8_t* buffer, uint16_t length);
|
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length);
|
||||||
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length);
|
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length);
|
||||||
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length);
|
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length);
|
||||||
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length);
|
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length);
|
||||||
|
|
||||||
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
|
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
|
||||||
|
@@ -133,10 +133,10 @@ void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buff
|
|||||||
connection->buffer_length = buffer_length;
|
connection->buffer_length = buffer_length;
|
||||||
}
|
}
|
||||||
|
|
||||||
int mqtt_get_total_length(uint8_t* buffer, uint16_t length)
|
uint32_t mqtt_get_total_length(uint8_t* buffer, uint16_t length)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
int totlen = 0;
|
uint32_t totlen = 0;
|
||||||
|
|
||||||
for (i = 1; i < length; ++i)
|
for (i = 1; i < length; ++i)
|
||||||
{
|
{
|
||||||
@@ -148,11 +148,11 @@ int mqtt_get_total_length(uint8_t* buffer, uint16_t length)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
totlen += i;
|
totlen += i;
|
||||||
|
|
||||||
return totlen;
|
return totlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
|
const char* mqtt_get_publish_topic(uint8_t* buffer, uint32_t* length)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
int totlen = 0;
|
int totlen = 0;
|
||||||
@@ -179,9 +179,9 @@ const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
|
|||||||
|
|
||||||
*length = topiclen;
|
*length = topiclen;
|
||||||
return (const char*)(buffer + i);
|
return (const char*)(buffer + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length)
|
const char* mqtt_get_publish_data(uint8_t* buffer, uint32_t* length)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
int totlen = 0;
|
int totlen = 0;
|
||||||
|
@@ -22,8 +22,8 @@ typedef struct mqtt_state
|
|||||||
uint8_t *out_buffer;
|
uint8_t *out_buffer;
|
||||||
int in_buffer_length;
|
int in_buffer_length;
|
||||||
int out_buffer_length;
|
int out_buffer_length;
|
||||||
uint16_t message_length;
|
uint32_t message_length;
|
||||||
uint16_t message_length_read;
|
uint32_t message_length_read;
|
||||||
mqtt_message_t *outbound_message;
|
mqtt_message_t *outbound_message;
|
||||||
mqtt_connection_t mqtt_connection;
|
mqtt_connection_t mqtt_connection;
|
||||||
uint16_t pending_msg_id;
|
uint16_t pending_msg_id;
|
||||||
@@ -442,25 +442,20 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
|
|||||||
return client->config->event_handle(&client->event);
|
return client->config->event_handle(&client->event);
|
||||||
}
|
}
|
||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
|
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
|
||||||
{
|
{
|
||||||
const char *mqtt_topic, *mqtt_data;
|
const char *mqtt_topic, *mqtt_data;
|
||||||
uint16_t mqtt_topic_length, mqtt_data_length, total_mqtt_len = 0;
|
uint32_t mqtt_topic_length, mqtt_data_length;
|
||||||
uint16_t mqtt_len, mqtt_offset = 0;
|
uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0;
|
||||||
int len_read;
|
int len_read;
|
||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
mqtt_topic_length = length;
|
if (total_mqtt_len == 0){
|
||||||
mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length);
|
|
||||||
mqtt_data_length = length;
|
|
||||||
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
|
|
||||||
|
|
||||||
if(total_mqtt_len == 0){
|
|
||||||
mqtt_topic_length = length;
|
mqtt_topic_length = length;
|
||||||
mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length);
|
mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length);
|
||||||
mqtt_data_length = length;
|
mqtt_data_length = length;
|
||||||
@@ -472,7 +467,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
|||||||
mqtt_data = (const char*)client->mqtt_state.in_buffer;
|
mqtt_data = (const char*)client->mqtt_state.in_buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_data_length, mqtt_topic_length);
|
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
|
||||||
client->event.event_id = MQTT_EVENT_DATA;
|
client->event.event_id = MQTT_EVENT_DATA;
|
||||||
client->event.data = (char *)mqtt_data;
|
client->event.data = (char *)mqtt_data;
|
||||||
client->event.data_len = mqtt_len;
|
client->event.data_len = mqtt_len;
|
||||||
@@ -488,7 +483,8 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
|||||||
|
|
||||||
len_read = transport_read(client->transport,
|
len_read = transport_read(client->transport,
|
||||||
(char *)client->mqtt_state.in_buffer,
|
(char *)client->mqtt_state.in_buffer,
|
||||||
client->mqtt_state.in_buffer_length,
|
client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ?
|
||||||
|
client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
|
||||||
client->config->network_timeout_ms);
|
client->config->network_timeout_ms);
|
||||||
if (len_read <= 0) {
|
if (len_read <= 0) {
|
||||||
ESP_LOGE(TAG, "Read error or timeout: %d", errno);
|
ESP_LOGE(TAG, "Read error or timeout: %d", errno);
|
||||||
|
Reference in New Issue
Block a user