[mqtt-client] receive channel discards oldest message

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina

Differential Revision: https://repo.mireo.local/D26538
This commit is contained in:
Bruno Iljazovic
2023-11-15 10:44:09 +01:00
parent d77c97e3f2
commit a9dd6553f2
6 changed files with 99 additions and 18 deletions

View File

@@ -25,10 +25,6 @@ may complete with, along with the reasons for their occurrence.
to establish a connection with the Broker. The cause of this error may be attributed to the connection
related parameters used during the initialization of the [reflink2 mqtt_client `mqtt_client`].
]]
[[`boost::asio::experimental::error::channel_cancelled`] [
This error occurs in scenarios identical to those causing `boost::asio::error::operation_aborted`
error code but it is exclusive to completion handlers associated with [refmem mqtt_client async_receive] calls.
]]
[[`async_mqtt5::client::error::pid_overrun`] [
This error code signifies that the Client was unable to allocate a Packet Identifier for
the current operation due to the exhaustion of the available identifiers.

View File

@@ -0,0 +1,84 @@
#ifndef ASYNC_MQTT5_CHANNEL_TRAITS_HPP
#define ASYNC_MQTT5_CHANNEL_TRAITS_HPP
#include <deque>
#include <boost/asio/error.hpp>
namespace async_mqtt5::detail {
namespace asio = boost::asio;
using error_code = boost::system::error_code;
template <typename Element>
class bounded_deque {
std::deque<Element> _buffer;
static constexpr size_t MAX_SIZE = 65535;
public:
bounded_deque() = default;
bounded_deque(size_t n) : _buffer(n) {}
size_t size() const { return _buffer.size(); }
template <typename E>
void push_back(E&& e) {
if (_buffer.size() == MAX_SIZE)
_buffer.pop_front();
_buffer.push_back(std::forward<E>(e));
}
void pop_front() { _buffer.pop_front(); }
void clear() { _buffer.clear(); }
const auto& front() const noexcept { return _buffer.front(); }
auto& front() noexcept { return _buffer.front(); }
};
template <typename... Signatures>
struct channel_traits {
template <typename... NewSignatures>
struct rebind {
using other = channel_traits<NewSignatures...>;
};
};
template <typename R, typename... Args>
struct channel_traits<R(error_code, Args...)> {
static_assert(sizeof...(Args) > 0);
template <typename... NewSignatures>
struct rebind {
using other = channel_traits<NewSignatures...>;
};
template <typename Element>
struct container {
using type = bounded_deque<Element>;
};
using receive_cancelled_signature = R(error_code, Args...);
template <typename F>
static void invoke_receive_cancelled(F f) {
std::forward<F>(f)(
asio::error::operation_aborted,
typename std::decay_t<Args>()...
);
}
using receive_closed_signature = R(error_code, Args...);
template <typename F>
static void invoke_receive_closed(F f) {
std::forward<F>(f)(
asio::error::operation_aborted,
typename std::decay_t<Args>()...
);
}
};
} // namespace async_mqtt5::detail
#endif // !ASYNC_MQTT5_CHANNEL_TRAITS_HPP

View File

@@ -1,9 +1,10 @@
#ifndef ASYNC_MQTT5_CLIENT_SERVICE_HPP
#define ASYNC_MQTT5_CLIENT_SERVICE_HPP
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/experimental/basic_concurrent_channel.hpp>
#include <async_mqtt5/detail/internal_types.hpp>
#include <async_mqtt5/detail/channel_traits.hpp>
#include <async_mqtt5/impl/assemble_op.hpp>
#include <async_mqtt5/impl/async_sender.hpp>
@@ -117,7 +118,9 @@ public:
using executor_type = typename stream_type::executor_type;
private:
using tls_context_type = TlsContext;
using receive_channel = asio::experimental::concurrent_channel<
using receive_channel = asio::experimental::basic_concurrent_channel<
asio::any_io_executor,
channel_traits<>,
void (error_code, std::string, std::string, publish_props)
>;
@@ -162,7 +165,7 @@ public:
_stream(ex, _stream_context),
_async_sender(*this),
_active_span(_read_buff.cend(), _read_buff.cend()),
_rec_channel(ex, 128)
_rec_channel(ex, std::numeric_limits<size_t>::max())
{}
executor_type get_executor() const noexcept {
@@ -208,6 +211,11 @@ public:
return _stream_context.connack_prop(p);
}
void run() {
_stream.open();
_rec_channel.reset();
}
void open_stream() {
_stream.open();
}
@@ -224,12 +232,7 @@ public:
_cancel_ping.emit(asio::cancellation_type::terminal);
_cancel_sentry.emit(asio::cancellation_type::terminal);
// cancelling the receive channel invokes all pending handlers with
// ec = asio::experimental::error::channel_cancelled
// adding another ec to the list of the possible client ecs
// TODO: close() the channel instead, and open() it on the next run()
_rec_channel.cancel();
_rec_channel.close();
_replies.cancel_unanswered();
_async_sender.cancel();
_stream.close();

View File

@@ -195,8 +195,6 @@ private:
}
void complete() {
// TODO: if rv == false then the channel buffer is full and
// there is no listener; we may need to log this
/* auto rv = */_svc_ptr->channel_store(std::move(_message));
}
};

View File

@@ -143,7 +143,7 @@ public:
* \brief Start the Client.
*/
void run() {
_svc_ptr->open_stream();
_svc_ptr->run();
detail::ping_op { _svc_ptr }
.perform(read_timeout - std::chrono::seconds(1));
detail::read_message_op { _svc_ptr }.perform();
@@ -637,7 +637,7 @@ public:
* \par Error codes
* The list of all possible error codes that this operation can finish with:\n
* - `boost::system::errc::errc_t::success`\n
* - `boost::asio::experimental::error::channel_cancelled`
* - `boost::asio::error::operation_aborted`\n
*
* Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code.
*/

View File

@@ -38,7 +38,7 @@ void cancel_async_receive() {
c.async_receive([&handlers_called](
error_code ec, std::string, std::string, publish_props
) {
BOOST_CHECK_EQUAL(ec, asio::experimental::error::channel_cancelled);
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
handlers_called++;
});