diff --git a/mqtt_client.c b/mqtt_client.c index a5afa3c..5bf3af9 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -521,7 +521,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } else { client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS; } - + client->config->transport = config->network.transport; if (config->network.if_name) { @@ -1062,7 +1062,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) } #endif } else { - // get topic + // get and save topic msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len); if (msg_topic == NULL) { ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__); @@ -1077,6 +1077,10 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) return ESP_FAIL; } } + char *saved_msg_topic = strndup(msg_topic, msg_topic_len); + ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM); + size_t saved_msg_topic_len = msg_topic_len; + // post data event client->event.retain = mqtt_get_retain(msg_buf); if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { @@ -1089,7 +1093,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) client->event.qos = mqtt_get_qos(msg_buf); client->event.dup = mqtt_get_dup(msg_buf); client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len; -post_data_event: + ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT, NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len), client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset)); @@ -1101,24 +1105,39 @@ post_data_event: client->event.topic_len = msg_topic_len; esp_mqtt_dispatch_event(client); - if (msg_read_len < msg_total_len) { + client->event.topic = saved_msg_topic; + client->event.topic_len = saved_msg_topic_len; + while(msg_read_len < msg_total_len) { size_t buf_len = client->mqtt_state.in_buffer_length; - msg_data = (char *)client->mqtt_state.in_buffer; - msg_topic = NULL; - msg_topic_len = 0; msg_data_offset += msg_data_len; - int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, - msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, - client->config->network_timeout_ms); + + size_t read_len; + if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) { + read_len = buf_len - saved_msg_topic_len; + } else { + read_len = msg_total_len - msg_read_len; + } + + int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms); if (ret <= 0) { return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL; } msg_data_len = ret; msg_read_len += msg_data_len; - goto post_data_event; + + ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT, + NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len), + client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset)); + + client->event.data_len = msg_data_len; + client->event.current_data_offset = msg_data_offset; + + esp_mqtt_dispatch_event(client); + } + free(saved_msg_topic); return ESP_OK; }