mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-29 18:28:24 +02:00
Fix for case where multiple MQTT messages are fitted into a single TCP packet : iterate over the buffer received.
Merges https://github.com/espressif/esp-mqtt/pull/82 Closes #64 Closes #80
This commit is contained in:
committed by
David Cermak
parent
1e787461dc
commit
db64b79120
@ -606,6 +606,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
uint8_t msg_type;
|
||||
uint8_t msg_qos;
|
||||
uint16_t msg_id;
|
||||
uint32_t transport_message_offset = 0 ;
|
||||
|
||||
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 1000);
|
||||
|
||||
@ -618,9 +619,13 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
|
||||
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
|
||||
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
|
||||
// In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here
|
||||
while ( transport_message_offset < read_len ){
|
||||
// If the message was valid, get the type, quality of service and id of the message
|
||||
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
switch (msg_type)
|
||||
@ -656,9 +661,12 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
// return ESP_FAIL;
|
||||
}
|
||||
}
|
||||
client->mqtt_state.message_length_read = read_len;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
|
||||
|
||||
// Deliver the publish message
|
||||
client->mqtt_state.message_length_read = read_len - transport_message_offset;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
|
||||
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
|
||||
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
|
||||
break;
|
||||
@ -695,6 +703,9 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
break;
|
||||
}
|
||||
|
||||
transport_message_offset += mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset) ;
|
||||
}
|
||||
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user