mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-29 18:28:24 +02:00
minor fixes for issues present with fragmented/packed data
This commit is contained in:
158
mqtt_client.c
158
mqtt_client.c
@ -536,12 +536,9 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
||||
} else {
|
||||
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
|
||||
mqtt_len = mqtt_data_length;
|
||||
if (client->mqtt_state.message_length_read < client->mqtt_state.message_length) {
|
||||
/* if message is framented -> correct the size for the first DATA event */
|
||||
mqtt_data_length = client->mqtt_state.message_length_read - ((uint8_t*)mqtt_data- message);
|
||||
}
|
||||
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
|
||||
/* read msg id only once */
|
||||
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
|
||||
client->event.msg_id = mqtt_get_id(message, length);
|
||||
}
|
||||
} else {
|
||||
mqtt_len = len_read;
|
||||
@ -555,7 +552,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
|
||||
client->event.event_id = MQTT_EVENT_DATA;
|
||||
client->event.data = (char *)mqtt_data;
|
||||
client->event.data_len = mqtt_len;
|
||||
client->event.total_data_len = total_mqtt_len;
|
||||
client->event.total_data_len = mqtt_data_length;
|
||||
client->event.current_data_offset = mqtt_offset;
|
||||
client->event.topic = (char *)mqtt_topic;
|
||||
client->event.topic_len = mqtt_topic_length;
|
||||
@ -660,88 +657,87 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
|
||||
}
|
||||
|
||||
// In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here
|
||||
while ( transport_message_offset < read_len ){
|
||||
// If the message was valid, get the type, quality of service and id of the message
|
||||
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
while ( transport_message_offset < read_len ) {
|
||||
// If the message was valid, get the type, quality of service and id of the message
|
||||
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
|
||||
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
|
||||
client->mqtt_state.message_length_read = read_len - transport_message_offset;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
switch (msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "Subscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "UnSubscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
if (msg_qos == 1) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
else if (msg_qos == 2) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
|
||||
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
|
||||
// TODO: Shoule reconnect?
|
||||
// return ESP_FAIL;
|
||||
switch (msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "Subscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
|
||||
ESP_LOGD(TAG, "UnSubscribe successful");
|
||||
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
if (msg_qos == 1) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
else if (msg_qos == 2) {
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver the publish message
|
||||
client->mqtt_state.message_length_read = read_len - transport_message_offset;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
|
||||
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
|
||||
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
if (mqtt_write_data(client) != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
|
||||
// TODO: Shoule reconnect?
|
||||
// return ESP_FAIL;
|
||||
}
|
||||
}
|
||||
// Deliver the publish message
|
||||
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length);
|
||||
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
mqtt_write_data(client);
|
||||
|
||||
transport_message_offset += mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset) ;
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
|
||||
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
|
||||
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
|
||||
client->event.event_id = MQTT_EVENT_PUBLISHED;
|
||||
esp_mqtt_dispatch_event_with_msgid(client);
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
|
||||
client->wait_for_ping_resp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
transport_message_offset += client->mqtt_state.message_length;
|
||||
}
|
||||
|
||||
return ESP_OK;
|
||||
|
Reference in New Issue
Block a user