Files
esp-mqtt/lib/mqtt_outbox.c

222 lines
6.1 KiB
C
Raw Permalink Normal View History

2018-02-16 02:40:16 +07:00
#include "mqtt_outbox.h"
#include <stdint.h>
2018-02-16 02:40:16 +07:00
#include <stdlib.h>
#include <string.h>
#include "mqtt_config.h"
2019-03-14 20:26:28 +08:00
#include "sys/queue.h"
#include "esp_heap_caps.h"
2018-02-16 02:40:16 +07:00
#include "esp_log.h"
#ifndef CONFIG_MQTT_CUSTOM_OUTBOX
static const char *TAG = "outbox";
2018-02-16 02:40:16 +07:00
/* One queued message: a private copy of the serialized bytes plus the
 * bookkeeping the outbox needs to find, resend, or expire it. */
typedef struct outbox_item {
/* Buffer allocated in outbox_enqueue(); holds message->data followed by
 * message->remaining_data when that pointer is non-NULL. */
char *buffer;
/* Set to message->len + message->remaining_len in outbox_enqueue(). */
int len;
/* Copied from the enqueued message; used as lookup keys by outbox_get()
 * and outbox_delete(). */
int msg_id;
int msg_type;
int msg_qos;
/* Value supplied at enqueue time or via outbox_set_tick(); the expiry
 * functions compare (current_tick - tick) against a timeout. */
outbox_tick_t tick;
/* Starts as QUEUED; changed through outbox_set_pending(). */
pending_state_t pending;
/* Singly-linked tail queue linkage (sys/queue.h). */
STAILQ_ENTRY(outbox_item) next;
} outbox_item_t;
/* Head type for the queue of outbox_item entries. */
STAILQ_HEAD(outbox_list_t, outbox_item);
/* The outbox itself: a queue of items plus a running byte count. */
struct outbox_t {
/* Sum of `len` over all items currently in `list`; increased on enqueue
 * and decreased wherever an item is removed. */
_Atomic uint64_t size;
/* Heap-allocated queue head, created in outbox_init(). */
struct outbox_list_t *list;
};
2019-03-19 15:06:20 +08:00
outbox_handle_t outbox_init(void)
2018-02-16 02:40:16 +07:00
{
outbox_handle_t outbox = calloc(1, sizeof(struct outbox_t));
ESP_MEM_CHECK(TAG, outbox, return NULL);
outbox->list = calloc(1, sizeof(struct outbox_list_t));
ESP_MEM_CHECK(TAG, outbox->list, {free(outbox); return NULL;});
outbox->size = 0;
STAILQ_INIT(outbox->list);
2018-02-16 02:40:16 +07:00
return outbox;
}
/**
 * Copy a message into a newly allocated item and append it to the outbox.
 *
 * The item owns its own buffer: message->data (message->len bytes) is copied
 * first and, when message->remaining_data is non-NULL, that second fragment
 * (message->remaining_len bytes) is copied directly behind it.
 *
 * @param outbox   outbox to append to
 * @param message  message descriptor to copy from
 * @param tick     tick value stored with the item
 * @return the new item, or NULL when an allocation failed (nothing is queued
 *         in that case).
 */
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
{
    outbox_item_handle_t entry = calloc(1, sizeof(outbox_item_t));
    ESP_MEM_CHECK(TAG, entry, return NULL);

    /* Bookkeeping fields copied straight from the caller's descriptor. */
    entry->pending = QUEUED;
    entry->tick = tick;
    entry->len = message->len + message->remaining_len;
    entry->msg_qos = message->msg_qos;
    entry->msg_type = message->msg_type;
    entry->msg_id = message->msg_id;

    entry->buffer = heap_caps_malloc(message->len + message->remaining_len, MQTT_OUTBOX_MEMORY);
    ESP_MEM_CHECK(TAG, entry->buffer, {
        free(entry);
        return NULL;
    });

    memcpy(entry->buffer, message->data, message->len);
    if (message->remaining_data != NULL) {
        memcpy(entry->buffer + message->len, message->remaining_data, message->remaining_len);
    }

    STAILQ_INSERT_TAIL(outbox->list, entry, next);
    outbox->size += entry->len;

    ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%"PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
    return entry;
}
/**
 * Find the first queued item whose msg_id equals the given id.
 *
 * @param outbox  outbox to search
 * @param msg_id  message id to look for
 * @return the matching item, or NULL when no item matches.
 */
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
{
    outbox_item_handle_t cursor = STAILQ_FIRST(outbox->list);

    while (cursor != NULL && cursor->msg_id != msg_id) {
        cursor = STAILQ_NEXT(cursor, next);
    }
    return cursor;
}
/**
 * Return the first item in queue order whose pending state matches.
 *
 * The item is NOT removed from the outbox; this is a lookup only.
 *
 * @param outbox   outbox to search
 * @param pending  state to match against each item
 * @param tick     optional out-parameter; when non-NULL and an item is found,
 *                 receives that item's stored tick
 * @return the matching item, or NULL when none matches.
 */
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
{
    outbox_item_handle_t found = STAILQ_FIRST(outbox->list);

    while (found != NULL && found->pending != pending) {
        found = STAILQ_NEXT(found, next);
    }

    if (found != NULL && tick != NULL) {
        *tick = found->tick;
    }
    return found;
}
/**
 * Remove a specific item from the outbox and free it.
 *
 * The item is only touched if it is actually found in this outbox's queue.
 *
 * @param outbox          outbox that should contain the item
 * @param item_to_delete  item to remove
 * @return ESP_OK when the item was found and released, ESP_FAIL otherwise.
 */
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
{
    outbox_item_handle_t cursor = STAILQ_FIRST(outbox->list);

    /* Confirm membership before unlinking. */
    while (cursor != NULL && cursor != item_to_delete) {
        cursor = STAILQ_NEXT(cursor, next);
    }
    if (cursor == NULL) {
        return ESP_FAIL;
    }

    STAILQ_REMOVE(outbox->list, cursor, outbox_item, next);
    outbox->size -= cursor->len;
    free(cursor->buffer);
    free(cursor);
    return ESP_OK;
}
/**
 * Expose an item's buffer together with its metadata.
 *
 * @param item      item to read; may be NULL
 * @param len       receives the item's length
 * @param msg_id    receives the item's message id
 * @param msg_type  receives the item's message type
 * @param qos       receives the item's QoS value
 * @return pointer to the item's buffer, or NULL when item is NULL (the
 *         out-parameters are left untouched in that case).
 */
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
{
    if (item == NULL) {
        return NULL;
    }

    *qos = item->msg_qos;
    *msg_type = item->msg_type;
    *msg_id = item->msg_id;
    *len = item->len;
    return (uint8_t *)item->buffer;
}
2018-02-16 02:40:16 +07:00
/**
 * Remove and free the first item matching both message id and message type.
 *
 * Only the low 8 bits of the stored msg_type take part in the comparison.
 *
 * @param outbox    outbox to search
 * @param msg_id    message id to match
 * @param msg_type  message type to match
 * @return ESP_OK when an item was deleted, ESP_FAIL when nothing matched.
 */
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
    outbox_item_handle_t victim = STAILQ_FIRST(outbox->list);

    while (victim != NULL) {
        if (victim->msg_id == msg_id && (0xFF & (victim->msg_type)) == msg_type) {
            break;
        }
        victim = STAILQ_NEXT(victim, next);
    }
    if (victim == NULL) {
        return ESP_FAIL;
    }

    STAILQ_REMOVE(outbox->list, victim, outbox_item, next);
    outbox->size -= victim->len;
    free(victim->buffer);
    free(victim);
    ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%"PRIu64, msg_id, msg_type, outbox_get_size(outbox));
    return ESP_OK;
}
/**
 * Change the pending state of the first item with the given message id.
 *
 * @param outbox   outbox to search (via outbox_get)
 * @param msg_id   message id of the item to update
 * @param pending  new state to store
 * @return ESP_OK when an item was updated, ESP_FAIL when no item has that id.
 */
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
    outbox_item_handle_t target = outbox_get(outbox, msg_id);

    if (target == NULL) {
        return ESP_FAIL;
    }
    target->pending = pending;
    return ESP_OK;
}
/**
 * Read an item's pending state.
 *
 * @param item  item to inspect; may be NULL
 * @return the item's state, or QUEUED when item is NULL.
 */
