mirror of
https://github.com/espressif/esp-mqtt.git
synced 2025-08-03 20:55:14 +02:00
Merge enqueue functions
- enqueue and oversized enqueue did the same work with small differences this clean up the extra unnecessary function
This commit is contained in:
@@ -1128,39 +1128,20 @@ static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_ty
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
|
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
|
||||||
{
|
|
||||||
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
|
|
||||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
|
||||||
//lock mutex
|
|
||||||
outbox_message_t msg = { 0 };
|
|
||||||
msg.data = client->mqtt_state.outbound_message->data;
|
|
||||||
msg.len = client->mqtt_state.outbound_message->length;
|
|
||||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
|
||||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
|
||||||
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
|
||||||
msg.remaining_data = remaining_data;
|
|
||||||
msg.remaining_len = remaining_len;
|
|
||||||
//Copy to queue buffer
|
|
||||||
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
|
||||||
//unlock
|
|
||||||
}
|
|
||||||
|
|
||||||
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client)
|
|
||||||
{
|
{
|
||||||
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
|
||||||
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
|
||||||
if (client->mqtt_state.pending_msg_count > 0) {
|
|
||||||
outbox_message_t msg = { 0 };
|
outbox_message_t msg = { 0 };
|
||||||
msg.data = client->mqtt_state.outbound_message->data;
|
msg.data = client->mqtt_state.outbound_message->data;
|
||||||
msg.len = client->mqtt_state.outbound_message->length;
|
msg.len = client->mqtt_state.outbound_message->length;
|
||||||
msg.msg_id = client->mqtt_state.pending_msg_id;
|
msg.msg_id = client->mqtt_state.pending_msg_id;
|
||||||
msg.msg_type = client->mqtt_state.pending_msg_type;
|
msg.msg_type = client->mqtt_state.pending_msg_type;
|
||||||
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
msg.msg_qos = client->mqtt_state.pending_publish_qos;
|
||||||
|
msg.remaining_data = remaining_data;
|
||||||
|
msg.remaining_len = remaining_len;
|
||||||
//Copy to queue buffer
|
//Copy to queue buffer
|
||||||
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -1875,7 +1856,7 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
|
|||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
//move pending msg to outbox (if have)
|
//move pending msg to outbox (if have)
|
||||||
if (!mqtt_enqueue(client)) {
|
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||||
MQTT_API_UNLOCK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@@ -1933,7 +1914,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
|
|||||||
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
if (!mqtt_enqueue(client)) {
|
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||||
MQTT_API_UNLOCK(client);
|
MQTT_API_UNLOCK(client);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@@ -1984,12 +1965,12 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
|
|||||||
client->mqtt_state.pending_msg_count ++;
|
client->mqtt_state.pending_msg_count ++;
|
||||||
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
|
||||||
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
|
||||||
if (!mqtt_enqueue(client)) {
|
if (!mqtt_enqueue(client, NULL, 0)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
|
||||||
if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
|
if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
|
||||||
|
Reference in New Issue
Block a user