added a fix for incomplete header message received (topic or data could not be resolved from msg)

This commit is contained in:
David Cermak
2018-12-04 15:29:14 +01:00
parent db64b79120
commit cf5b8eda89

View File

@@ -511,23 +511,32 @@ typedef struct {
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
{ {
const char *mqtt_topic, *mqtt_data; const char *mqtt_topic = NULL, *mqtt_data = NULL;
uint32_t mqtt_topic_length, mqtt_data_length; uint32_t mqtt_topic_length, mqtt_data_length;
uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0; uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0;
int len_read; int len_read= length;
int max_to_read = client->mqtt_state.in_buffer_length;
int buffer_offset = 0;
esp_transport_handle_t transport = client->transport; esp_transport_handle_t transport = client->transport;
do do
{ {
if (total_mqtt_len == 0) { if (total_mqtt_len == 0) {
mqtt_topic_length = length;
mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length);
mqtt_data_length = length;
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
/* any further reading only the underlying payload */ /* any further reading only the underlying payload */
transport = esp_transport_get_payload_transport_handle(transport); transport = esp_transport_get_payload_transport_handle(transport);
mqtt_data_length = mqtt_topic_length = length;
if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) ||
NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) {
// mqtt header is not complete, continue reading
memmove(client->mqtt_state.in_buffer, message, length);
buffer_offset = length;
message = client->mqtt_state.in_buffer;
max_to_read = client->mqtt_state.in_buffer_length - length;
mqtt_len = 0;
} else {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
}
} else { } else {
mqtt_len = len_read; mqtt_len = len_read;
mqtt_data = (const char*)client->mqtt_state.in_buffer; mqtt_data = (const char*)client->mqtt_state.in_buffer;
@@ -535,15 +544,17 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
mqtt_topic_length = 0; mqtt_topic_length = 0;
} }
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); if (total_mqtt_len != 0) {
client->event.event_id = MQTT_EVENT_DATA; ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.data = (char *)mqtt_data; client->event.event_id = MQTT_EVENT_DATA;
client->event.data_len = mqtt_len; client->event.data = (char *)mqtt_data;
client->event.total_data_len = total_mqtt_len; client->event.data_len = mqtt_len;
client->event.current_data_offset = mqtt_offset; client->event.total_data_len = total_mqtt_len;
client->event.topic = (char *)mqtt_topic; client->event.current_data_offset = mqtt_offset;
client->event.topic_len = mqtt_topic_length; client->event.topic = (char *)mqtt_topic;
esp_mqtt_dispatch_event(client); client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
}
mqtt_offset += mqtt_len; mqtt_offset += mqtt_len;
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) { if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
@@ -551,10 +562,13 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
} }
len_read = esp_transport_read(transport, len_read = esp_transport_read(transport,
(char *)client->mqtt_state.in_buffer, (char *)client->mqtt_state.in_buffer + buffer_offset,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ? client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ?
client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read, max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
client->config->network_timeout_ms); client->config->network_timeout_ms);
length = len_read + buffer_offset;
buffer_offset = 0;
max_to_read = client->mqtt_state.in_buffer_length;
if (len_read <= 0) { if (len_read <= 0) {
ESP_LOGE(TAG, "Read error or timeout: %d", errno); ESP_LOGE(TAG, "Read error or timeout: %d", errno);
break; break;
@@ -562,7 +576,6 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
client->mqtt_state.message_length_read += len_read; client->mqtt_state.message_length_read += len_read;
} while (1); } while (1);
} }
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
@@ -667,8 +680,6 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break; break;
case MQTT_MSG_TYPE_PUBACK: case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {