From a9dd6553f235a78376f57f31beca64f8ad95de85 Mon Sep 17 00:00:00 2001 From: Bruno Iljazovic Date: Wed, 15 Nov 2023 10:44:09 +0100 Subject: [PATCH] [mqtt-client] receive channel discards oldest message Reviewers: ivica Reviewed By: ivica Subscribers: korina Differential Revision: https://repo.mireo.local/D26538 --- doc/qbk/reference/Error_handling.qbk | 4 - include/async_mqtt5/detail/channel_traits.hpp | 84 +++++++++++++++++++ include/async_mqtt5/impl/client_service.hpp | 21 +++-- include/async_mqtt5/impl/publish_rec_op.hpp | 2 - include/async_mqtt5/mqtt_client.hpp | 4 +- test/unit/test/cancellation.cpp | 2 +- 6 files changed, 99 insertions(+), 18 deletions(-) create mode 100644 include/async_mqtt5/detail/channel_traits.hpp diff --git a/doc/qbk/reference/Error_handling.qbk b/doc/qbk/reference/Error_handling.qbk index 1170ab8..4bd47e2 100644 --- a/doc/qbk/reference/Error_handling.qbk +++ b/doc/qbk/reference/Error_handling.qbk @@ -25,10 +25,6 @@ may complete with, along with the reasons for their occurrence. to establish a connection with the Broker. The cause of this error may be attributed to the connection related parameters used during the initialization of the [reflink2 mqtt_client `mqtt_client`]. ]] - [[`boost::asio::experimental::error::channel_cancelled`] [ - This error occurs in scenarios identical to those causing `boost::asio::error::operation_aborted` - error code but it is exclusive to completion handlers associated with [refmem mqtt_client async_receive] calls. - ]] [[`async_mqtt5::client::error::pid_overrun`] [ This error code signifies that the Client was unable to allocate a Packet Identifier for the current operation due to the exhaustion of the available identifiers. diff --git a/include/async_mqtt5/detail/channel_traits.hpp b/include/async_mqtt5/detail/channel_traits.hpp new file mode 100644 index 0000000..e1732e0 --- /dev/null +++ b/include/async_mqtt5/detail/channel_traits.hpp @@ -0,0 +1,84 @@ +#ifndef ASYNC_MQTT5_CHANNEL_TRAITS_HPP +#define ASYNC_MQTT5_CHANNEL_TRAITS_HPP + +#include + +#include + +namespace async_mqtt5::detail { + +namespace asio = boost::asio; +using error_code = boost::system::error_code; + +template +class bounded_deque { + std::deque _buffer; + static constexpr size_t MAX_SIZE = 65535; + +public: + bounded_deque() = default; + bounded_deque(size_t n) : _buffer(n) {} + + size_t size() const { return _buffer.size(); } + + template + void push_back(E&& e) { + if (_buffer.size() == MAX_SIZE) + _buffer.pop_front(); + _buffer.push_back(std::forward(e)); + } + + void pop_front() { _buffer.pop_front(); } + + void clear() { _buffer.clear(); } + + const auto& front() const noexcept { return _buffer.front(); } + + auto& front() noexcept { return _buffer.front(); } +}; + +template +struct channel_traits { + template + struct rebind { + using other = channel_traits; + }; +}; + +template +struct channel_traits { + static_assert(sizeof...(Args) > 0); + template + struct rebind { + using other = channel_traits; + }; + + template + struct container { + using type = bounded_deque; + }; + + using receive_cancelled_signature = R(error_code, Args...); + + template + static void invoke_receive_cancelled(F f) { + std::forward(f)( + asio::error::operation_aborted, + typename std::decay_t()... + ); + } + + using receive_closed_signature = R(error_code, Args...); + + template + static void invoke_receive_closed(F f) { + std::forward(f)( + asio::error::operation_aborted, + typename std::decay_t()... + ); + } +}; + +} // namespace async_mqtt5::detail + +#endif // !ASYNC_MQTT5_CHANNEL_TRAITS_HPP diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index b70dfe8..099d71e 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -1,9 +1,10 @@ #ifndef ASYNC_MQTT5_CLIENT_SERVICE_HPP #define ASYNC_MQTT5_CLIENT_SERVICE_HPP -#include +#include #include +#include #include #include @@ -117,7 +118,9 @@ public: using executor_type = typename stream_type::executor_type; private: using tls_context_type = TlsContext; - using receive_channel = asio::experimental::concurrent_channel< + using receive_channel = asio::experimental::basic_concurrent_channel< + asio::any_io_executor, + channel_traits<>, void (error_code, std::string, std::string, publish_props) >; @@ -162,7 +165,7 @@ public: _stream(ex, _stream_context), _async_sender(*this), _active_span(_read_buff.cend(), _read_buff.cend()), - _rec_channel(ex, 128) + _rec_channel(ex, std::numeric_limits::max()) {} executor_type get_executor() const noexcept { @@ -208,6 +211,11 @@ public: return _stream_context.connack_prop(p); } + void run() { + _stream.open(); + _rec_channel.reset(); + } + void open_stream() { _stream.open(); } @@ -224,12 +232,7 @@ public: _cancel_ping.emit(asio::cancellation_type::terminal); _cancel_sentry.emit(asio::cancellation_type::terminal); - // cancelling the receive channel invokes all pending handlers with - // ec = asio::experimental::error::channel_cancelled - // adding another ec to the list of the possible client ecs - - // TODO: close() the channel instead, and open() it on the next run() - _rec_channel.cancel(); + _rec_channel.close(); _replies.cancel_unanswered(); _async_sender.cancel(); _stream.close(); diff --git a/include/async_mqtt5/impl/publish_rec_op.hpp b/include/async_mqtt5/impl/publish_rec_op.hpp index 381613e..421f3ec 100644 --- a/include/async_mqtt5/impl/publish_rec_op.hpp +++ b/include/async_mqtt5/impl/publish_rec_op.hpp @@ -195,8 +195,6 @@ private: } void complete() { - // TODO: if rv == false then the channel buffer is full and - // there is no listener; we may need to log this /* auto rv = */_svc_ptr->channel_store(std::move(_message)); } }; diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index c348737..9445654 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -143,7 +143,7 @@ public: * \brief Start the Client. */ void run() { - _svc_ptr->open_stream(); + _svc_ptr->run(); detail::ping_op { _svc_ptr } .perform(read_timeout - std::chrono::seconds(1)); detail::read_message_op { _svc_ptr }.perform(); @@ -637,7 +637,7 @@ public: * \par Error codes * The list of all possible error codes that this operation can finish with:\n * - `boost::system::errc::errc_t::success`\n - * - `boost::asio::experimental::error::channel_cancelled` + * - `boost::asio::error::operation_aborted`\n * * Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code. */ diff --git a/test/unit/test/cancellation.cpp b/test/unit/test/cancellation.cpp index 832d0f2..5144183 100644 --- a/test/unit/test/cancellation.cpp +++ b/test/unit/test/cancellation.cpp @@ -38,7 +38,7 @@ void cancel_async_receive() { c.async_receive([&handlers_called]( error_code ec, std::string, std::string, publish_props ) { - BOOST_CHECK_EQUAL(ec, asio::experimental::error::channel_cancelled); + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); handlers_called++; });