pending_state_t outbox_item_get_pending(outbox_item_handle_t item)
{
    return (item != NULL) ? item->pending : QUEUED;
}
/**
 * Overwrite the stored tick of the first item with the given message id.
 *
 * @param outbox  outbox to search (via outbox_get)
 * @param msg_id  message id of the item to update
 * @param tick    new tick value
 * @return ESP_OK when an item was updated, ESP_FAIL when no item has that id.
 */
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
{
    outbox_item_handle_t target = outbox_get(outbox, msg_id);

    if (target == NULL) {
        return ESP_FAIL;
    }
    target->tick = tick;
    return ESP_OK;
}
/**
 * Remove and free at most one expired item.
 *
 * An item counts as expired when (current_tick - item tick) is greater than
 * timeout. The first such item in queue order is deleted.
 *
 * @param outbox        outbox to scan
 * @param current_tick  tick value to compare against
 * @param timeout       maximum allowed age
 * @return msg_id of the deleted item, or -1 when nothing was expired.
 */
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
    outbox_item_handle_t stale;

    for (stale = STAILQ_FIRST(outbox->list); stale != NULL; stale = STAILQ_NEXT(stale, next)) {
        if (current_tick - stale->tick > timeout) {
            break;
        }
    }
    if (stale == NULL) {
        return -1;
    }

    /* Remember the id before the item's memory is released. */
    const int expired_id = stale->msg_id;

    STAILQ_REMOVE(outbox->list, stale, outbox_item, next);
    outbox->size -= stale->len;
    free(stale->buffer);
    free(stale);
    return expired_id;
}
2018-02-16 02:40:16 +07:00
/**
 * Remove and free every expired item.
 *
 * An item counts as expired when (current_tick - item tick) is greater than
 * timeout.
 *
 * @param outbox        outbox to scan
 * @param current_tick  tick value to compare against
 * @param timeout       maximum allowed age
 * @return number of items that were deleted.
 */
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
    int removed = 0;
    outbox_item_handle_t cursor = STAILQ_FIRST(outbox->list);

    while (cursor != NULL) {
        /* Fetch the successor first: cursor may be freed below. */
        outbox_item_handle_t following = STAILQ_NEXT(cursor, next);

        if (current_tick - cursor->tick > timeout) {
            STAILQ_REMOVE(outbox->list, cursor, outbox_item, next);
            outbox->size -= cursor->len;
            free(cursor->buffer);
            free(cursor);
            removed++;
        }
        cursor = following;
    }
    return removed;
}
/**
 * Report the outbox's running byte count.
 *
 * @param outbox  outbox to query
 * @return current value of the outbox's size field.
 */
uint64_t outbox_get_size(outbox_handle_t outbox)
{
    const uint64_t queued_bytes = outbox->size;

    return queued_bytes;
}
/**
 * Empty the outbox, freeing every item and its buffer.
 *
 * The outbox object and its queue head stay allocated and usable.
 *
 * @param outbox  outbox to empty
 */
void outbox_delete_all_items(outbox_handle_t outbox)
{
    outbox_item_handle_t head;

    /* Pop from the front until the queue is empty. */
    while ((head = STAILQ_FIRST(outbox->list)) != NULL) {
        STAILQ_REMOVE_HEAD(outbox->list, next);
        outbox->size -= head->len;
        free(head->buffer);
        free(head);
    }
}
/**
 * Release an outbox together with everything it still holds.
 *
 * All queued items are freed first, then the queue head, then the outbox
 * object itself. The handle must not be used afterwards.
 *
 * @param outbox  outbox to destroy; NULL is accepted and ignored, mirroring
 *                free(NULL), so cleanup paths can call this unconditionally
 *                (e.g. after outbox_init() returned NULL).
 */
void outbox_destroy(outbox_handle_t outbox)
{
    if (outbox == NULL) {
        return;
    }

    outbox_delete_all_items(outbox);
    free(outbox->list);
    free(outbox);
}
2019-03-14 20:26:28 +08:00
#endif /* CONFIG_MQTT_CUSTOM_OUTBOX */