Fix: Return an error when fail to enqueue

The functions that enqueue messages didn't had a return for the
handler, with this the error was only logged instead of returned which
may cause the user to have an ID for a message that was not published.
This commit is contained in:
Euripedes Rocha
2021-06-09 13:54:33 +01:00
parent e24852a4dc
commit 7471177fe7

View File

@ -1004,7 +1004,7 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int
return false;
}
static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
@ -1018,16 +1018,14 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
//unlock
}
static void mqtt_enqueue(esp_mqtt_client_handle_t client)
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client)
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
if (client->mqtt_state.pending_msg_count > 0) {
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
@ -1036,9 +1034,9 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client)
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
//Copy to queue buffer
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
//unlock
return NULL;
}
@ -1635,8 +1633,12 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
mqtt_enqueue(client); //move pending msg to outbox (if have)
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
//move pending msg to outbox (if have)
if (!mqtt_enqueue(client)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
@ -1673,8 +1675,11 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
mqtt_enqueue(client);
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
if (!mqtt_enqueue(client)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
@ -1719,10 +1724,14 @@ static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, cons
client->mqtt_state.pending_msg_count ++;
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
mqtt_enqueue(client);
if (!mqtt_enqueue(client)) {
return -1;
}
} else {
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment);
if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
return -1;
}
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
}
}