mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-07-30 10:48:06 +02:00
client: queued oversized messaged even if not connected
This commit is contained in:
@ -1496,8 +1496,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
/* We have to set as pending all the qos>0 messages */
|
/* We have to set as pending all the qos>0 messages */
|
||||||
|
client->mqtt_state.outbound_message = publish_msg;
|
||||||
if (qos > 0) {
|
if (qos > 0) {
|
||||||
client->mqtt_state.outbound_message = publish_msg;
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_id = pending_msg_id;
|
client->mqtt_state.pending_msg_id = pending_msg_id;
|
||||||
client->mqtt_state.pending_publish_qos = qos;
|
client->mqtt_state.pending_publish_qos = qos;
|
||||||
@ -1505,9 +1505,10 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
||||||
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
||||||
mqtt_enqueue(client);
|
mqtt_enqueue(client);
|
||||||
|
} else {
|
||||||
|
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||||
|
mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
client->mqtt_state.outbound_message = publish_msg;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Skip sending if not connected (rely on resending) */
|
/* Skip sending if not connected (rely on resending) */
|
||||||
@ -1530,22 +1531,14 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
}
|
}
|
||||||
|
|
||||||
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||||
|
client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0;
|
||||||
|
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||||
remaining_len -= data_sent;
|
remaining_len -= data_sent;
|
||||||
current_data += data_sent;
|
current_data += data_sent;
|
||||||
|
|
||||||
if (remaining_len > 0) {
|
if (remaining_len > 0) {
|
||||||
mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection;
|
mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection;
|
||||||
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
|
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
|
||||||
if (connection->message.fragmented_msg_data_offset) {
|
|
||||||
// asked to enqueue oversized message (first time only)
|
|
||||||
connection->message.fragmented_msg_data_offset = 0;
|
|
||||||
connection->message.fragmented_msg_total_length = 0;
|
|
||||||
if (qos > 0) {
|
|
||||||
// internally enqueue all big messages, as they dont fit 'pending msg' structure
|
|
||||||
mqtt_enqueue_oversized(client, (uint8_t *)current_data, remaining_len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (remaining_len > connection->buffer_length) {
|
if (remaining_len > connection->buffer_length) {
|
||||||
// Continue with sending
|
// Continue with sending
|
||||||
memcpy(connection->buffer, current_data, connection->buffer_length);
|
memcpy(connection->buffer, current_data, connection->buffer_length);
|
||||||
@ -1573,10 +1566,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
|
|||||||
return pending_msg_id;
|
return pending_msg_id;
|
||||||
|
|
||||||
cannot_publish:
|
cannot_publish:
|
||||||
|
// clear out possible fragmented publish if failed or skipped
|
||||||
|
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||||
if (qos == 0) {
|
if (qos == 0) {
|
||||||
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
|
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
|
||||||
}
|
}
|
||||||
MQTT_API_UNLOCK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user