feat: Introduces outbox limit

A memory limit for the outbox can be configured.
User will not be able to publish or enqueue if the new message goes
beyond the configured limit.
This commit is contained in:
Euripedes Rocha
2023-03-30 15:16:14 +02:00
parent 21a5491d53
commit 372ab7b374
10 changed files with 210 additions and 141 deletions

View File

@ -1,10 +1,12 @@
#include "mqtt_client.h"
#include "esp_transport.h"
#include "mqtt_client_priv.h"
#include "esp_log.h"
#include <stdint.h>
#include "esp_err.h"
#include "esp_log.h"
#include "esp_heap_caps.h"
#include "esp_transport.h"
#include "mqtt_client.h"
#include "mqtt_client_priv.h"
#include "mqtt_msg.h"
#include "mqtt_outbox.h"
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
#ifdef ESP_EVENT_ANY_ID
@ -564,6 +566,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
goto _mqtt_set_config_failed;
}
}
client->config->outbox_limit = config->outbox.limit;
esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict(client->config, config);
MQTT_API_UNLOCK(client);
@ -811,7 +814,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
// use separate value for output buffer size if configured
int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size;
if (mqtt_connection_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) {
if (mqtt_msg_buffer_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) {
goto _mqtt_init_failed;
}
@ -871,7 +874,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
vEventGroupDelete(client->status_bits);
}
free(client->mqtt_state.in_buffer);
mqtt_connection_destroy(&client->mqtt_state.connection);
mqtt_msg_buffer_destroy(&client->mqtt_state.connection);
if (client->api_lock) {
vSemaphoreDelete(client->api_lock);
}
@ -983,7 +986,7 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
ret = esp_event_loop_run(client->config->event_loop_handle, 0);
#else
return ESP_FAIL;
return ESP_FAIL;
#endif
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
@ -1277,7 +1280,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, read_len);
#endif
@ -1456,7 +1459,7 @@ static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item
}
} else if (client->mqtt_state.pending_publish_qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
@ -1808,6 +1811,10 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
if (client->config->outbox_limit > 0 && outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
@ -1828,16 +1835,16 @@ int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
return -1;
}
mqtt5_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->subscribe_property_info = NULL;
}
#endif
} else {
mqtt_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id);
topic_list, size,
&client->mqtt_state.pending_msg_id);
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Subscribe message cannot be created");
@ -1885,16 +1892,16 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
topic,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->unsubscribe_property_info = NULL;
}
#endif
} else {
mqtt_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id);
topic,
&client->mqtt_state.pending_msg_id);
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
MQTT_API_UNLOCK(client);
@ -1928,18 +1935,18 @@ static int make_publish(esp_mqtt_client_handle_t client, const char *topic, cons
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
topic, data, len,
qos, retain,
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->publish_property_info = NULL;
}
#endif
} else {
mqtt_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id);
topic, data, len,
qos, retain,
&pending_msg_id);
}
if (client->mqtt_state.connection.outbound_message.length == 0) {
@ -2008,6 +2015,13 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
len = strlen(data);
}
if (client->config->outbox_limit > 0 && qos > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
MQTT_API_UNLOCK(client);
return -2;
}
}
int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false);
if (pending_msg_id < 0) {
MQTT_API_UNLOCK(client);
@ -2064,7 +2078,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
if (qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
@ -2102,6 +2116,13 @@ int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic,
len = strlen(data);
}
if (client->config->outbox_limit > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}
}
MQTT_API_LOCK(client);
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {