Merge branch 'feature/message_topic_in_multichunk_msg' into 'master'

feat: Include message topic in all chunks

Closes IDFGH-11179

See merge request espressif/esp-mqtt!219
This commit is contained in:
Rocha Euripedes
2024-08-22 18:57:03 +08:00

View File

@ -521,7 +521,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
} else { } else {
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS; client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
} }
client->config->transport = config->network.transport; client->config->transport = config->network.transport;
if (config->network.if_name) { if (config->network.if_name) {
@ -1062,7 +1062,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
} }
#endif #endif
} else { } else {
// get topic // get and save topic
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len); msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) { if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__); ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
@ -1077,6 +1077,10 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
return ESP_FAIL; return ESP_FAIL;
} }
} }
char *saved_msg_topic = strndup(msg_topic, msg_topic_len);
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
size_t saved_msg_topic_len = msg_topic_len;
// post data event // post data event
client->event.retain = mqtt_get_retain(msg_buf); client->event.retain = mqtt_get_retain(msg_buf);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
@ -1089,7 +1093,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.qos = mqtt_get_qos(msg_buf); client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buf); client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len; client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT, ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len), NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset)); client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
@ -1101,24 +1105,39 @@ post_data_event:
client->event.topic_len = msg_topic_len; client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client); esp_mqtt_dispatch_event(client);
if (msg_read_len < msg_total_len) { client->event.topic = saved_msg_topic;
client->event.topic_len = saved_msg_topic_len;
while(msg_read_len < msg_total_len) {
size_t buf_len = client->mqtt_state.in_buffer_length; size_t buf_len = client->mqtt_state.in_buffer_length;
msg_data = (char *)client->mqtt_state.in_buffer; msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = NULL;
msg_topic_len = 0;
msg_data_offset += msg_data_len; msg_data_offset += msg_data_len;
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, size_t read_len;
client->config->network_timeout_ms); if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) {
read_len = buf_len - saved_msg_topic_len;
} else {
read_len = msg_total_len - msg_read_len;
}
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms);
if (ret <= 0) { if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL; return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
} }
msg_data_len = ret; msg_data_len = ret;
msg_read_len += msg_data_len; msg_read_len += msg_data_len;
goto post_data_event;
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
esp_mqtt_dispatch_event(client);
} }
free(saved_msg_topic);
return ESP_OK; return ESP_OK;
} }