diff --git a/mqtt_client.c b/mqtt_client.c index 9f27538..d50e3c0 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -511,23 +511,32 @@ typedef struct { static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { - const char *mqtt_topic, *mqtt_data; + const char *mqtt_topic = NULL, *mqtt_data = NULL; uint32_t mqtt_topic_length, mqtt_data_length; - uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0; - int len_read; + uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0; + int len_read= length; + int max_to_read = client->mqtt_state.in_buffer_length; + int buffer_offset = 0; esp_transport_handle_t transport = client->transport; do { if (total_mqtt_len == 0) { - mqtt_topic_length = length; - mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length); - mqtt_data_length = length; - mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); - total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; - mqtt_len = mqtt_data_length; /* any further reading only the underlying payload */ transport = esp_transport_get_payload_transport_handle(transport); + mqtt_data_length = mqtt_topic_length = length; + if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) || + NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) { + // mqtt header is not complete, continue reading + memmove(client->mqtt_state.in_buffer, message, length); + buffer_offset = length; + message = client->mqtt_state.in_buffer; + max_to_read = client->mqtt_state.in_buffer_length - length; + mqtt_len = 0; + } else { + total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; + mqtt_len = mqtt_data_length; + } } else { mqtt_len = len_read; mqtt_data = (const char*)client->mqtt_state.in_buffer; @@ -535,15 +544,17 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i mqtt_topic_length = 0; } - ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); - client->event.event_id = MQTT_EVENT_DATA; - client->event.data = (char *)mqtt_data; - client->event.data_len = mqtt_len; - client->event.total_data_len = total_mqtt_len; - client->event.current_data_offset = mqtt_offset; - client->event.topic = (char *)mqtt_topic; - client->event.topic_len = mqtt_topic_length; - esp_mqtt_dispatch_event(client); + if (total_mqtt_len != 0) { + ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); + client->event.event_id = MQTT_EVENT_DATA; + client->event.data = (char *)mqtt_data; + client->event.data_len = mqtt_len; + client->event.total_data_len = total_mqtt_len; + client->event.current_data_offset = mqtt_offset; + client->event.topic = (char *)mqtt_topic; + client->event.topic_len = mqtt_topic_length; + esp_mqtt_dispatch_event(client); + } mqtt_offset += mqtt_len; if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) { @@ -551,10 +562,13 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i } len_read = esp_transport_read(transport, - (char *)client->mqtt_state.in_buffer, - client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ? - client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read, + (char *)client->mqtt_state.in_buffer + buffer_offset, + client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ? + max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read, client->config->network_timeout_ms); + length = len_read + buffer_offset; + buffer_offset = 0; + max_to_read = client->mqtt_state.in_buffer_length; if (len_read <= 0) { ESP_LOGE(TAG, "Read error or timeout: %d", errno); break; @@ -562,7 +576,6 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i client->mqtt_state.message_length_read += len_read; } while (1); - } static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) @@ -667,8 +680,6 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); - - deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); break; case MQTT_MSG_TYPE_PUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {