mirror of
				https://github.com/espressif/esp-mqtt.git
				synced 2025-11-04 09:02:14 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			394 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			394 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
 * SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
 | 
						|
 *
 | 
						|
 * SPDX-License-Identifier: Unlicense OR CC0-1.0
 | 
						|
 */
 | 
						|
#include <algorithm>
#include <array>
#include <cinttypes>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <exception>
#include <memory>
#include <memory_resource>
#include <ranges>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "esp_log.h"
#include "mqtt_outbox.h"
 | 
						|
 | 
						|
/* Log tag passed to the ESP_LOGx calls made by the outbox code in this file. */
constexpr auto TAG = "custom_outbox";
 | 
						|
 | 
						|
/*
 | 
						|
 * The trace resource class is created here as an example on how to build a custom memory resource
 | 
						|
 * The class is only needed to show where we are allocating from and to track allocations and deallocations.
 | 
						|
 */
 | 
						|
class trace_resource : public std::pmr::memory_resource {
 | 
						|
public:
 | 
						|
    explicit trace_resource(std::string resource_name,  std::pmr::memory_resource *upstream_resource = std::pmr::get_default_resource()) : upstream{upstream_resource}, name{std::move(resource_name)} {}
 | 
						|
    [[nodiscard]] std::string_view get_name() const noexcept
 | 
						|
    {
 | 
						|
        return std::string_view(name);
 | 
						|
    }
 | 
						|
    [[nodiscard]] auto upstream_resource() const
 | 
						|
    {
 | 
						|
        return upstream;
 | 
						|
    }
 | 
						|
private:
 | 
						|
    void *do_allocate(std::size_t bytes,  std::size_t alignment) override
 | 
						|
    {
 | 
						|
        auto *allocated = upstream->allocate(bytes, alignment);
 | 
						|
        allocated_total += bytes;
 | 
						|
        ESP_LOGI(name.c_str(), "%s: %zu bytes allocated, %zu total bytes in use", name.c_str(), bytes, allocated_total);
 | 
						|
        return allocated;
 | 
						|
    }
 | 
						|
    void do_deallocate(void *ptr, std::size_t bytes, std::size_t alignment) override
 | 
						|
    {
 | 
						|
        upstream->deallocate(ptr, bytes, alignment);
 | 
						|
        ESP_LOGI(name.c_str(), "%s: %zu bytes deallocated, %zu total bytes in use", name.c_str(), bytes, allocated_total);
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] bool do_is_equal(const std::pmr::memory_resource &other) const noexcept override
 | 
						|
    {
 | 
						|
        return this == &other;
 | 
						|
    }
 | 
						|
    size_t allocated_total{};
 | 
						|
    std::pmr::memory_resource *upstream;
 | 
						|
    std::string name;
 | 
						|
};
 | 
						|
 | 
						|
struct outbox_item {
 | 
						|
    /* Defining the allocator_type to let compiler know that our type is allocator aware,
 | 
						|
     * This way the allocator used for the outbox is propagated to the messages*/
 | 
						|
    using allocator_type = std::pmr::polymorphic_allocator<>;
 | 
						|
 | 
						|
    /* Few strong types to diferetiate parameters*/
 | 
						|
    enum class id_t : int {};
 | 
						|
    enum class type_t : int {};
 | 
						|
    enum class qos_t : int {};
 | 
						|
 | 
						|
    /* Allocator aware constructors */
 | 
						|
    outbox_item(
 | 
						|
        std::pmr::vector<uint8_t> message,
 | 
						|
        id_t msg_id,
 | 
						|
        type_t msg_type,
 | 
						|
        qos_t msg_qos,
 | 
						|
        outbox_tick_t tick,
 | 
						|
        pending_state_t pending_state,
 | 
						|
        allocator_type alloc = {}
 | 
						|
    ) : message(std::move(message), alloc), id(msg_id), type(msg_type), qos(msg_qos), tick(tick), pending_state(pending_state) {}
 | 
						|
 | 
						|
    /*Copy and move constructors have an extra allocator parameter, for copy default and allocator aware are the same.*/
 | 
						|
    outbox_item(const outbox_item &other, allocator_type alloc = {}) : message(other.message, alloc), id(other.id), type(other.type), qos(other.qos), tick(other.tick), pending_state(other.pending_state) {}
 | 
						|
    outbox_item(outbox_item &&other, allocator_type alloc) noexcept : message(std::move(other.message), alloc), id(other.id), type(other.type), qos(other.qos),  tick(other.tick), pending_state(other.pending_state)
 | 
						|
    {}
 | 
						|
 | 
						|
    outbox_item(const outbox_item &) = default;
 | 
						|
    outbox_item(outbox_item &&other) = default;
 | 
						|
    outbox_item &operator=(const outbox_item &rhs) = default;
 | 
						|
    outbox_item &operator=(outbox_item &&other) = default;
 | 
						|
    ~outbox_item() = default;
 | 
						|
 | 
						|
    /* Getters to support outbox operation */
 | 
						|
    [[nodiscard]] auto state() const noexcept
 | 
						|
    {
 | 
						|
        return pending_state;
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] allocator_type get_allocator() const
 | 
						|
    {
 | 
						|
        return message.get_allocator();
 | 
						|
    }
 | 
						|
 | 
						|
    void set(pending_state state) noexcept
 | 
						|
    {
 | 
						|
        pending_state = state;
 | 
						|
    }
 | 
						|
 | 
						|
    void set(outbox_tick_t n_tick) noexcept
 | 
						|
    {
 | 
						|
        tick = n_tick;
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] auto get_id() const noexcept
 | 
						|
    {
 | 
						|
        return id;
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] auto get_type() const noexcept
 | 
						|
    {
 | 
						|
        return type;
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] auto get_tick() const noexcept
 | 
						|
    {
 | 
						|
        return tick;
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] auto get_data(size_t *len, uint16_t *msg_id, int *msg_type, int *msg_qos)
 | 
						|
    {
 | 
						|
        *len = message.size();
 | 
						|
        *msg_id = static_cast<uint16_t>(id);
 | 
						|
        *msg_type = static_cast<int>(type);
 | 
						|
        *msg_qos =  static_cast<int>(qos);
 | 
						|
        return message.data();
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] auto get_size() const noexcept
 | 
						|
    {
 | 
						|
        return message.size();
 | 
						|
    }
 | 
						|
 | 
						|
private:
 | 
						|
    std::pmr::vector<uint8_t> message;
 | 
						|
    id_t id;
 | 
						|
    type_t type;
 | 
						|
    qos_t qos;
 | 
						|
    outbox_tick_t tick;
 | 
						|
    pending_state_t pending_state;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * For the outbox_t we let the special member functions as default and
 | 
						|
 * we don't extend the allocator aware versions for the sake of the simplicity, since the operations are not needed in the usage.
 | 
						|
 */
 | 
						|
struct outbox_t {
 | 
						|
    using allocator_type = std::pmr::polymorphic_allocator<>;
 | 
						|
    explicit outbox_t(allocator_type alloc = {}) : queue(alloc) {}
 | 
						|
 | 
						|
    outbox_item_handle_t get(outbox_item::id_t msg_id)
 | 
						|
    {
 | 
						|
        if (auto item = std::ranges::find_if(queue, [msg_id](auto & item) {
 | 
						|
        return item.get_id() == msg_id;
 | 
						|
        });
 | 
						|
        item != std::end(queue)) {
 | 
						|
            return &(*item);
 | 
						|
        }
 | 
						|
        return nullptr;
 | 
						|
    }
 | 
						|
 | 
						|
    int delete_expired(outbox_tick_t current_tick, outbox_tick_t timeout)
 | 
						|
    {
 | 
						|
        return std::erase_if(queue, [current_tick, timeout, this](const outbox_item & item) {
 | 
						|
            if (current_tick - item.get_tick() > timeout) {
 | 
						|
                total_size -= item.get_size();
 | 
						|
                return true;
 | 
						|
            }
 | 
						|
            return false;
 | 
						|
        });
 | 
						|
    }
 | 
						|
 | 
						|
    outbox_item::id_t delete_single_expired(outbox_tick_t current_tick, outbox_tick_t timeout)
 | 
						|
    {
 | 
						|
        if (auto erase = std::ranges::find_if(queue, [current_tick, timeout](auto & item) {
 | 
						|
        return (current_tick - item.get_tick() > timeout);
 | 
						|
        }); erase != std::end(queue)) {
 | 
						|
            auto msg_id = erase->get_id();
 | 
						|
            total_size -= erase->get_size();
 | 
						|
            queue.erase(erase);
 | 
						|
            return msg_id;
 | 
						|
        }
 | 
						|
        return outbox_item::id_t{-1};
 | 
						|
    }
 | 
						|
 | 
						|
    auto erase(outbox_item_handle_t to_erase)
 | 
						|
    {
 | 
						|
        return erase_if([to_erase](auto & item) {
 | 
						|
            return &item == to_erase;
 | 
						|
        });
 | 
						|
    }
 | 
						|
 | 
						|
    auto erase(outbox_item::id_t msg_id, outbox_item::type_t msg_type)
 | 
						|
    {
 | 
						|
        return erase_if([msg_id, msg_type](auto & item) {
 | 
						|
            return (item.get_id() == msg_id && (item.get_type() == msg_type));
 | 
						|
        });
 | 
						|
    }
 | 
						|
 | 
						|
    [[nodiscard]] auto size() const noexcept
 | 
						|
    {
 | 
						|
        return total_size;
 | 
						|
    }
 | 
						|
 | 
						|
    void clear()
 | 
						|
    {
 | 
						|
        queue.clear();
 | 
						|
    }
 | 
						|
 | 
						|
    outbox_item_handle_t enqueue(outbox_message_handle_t message, outbox_tick_t tick) noexcept
 | 
						|
    {
 | 
						|
        try {
 | 
						|
            auto &item =
 | 
						|
                queue.emplace_back(std::pmr::vector<uint8_t> {message->data, message->data + message->len},
 | 
						|
                                   outbox_item::id_t{message->msg_id},
 | 
						|
                                   outbox_item::type_t{message->msg_type},
 | 
						|
                                   outbox_item::qos_t{message->msg_qos},
 | 
						|
                                   tick,
 | 
						|
                                   QUEUED
 | 
						|
                                  );
 | 
						|
            total_size += item.get_size();
 | 
						|
            ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%" PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(this));
 | 
						|
            return &item;
 | 
						|
        } catch (const std::exception &e) {
 | 
						|
            return nullptr;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    outbox_item_handle_t dequeue(pending_state_t state, outbox_tick_t *tick)
 | 
						|
    {
 | 
						|
        if (auto item = std::ranges::find_if(queue, [state](auto & item) {
 | 
						|
        return item.state() == state;
 | 
						|
        });
 | 
						|
        item != std::end(queue)) {
 | 
						|
            if (tick != nullptr) {
 | 
						|
                *tick = item->get_tick();
 | 
						|
            }
 | 
						|
            return &(*item);
 | 
						|
        }
 | 
						|
        return nullptr;
 | 
						|
    }
 | 
						|
    [[nodiscard]] allocator_type get_allocator() const
 | 
						|
    {
 | 
						|
        return queue.get_allocator();
 | 
						|
    }
 | 
						|
private:
 | 
						|
    [[nodiscard]] esp_err_t erase_if(std::predicate<outbox_item &> auto &&predicate)
 | 
						|
    {
 | 
						|
        if (auto to_erase = std::ranges::find_if(queue, predicate); to_erase != std::end(queue)) {
 | 
						|
            total_size -= to_erase->get_size();
 | 
						|
            queue.erase(to_erase);
 | 
						|
            return ESP_OK;
 | 
						|
        }
 | 
						|
        return ESP_FAIL;
 | 
						|
    }
 | 
						|
    std::size_t total_size{};
 | 
						|
    std::pmr::deque<outbox_item> queue ;
 | 
						|
};
 | 
						|
 | 
						|
extern "C" {
 | 
						|
 | 
						|
    /*
     * Creates the outbox together with the chain of memory resources it allocates from:
     * outbox -> trace("Pool") -> pool resource -> trace("Monotonic") -> monotonic buffer -> fixed buffer.
     * Returns the new outbox, or nullptr when a std::exception is thrown while building it.
     * The returned handle has to be released with outbox_destroy.
     */
    outbox_handle_t outbox_init()
    {
        /* First we create a fixed size memory buffer to be used.
         * The buffer is a function static, so every call of this function builds its resources on top of
         * the same storage: only one outbox created here can be alive at a time. */
        static constexpr  auto work_memory_size = 16 * 1024;
        static std::array<std::byte, work_memory_size> resource_buffer{};
        try {
            /*
             * Since the outbox is managed by a C API we can't rely on C++ automatic cleanup and smart pointers for its whole life but, on production code it would be better to add the
             * memory resources to outbox_t, applying RAII principles, and make only outbox_item allocator aware. For the sake of the example we are keeping them
             * separated to explicitly show the relations.
             * Smart pointers are still used while building the chain, this way nothing is leaked if one of the steps throws:
             * they are destroyed in reverse order of creation, so each object is released before the resource it depends on.
             * First we create the monotonic buffer and add null_memory_resource as upstream. This way if our working memory is exhausted an exception is thrown.
             */
            auto monotonic_resource = std::make_unique<std::pmr::monotonic_buffer_resource>(resource_buffer.data(), resource_buffer.size(), std::pmr::null_memory_resource());
            /*Here we add our custom trace wrapper type to trace allocations and deallocations*/
            auto trace_monotonic = std::make_unique<trace_resource>("Monotonic", monotonic_resource.get());

            /* We compose monotonic buffer with pool resource, since the monotonic deallocate is a no-op and we need to remove messages to not go out of memory.*/
            auto pool_resource = std::make_unique<std::pmr::unsynchronized_pool_resource>(trace_monotonic.get());
            auto trace_pool = std::make_unique<trace_resource>("Pool", pool_resource.get());
            /* Our outbox class is created using the trace_pool as memory resource */
            auto outbox = std::make_unique<outbox_t>(trace_pool.get());

            /* Everything was created, from here the ownership goes to the C handle and outbox_destroy releases it. */
            monotonic_resource.release();
            trace_monotonic.release();
            pool_resource.release();
            trace_pool.release();
            return outbox.release();
        } catch (const std::exception &e) {
            /* Logged as an error: without the outbox the caller has nothing to work with. */
            ESP_LOGE(TAG, "Not enough memory to construct the outbox, review the resource_buffer size");
            return nullptr;
        }
    }
 | 
						|
 | 
						|
    /* C entry point: stores message in the outbox, forwarding to outbox_t::enqueue. */
    outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
    {
        outbox_item_handle_t stored_item = outbox->enqueue(message, tick);
        return stored_item;
    }
 | 
						|
 | 
						|
    /* C entry point: looks an item up by message id, forwarding to outbox_t::get. */
    outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
    {
        const outbox_item::id_t wanted_id{msg_id};
        return outbox->get(wanted_id);
    }
 | 
						|
 | 
						|
    /* C entry point: finds the first item in the given pending state, forwarding to outbox_t::dequeue. */
    outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
    {
        outbox_item_handle_t matching_item = outbox->dequeue(pending, tick);
        return matching_item;
    }
 | 
						|
}
 | 
						|
 | 
						|
/* Returns the bytes of the item and fills the output parameters, or returns nullptr (leaving the outputs untouched) for a null item. */
uint8_t *outbox_item_get_data(outbox_item_handle_t item,  size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
{
    return (item != nullptr) ? item->get_data(len, msg_id, msg_type, qos) : nullptr;
}
 | 
						|
 | 
						|
/* Removes the item the handle refers to, forwarding to outbox_t::erase. */
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
{
    const esp_err_t result = outbox->erase(item_to_delete);
    return result;
}
 | 
						|
 | 
						|
/* Removes the first item with the given id and type, forwarding to outbox_t::erase. */
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
    const outbox_item::id_t wanted_id{msg_id};
    const outbox_item::type_t wanted_type{msg_type};
    return outbox->erase(wanted_id, wanted_type);
}
 | 
						|
 | 
						|
/* Removes one expired item and returns the value of outbox_t::delete_single_expired converted to int. */
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
    const auto removed_id = outbox->delete_single_expired(current_tick, timeout);
    return static_cast<int>(removed_id);
}
 | 
						|
 | 
						|
/* Removes all the expired items, forwarding to outbox_t::delete_expired and returning its result. */
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
    const int removed_count = outbox->delete_expired(current_tick, timeout);
    return removed_count;
}
 | 
						|
 | 
						|
/* Sets the pending state of the item with the given id. ESP_FAIL when no item has that id. */
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
    auto *target = outbox->get(outbox_item::id_t{msg_id});
    if (target == nullptr) {
        return ESP_FAIL;
    }
    target->set(pending);
    return ESP_OK;
}
 | 
						|
 | 
						|
/* Returns the pending state of the item, a null item is reported as QUEUED. */
pending_state_t outbox_item_get_pending(outbox_item_handle_t item)
{
    if (item == nullptr) {
        return QUEUED;
    }
    return item->state();
}
 | 
						|
 | 
						|
/* Sets the tick of the item with the given id. ESP_FAIL when no item has that id. */
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
{
    auto *target = outbox->get(outbox_item::id_t{msg_id});
    if (target == nullptr) {
        return ESP_FAIL;
    }
    target->set(tick);
    return ESP_OK;
}
 | 
						|
 | 
						|
/* Returns the size tracked by the outbox (outbox_t::size) as uint64_t. */
uint64_t outbox_get_size(outbox_handle_t outbox)
{
    const uint64_t stored_bytes = outbox->size();
    return stored_bytes;
}
 | 
						|
 | 
						|
/* Removes every item of the outbox, forwarding to outbox_t::clear. */
void outbox_delete_all_items(outbox_handle_t outbox)
{
    auto &box = *outbox;
    box.clear();
}
 | 
						|
 | 
						|
/*
 * Destroys the outbox and the chain of memory resources it was created with.
 * The casts below rely on the handle having the chain this function walks:
 * outbox -> trace_resource -> unsynchronized_pool_resource -> trace_resource -> monotonic_buffer_resource.
 * A null handle is ignored.
 */
void outbox_destroy(outbox_handle_t outbox)
{
    if (outbox == nullptr) {
        return;
    }
    /* Recover every link of the chain before anything is deleted. */
    auto  *trace_pool = static_cast<trace_resource *>(outbox->get_allocator().resource());
    auto *pool_resource = static_cast<std::pmr::unsynchronized_pool_resource *>(trace_pool->upstream_resource());
    auto  *trace_monotonic = static_cast<trace_resource *>(pool_resource->upstream_resource());
    auto *monotonic_resource = static_cast<std::pmr::monotonic_buffer_resource *>(trace_monotonic->upstream_resource());

    /* Delete from the outermost user down to the innermost resource: the outbox gives its memory back
     * through trace_pool and the pool gives its blocks back to its upstream when destroyed, so each
     * upstream has to be alive until everything built on top of it is gone. */
    delete outbox;
    delete trace_pool;
    delete pool_resource;
    delete trace_monotonic;
    delete monotonic_resource;
}
 |