Allow different bound executors on completion handlers

Summary: Resolves T15251, #23

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina, miljen

Maniphest Tasks: T15251

Differential Revision: https://repo.mireo.local/D32249
This commit is contained in:
Bruno Iljazovic
2024-11-27 10:49:15 +01:00
parent 913d8a102d
commit 944de413a3
17 changed files with 316 additions and 246 deletions

View File

@ -19,15 +19,7 @@ Upon creating an instance of the __Client__, it is necessary to provide an execu
The specified executor (or __ExecutionContext__'s executor) will become the default executor associated with the __Client__ The specified executor (or __ExecutionContext__'s executor) will become the default executor associated with the __Client__
and will be used for the execution of all the intermediate operations and a final completion handler for all asynchronous operations and will be used for the execution of all the intermediate operations and a final completion handler for all asynchronous operations
that have not bound an executor. that have not bound an executor.
If an executor is bound to an asynchronous operation, that executor will be used instead. If an executor is bound to an asynchronous operation, that executor will be used for the final completion handler instead.
The [refmem mqtt_client async_run] operation starts the __Client__, which initiates a series of internal asynchronous operations, all of which need an executor.
If the [refmem mqtt_client async_run] is called with a completion handler that has an associated executor,
then all the internal asynchronous operations will associate the same executor.
[important
The same executor *must* execute [refmem mqtt_client async_run] and all the subsequent `async_xxx` operations.
]
The following examples will demonstrate the previously described interactions. The following examples will demonstrate the previously described interactions.

View File

@ -68,16 +68,15 @@ or explicit strand.
Specifically, use __POST__ or __DISPATCH__ to delegate a function call to a strand, Specifically, use __POST__ or __DISPATCH__ to delegate a function call to a strand,
or __CO_SPAWN__ to spawn the coroutine into the strand. or __CO_SPAWN__ to spawn the coroutine into the strand.
For asynchronous functions, this will ensure that the initiation code is executed within the strand in a thread-safe manner. For asynchronous functions, this will ensure that the initiation code is executed within the strand in a thread-safe manner.
The associated executor of all `async_xxx`'s completion handlers must be the same strand. __Client__'s executor must be that same strand.
This will guarantee that the entire sequence of operations This will guarantee that the entire sequence of operations
- from the initiation code through any intermediate operations to the execution of the completion handler - - the initiation code and any intermediate operation -
is carried out within the strand, thereby ensuring thread safety. is carried out within the strand, thereby ensuring thread safety.
[important [important
To conclude, to achieve thread safety, To conclude, to achieve thread safety,
all the member functions of the __Client__ *must* be executed in *the same strand*. all the member functions of the __Client__ *must* be executed in *the same strand*.
This strand *must* be the associated executor of all the completion handlers across This strand must be given in the __Client__ constructor.
all `async_xxx` invocations.
] ]
The examples below demonstrate how to publish a "Hello World" Application Message The examples below demonstrate how to publish a "Hello World" Application Message

View File

@ -75,7 +75,10 @@ public:
template <typename... Args> template <typename... Args>
void complete(Args&&... args) { void complete(Args&&... args) {
asio::get_associated_cancellation_slot(_handler).clear(); asio::get_associated_cancellation_slot(_handler).clear();
std::move(_handler)(std::forward<Args>(args)...); asio::dispatch(
_handler_ex,
asio::prepend(std::move(_handler), std::forward<Args>(args)...)
);
} }
template <typename... Args> template <typename... Args>

View File

@ -91,9 +91,9 @@ public:
return asio::get_associated_allocator(_handler); return asio::get_associated_allocator(_handler);
} }
using executor_type = typename client_service::executor_type; using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _svc.get_executor(); return asio::get_associated_executor(_handler);
} }
template <typename CompletionCondition> template <typename CompletionCondition>
@ -111,6 +111,7 @@ public:
if (cc(error_code {}, 0) == 0 && _data_span.size()) { if (cc(error_code {}, 0) == 0 && _data_span.size()) {
return asio::post( return asio::post(
_svc.get_executor(),
asio::prepend( asio::prepend(
std::move(*this), on_read {}, error_code {}, std::move(*this), on_read {}, error_code {},
0, std::move(cc) 0, std::move(cc)

View File

@ -79,14 +79,6 @@ public:
return !_handler; return !_handler;
} }
auto get_executor() {
return asio::get_associated_executor(_handler);
}
auto get_allocator() {
return asio::get_associated_allocator(_handler);
}
bool throttled() const { bool throttled() const {
return _flags & send_flag::throttled; return _flags & send_flag::throttled;
} }
@ -148,6 +140,11 @@ public:
return allocator_type {}; return allocator_type {};
} }
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept {
return _svc.get_executor();
}
serial_num_t next_serial_num() { serial_num_t next_serial_num() {
return _last_serial_num = write_req::next_serial_num(_last_serial_num); return _last_serial_num = write_req::next_serial_num(_last_serial_num);
} }
@ -293,17 +290,9 @@ private:
_svc._replies.clear_fast_replies(); _svc._replies.clear_fast_replies();
auto alloc = write_queue.front().get_allocator();
auto ex = write_queue.front().get_executor();
_svc._stream.async_write( _svc._stream.async_write(
buffers, buffers,
asio::bind_allocator( asio::prepend(std::ref(*this), std::move(write_queue))
alloc,
asio::bind_executor(
ex,
asio::prepend(std::ref(*this), std::move(write_queue))
)
)
); );
} }

View File

@ -13,7 +13,6 @@
#include <string> #include <string>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#include <vector>
#include <boost/asio/async_result.hpp> #include <boost/asio/async_result.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
@ -28,10 +27,7 @@
#include <async_mqtt5/impl/assemble_op.hpp> #include <async_mqtt5/impl/assemble_op.hpp>
#include <async_mqtt5/impl/async_sender.hpp> #include <async_mqtt5/impl/async_sender.hpp>
#include <async_mqtt5/impl/autoconnect_stream.hpp> #include <async_mqtt5/impl/autoconnect_stream.hpp>
#include <async_mqtt5/impl/ping_op.hpp>
#include <async_mqtt5/impl/read_message_op.hpp>
#include <async_mqtt5/impl/replies.hpp> #include <async_mqtt5/impl/replies.hpp>
#include <async_mqtt5/impl/sentry_op.hpp>
namespace async_mqtt5::detail { namespace async_mqtt5::detail {
@ -235,16 +231,19 @@ private:
void (error_code, std::string, std::string, publish_props) void (error_code, std::string, std::string, publish_props)
>; >;
template <typename ClientService, typename Handler>
friend class run_op;
template <typename ClientService> template <typename ClientService>
friend class async_sender; friend class async_sender;
template <typename ClientService, typename Handler> template <typename ClientService, typename Handler>
friend class assemble_op; friend class assemble_op;
template <typename ClientService, typename Executor> template <typename ClientService, typename Handler>
friend class ping_op; friend class ping_op;
template <typename ClientService, typename Executor> template <typename ClientService, typename Handler>
friend class sentry_op; friend class sentry_op;
template <typename ClientService> template <typename ClientService>
@ -264,10 +263,8 @@ private:
receive_channel _rec_channel; receive_channel _rec_channel;
asio::cancellation_signal _cancel_ping; asio::steady_timer _ping_timer;
asio::cancellation_signal _cancel_sentry; asio::steady_timer _sentry_timer;
asio::any_completion_handler<void(error_code)> _run_handler;
client_service(const client_service& other) : client_service(const client_service& other) :
_executor(other._executor), _stream_context(other._stream_context), _executor(other._executor), _stream_context(other._stream_context),
@ -275,7 +272,9 @@ private:
_replies(_executor), _replies(_executor),
_async_sender(*this), _async_sender(*this),
_active_span(_read_buff.cend(), _read_buff.cend()), _active_span(_read_buff.cend(), _read_buff.cend()),
_rec_channel(_executor, std::numeric_limits<size_t>::max()) _rec_channel(_executor, std::numeric_limits<size_t>::max()),
_ping_timer(_executor),
_sentry_timer(_executor)
{ {
_stream.clone_endpoints(other._stream); _stream.clone_endpoints(other._stream);
} }
@ -292,7 +291,9 @@ public:
_replies(ex), _replies(ex),
_async_sender(*this), _async_sender(*this),
_active_span(_read_buff.cend(), _read_buff.cend()), _active_span(_read_buff.cend(), _read_buff.cend()),
_rec_channel(ex, std::numeric_limits<size_t>::max()) _rec_channel(ex, std::numeric_limits<size_t>::max()),
_ping_timer(ex),
_sentry_timer(ex)
{} {}
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
@ -385,21 +386,6 @@ public:
return _stream_context.connack_properties(); return _stream_context.connack_properties();
} }
template <typename Handler>
void run(Handler&& handler) {
_run_handler = std::move(handler);
auto slot = asio::get_associated_cancellation_slot(_run_handler);
if (slot.is_connected()) {
using c_t = asio::cancellation_type_t;
slot.assign([&svc = *this](c_t c) {
if ((c & c_t::terminal) != c_t::none)
svc.cancel();
});
}
_stream.open();
_rec_channel.reset();
}
void open_stream() { void open_stream() {
_stream.open(); _stream.open();
} }
@ -413,25 +399,16 @@ public:
} }
void cancel() { void cancel() {
if (!_run_handler) return; if (!_stream.is_open()) return;
_cancel_ping.emit(asio::cancellation_type::terminal); _ping_timer.cancel();
_cancel_sentry.emit(asio::cancellation_type::terminal); _sentry_timer.cancel();
_rec_channel.close(); _rec_channel.close();
_replies.cancel_unanswered(); _replies.cancel_unanswered();
_async_sender.cancel(); _async_sender.cancel();
_stream.cancel(); _stream.cancel();
_stream.close(); _stream.close();
asio::get_associated_cancellation_slot(_run_handler).clear();
asio::post(
get_executor(),
asio::prepend(
std::move(_run_handler),
asio::error::operation_aborted
)
);
} }
uint16_t allocate_pid() { uint16_t allocate_pid() {
@ -469,7 +446,7 @@ public:
} }
} }
_cancel_ping.emit(asio::cancellation_type::total); _ping_timer.cancel();
} }
bool channel_store(decoders::publish_message message) { bool channel_store(decoders::publish_message message) {
@ -532,31 +509,6 @@ public:
}; };
template <typename ClientService>
class initiate_async_run {
std::shared_ptr<ClientService> _svc_ptr;
public:
explicit initiate_async_run(std::shared_ptr<ClientService> svc_ptr) :
_svc_ptr(std::move(svc_ptr))
{}
using executor_type = typename ClientService::executor_type;
executor_type get_executor() const noexcept {
return _svc_ptr->get_executor();
}
template <typename Handler>
void operator()(Handler&& handler) {
auto ex = asio::get_associated_executor(handler, get_executor());
_svc_ptr->run(std::move(handler));
detail::ping_op { _svc_ptr, ex }.perform();
detail::read_message_op { _svc_ptr, ex }.perform();
detail::sentry_op { _svc_ptr, ex }.perform();
}
};
} // namespace async_mqtt5::detail } // namespace async_mqtt5::detail
#endif // !ASYNC_MQTT5_CLIENT_SERVICE_HPP #endif // !ASYNC_MQTT5_CLIENT_SERVICE_HPP

View File

@ -82,9 +82,9 @@ public:
return asio::get_associated_allocator(_handler); return asio::get_associated_allocator(_handler);
} }
using executor_type = asio::associated_executor_t<handler_type>; using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler); return _svc_ptr->get_executor();
} }
void perform() { void perform() {
@ -188,9 +188,7 @@ class terminal_disconnect_op {
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
std::unique_ptr<asio::steady_timer> _timer; std::unique_ptr<asio::steady_timer> _timer;
using handler_type = cancellable_handler< using handler_type = Handler;
Handler, typename ClientService::executor_type
>;
handler_type _handler; handler_type _handler;
public: public:
@ -200,7 +198,7 @@ public:
) : ) :
_svc_ptr(std::move(svc_ptr)), _svc_ptr(std::move(svc_ptr)),
_timer(new asio::steady_timer(_svc_ptr->get_executor())), _timer(new asio::steady_timer(_svc_ptr->get_executor())),
_handler(std::move(handler), _svc_ptr->get_executor()) _handler(std::move(handler))
{} {}
terminal_disconnect_op(terminal_disconnect_op&&) = default; terminal_disconnect_op(terminal_disconnect_op&&) = default;
@ -230,10 +228,10 @@ public:
auto init_disconnect = []( auto init_disconnect = [](
auto handler, disconnect_ctx ctx, auto handler, disconnect_ctx ctx,
const std::shared_ptr<ClientService>& svc_ptr std::shared_ptr<ClientService> svc_ptr
) { ) {
disconnect_op { disconnect_op {
svc_ptr, std::move(ctx), std::move(handler) std::move(svc_ptr), std::move(ctx), std::move(handler)
}.perform(); }.perform();
}; };
@ -248,7 +246,7 @@ public:
); );
timed_disconnect.async_wait( timed_disconnect.async_wait(
asioex::wait_for_one(), asio::prepend(std::move(*this)) asioex::wait_for_one(), std::move(*this)
); );
} }
@ -256,7 +254,7 @@ public:
std::array<std::size_t, 2> /* ord */, std::array<std::size_t, 2> /* ord */,
error_code disconnect_ec, error_code /* timer_ec */ error_code disconnect_ec, error_code /* timer_ec */
) { ) {
_handler.complete(disconnect_ec); std::move(_handler)(disconnect_ec);
} }
}; };
@ -291,12 +289,12 @@ public:
template <typename ClientService, typename CompletionToken> template <typename ClientService, typename CompletionToken>
decltype(auto) async_disconnect( decltype(auto) async_disconnect(
disconnect_rc_e reason_code, const disconnect_props& props, disconnect_rc_e reason_code, const disconnect_props& props,
const std::shared_ptr<ClientService>& svc_ptr, std::shared_ptr<ClientService> svc_ptr,
CompletionToken&& token CompletionToken&& token
) { ) {
using Signature = void (error_code); using Signature = void (error_code);
return asio::async_initiate<CompletionToken, Signature>( return asio::async_initiate<CompletionToken, Signature>(
initiate_async_disconnect<ClientService, false>(svc_ptr), token, initiate_async_disconnect<ClientService, false>(std::move(svc_ptr)), token,
reason_code, props reason_code, props
); );
} }
@ -304,12 +302,12 @@ decltype(auto) async_disconnect(
template <typename ClientService, typename CompletionToken> template <typename ClientService, typename CompletionToken>
decltype(auto) async_terminal_disconnect( decltype(auto) async_terminal_disconnect(
disconnect_rc_e reason_code, const disconnect_props& props, disconnect_rc_e reason_code, const disconnect_props& props,
const std::shared_ptr<ClientService>& svc_ptr, std::shared_ptr<ClientService> svc_ptr,
CompletionToken&& token CompletionToken&& token
) { ) {
using Signature = void (error_code); using Signature = void (error_code);
return asio::async_initiate<CompletionToken, Signature>( return asio::async_initiate<CompletionToken, Signature>(
initiate_async_disconnect<ClientService, true>(svc_ptr), token, initiate_async_disconnect<ClientService, true>(std::move(svc_ptr)), token,
reason_code, props reason_code, props
); );
} }

View File

@ -10,13 +10,10 @@
#include <limits> #include <limits>
#include <chrono> #include <chrono>
#include <memory>
#include <boost/asio/cancellation_state.hpp> #include <boost/asio/error.hpp>
#include <boost/asio/consign.hpp> #include <boost/asio/consign.hpp>
#include <boost/asio/prepend.hpp> #include <boost/asio/prepend.hpp>
#include <boost/asio/recycling_allocator.hpp>
#include <boost/asio/steady_timer.hpp>
#include <async_mqtt5/detail/control_packet.hpp> #include <async_mqtt5/detail/control_packet.hpp>
#include <async_mqtt5/detail/internal_types.hpp> #include <async_mqtt5/detail/internal_types.hpp>
@ -27,33 +24,20 @@ namespace async_mqtt5::detail {
namespace asio = boost::asio; namespace asio = boost::asio;
template <typename ClientService, typename Executor> template <typename ClientService, typename Handler>
class ping_op { class ping_op {
public:
using executor_type = Executor;
private:
using client_service = ClientService; using client_service = ClientService;
using handler_type = Handler;
struct on_timer {}; struct on_timer {};
struct on_pingreq {}; struct on_pingreq {};
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
executor_type _executor; handler_type _handler;
std::unique_ptr<asio::steady_timer> _ping_timer;
asio::cancellation_state _cancellation_state;
public: public:
ping_op( ping_op(std::shared_ptr<client_service> svc_ptr, Handler&& handler) :
std::shared_ptr<client_service> svc_ptr, _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler))
executor_type ex
) :
_svc_ptr(std::move(svc_ptr)), _executor(ex),
_ping_timer(new asio::steady_timer(_svc_ptr->get_executor())),
_cancellation_state(
_svc_ptr->_cancel_ping.slot(),
asio::enable_total_cancellation {},
asio::enable_total_cancellation {}
)
{} {}
ping_op(ping_op&&) noexcept = default; ping_op(ping_op&&) noexcept = default;
@ -62,37 +46,28 @@ public:
ping_op& operator=(ping_op&&) noexcept = default; ping_op& operator=(ping_op&&) noexcept = default;
ping_op& operator=(const ping_op&) = delete; ping_op& operator=(const ping_op&) = delete;
using allocator_type = asio::recycling_allocator<void>; using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept { allocator_type get_allocator() const noexcept {
return allocator_type {}; return asio::get_associated_allocator(_handler);
}
using cancellation_slot_type = asio::cancellation_slot;
cancellation_slot_type get_cancellation_slot() const noexcept {
return _cancellation_state.slot();
} }
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _executor; return _svc_ptr->get_executor();
} }
void perform() { void perform() {
_ping_timer->expires_after(compute_wait_time()); _svc_ptr->_ping_timer.expires_after(compute_wait_time());
_ping_timer->async_wait( _svc_ptr->_ping_timer.async_wait(
asio::prepend(std::move(*this), on_timer {}) asio::prepend(std::move(*this), on_timer {})
); );
} }
void operator()(on_timer, error_code) { void operator()(on_timer, error_code ec) {
if ( if (!_svc_ptr->is_open())
_cancellation_state.cancelled() == asio::cancellation_type::terminal || return complete();
!_svc_ptr->is_open() else if (ec == asio::error::operation_aborted)
)
return;
else if (_cancellation_state.cancelled() == asio::cancellation_type::total) {
_cancellation_state.clear();
return perform(); return perform();
}
auto pingreq = control_packet<allocator_type>::of( auto pingreq = control_packet<allocator_type>::of(
no_pid, get_allocator(), encoders::encode_pingreq no_pid, get_allocator(), encoders::encode_pingreq
@ -110,14 +85,10 @@ public:
} }
void operator()(on_pingreq, error_code ec) { void operator()(on_pingreq, error_code ec) {
if ( if (!ec || ec == asio::error::try_again)
_cancellation_state.cancelled() == asio::cancellation_type::terminal || return perform();
ec == asio::error::no_recovery
)
return;
_cancellation_state.clear(); complete();
perform();
} }
private: private:
@ -127,6 +98,10 @@ private:
std::chrono::seconds(negotiated_ka) : std::chrono::seconds(negotiated_ka) :
duration(std::numeric_limits<duration::rep>::max()); duration(std::numeric_limits<duration::rep>::max());
} }
void complete() {
return std::move(_handler)();
}
}; };

View File

@ -106,9 +106,9 @@ public:
return asio::get_associated_allocator(_handler); return asio::get_associated_allocator(_handler);
} }
using executor_type = asio::associated_executor_t<handler_type>; using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler); return _svc_ptr->get_executor();
} }
void perform( void perform(

View File

@ -8,7 +8,6 @@
#ifndef ASYNC_MQTT5_READ_MESSAGE_OP_HPP #ifndef ASYNC_MQTT5_READ_MESSAGE_OP_HPP
#define ASYNC_MQTT5_READ_MESSAGE_OP_HPP #define ASYNC_MQTT5_READ_MESSAGE_OP_HPP
#include <chrono>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
@ -30,24 +29,20 @@ namespace async_mqtt5::detail {
namespace asio = boost::asio; namespace asio = boost::asio;
template <typename ClientService, typename Executor> template <typename ClientService, typename Handler>
class read_message_op { class read_message_op {
public:
using executor_type = Executor;
private:
using client_service = ClientService; using client_service = ClientService;
using handler_type = Handler;
struct on_message {}; struct on_message {};
struct on_disconnect {}; struct on_disconnect {};
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
executor_type _executor; handler_type _handler;
public: public:
read_message_op( read_message_op(std::shared_ptr<client_service> svc_ptr, Handler&& handler)
std::shared_ptr<client_service> svc_ptr, : _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler))
executor_type ex
) :
_svc_ptr(std::move(svc_ptr)), _executor(ex)
{} {}
read_message_op(read_message_op&&) noexcept = default; read_message_op(read_message_op&&) noexcept = default;
@ -56,13 +51,14 @@ public:
read_message_op& operator=(read_message_op&&) noexcept = default; read_message_op& operator=(read_message_op&&) noexcept = default;
read_message_op& operator=(const read_message_op&) = delete; read_message_op& operator=(const read_message_op&) = delete;
using allocator_type = asio::recycling_allocator<void>; using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept { allocator_type get_allocator() const noexcept {
return allocator_type {}; return asio::get_associated_allocator(_handler);
} }
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _executor; return _svc_ptr->get_executor();
} }
void perform() { void perform() {
@ -82,17 +78,19 @@ public:
); );
if (ec == asio::error::no_recovery) if (ec == asio::error::no_recovery)
return _svc_ptr->cancel(); _svc_ptr->cancel();
if (ec == asio::error::operation_aborted) if (ec)
return; return complete();
dispatch(control_code, first, last); dispatch(control_code, first, last);
} }
void operator()(on_disconnect, error_code ec) { void operator()(on_disconnect, error_code ec) {
if (!ec) if (ec)
perform(); return complete();
perform();
} }
private: private:
@ -150,6 +148,9 @@ private:
); );
} }
void complete() {
return std::move(_handler)();
}
}; };

View File

@ -62,7 +62,7 @@ private:
error_code ec, error_code ec,
byte_citer first = byte_citer {}, byte_citer last = byte_citer {} byte_citer first = byte_citer {}, byte_citer last = byte_citer {}
) { ) {
asio::dispatch(asio::prepend(std::move(_handler), ec, first, last)); std::move(_handler)(ec, first, last);
} }
void complete_post(const executor_type& ex, error_code ec) { void complete_post(const executor_type& ex, error_code ec) {

View File

@ -0,0 +1,142 @@
//
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASYNC_MQTT5_RUN_OP_HPP
#define ASYNC_MQTT5_RUN_OP_HPP
#include <memory>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <async_mqtt5/detail/cancellable_handler.hpp>
#include <async_mqtt5/detail/control_packet.hpp>
#include <async_mqtt5/detail/internal_types.hpp>
#include <async_mqtt5/impl/read_message_op.hpp>
#include <async_mqtt5/impl/ping_op.hpp>
#include <async_mqtt5/impl/sentry_op.hpp>
namespace async_mqtt5::detail {
namespace asio = boost::asio;
template <typename ClientService, typename Handler>
class run_op {
using client_service = ClientService;
std::shared_ptr<client_service> _svc_ptr;
using handler_type = cancellable_handler<
Handler,
typename client_service::executor_type
>;
handler_type _handler;
public:
run_op(
std::shared_ptr<client_service> svc_ptr,
Handler&& handler
) :
_svc_ptr(std::move(svc_ptr)),
_handler(std::move(handler), _svc_ptr->get_executor())
{
auto slot = asio::get_associated_cancellation_slot(_handler);
if (slot.is_connected())
slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
svc.cancel();
});
}
run_op(run_op&&) noexcept = default;
run_op(const run_op&) = delete;
run_op& operator=(run_op&&) noexcept = default;
run_op& operator=(const run_op&) = delete;
using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
}
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept {
return _svc_ptr->get_executor();
}
void perform() {
namespace asioex = boost::asio::experimental;
_svc_ptr->_stream.open();
_svc_ptr->_rec_channel.reset();
auto init_read_message_op = [](
auto handler, std::shared_ptr<client_service> svc_ptr
) {
return read_message_op { std::move(svc_ptr), std::move(handler) }
.perform();
};
auto init_ping_op = [](
auto handler, std::shared_ptr<client_service> svc_ptr
) {
return ping_op { std::move(svc_ptr), std::move(handler) }
.perform();
};
auto init_senty_op = [](
auto handler, std::shared_ptr<client_service> svc_ptr
) {
return sentry_op { std::move(svc_ptr), std::move(handler) }
.perform();
};
asioex::make_parallel_group(
asio::async_initiate<const asio::deferred_t, void ()>(
init_read_message_op, asio::deferred, _svc_ptr
),
asio::async_initiate<const asio::deferred_t, void ()>(
init_ping_op, asio::deferred, _svc_ptr
),
asio::async_initiate<const asio::deferred_t, void ()>(
init_senty_op, asio::deferred, _svc_ptr
)
).async_wait(asioex::wait_for_all(), std::move(*this));
}
void operator()(std::array<std::size_t, 3> /* ord */) {
_handler.complete(asio::error::operation_aborted);
}
};
template <typename ClientService>
class initiate_async_run {
std::shared_ptr<ClientService> _svc_ptr;
public:
explicit initiate_async_run(std::shared_ptr<ClientService> svc_ptr) :
_svc_ptr(std::move(svc_ptr))
{}
using executor_type = typename ClientService::executor_type;
executor_type get_executor() const noexcept {
return _svc_ptr->get_executor();
}
template <typename Handler>
void operator()(Handler&& handler) {
run_op<ClientService, Handler> {
_svc_ptr, std::move(handler)
}.perform();
}
};
} // end namespace async_mqtt5::detail
#endif // !ASYNC_MQTT5_RUN_OP_HPP

View File

@ -11,11 +11,7 @@
#include <chrono> #include <chrono>
#include <memory> #include <memory>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/prepend.hpp> #include <boost/asio/prepend.hpp>
#include <boost/asio/recycling_allocator.hpp>
#include <boost/asio/steady_timer.hpp>
#include <async_mqtt5/error.hpp> #include <async_mqtt5/error.hpp>
#include <async_mqtt5/types.hpp> #include <async_mqtt5/types.hpp>
@ -26,12 +22,10 @@ namespace async_mqtt5::detail {
namespace asio = boost::asio; namespace asio = boost::asio;
template <typename ClientService, typename Executor> template <typename ClientService, typename Handler>
class sentry_op { class sentry_op {
public:
using executor_type = Executor;
private:
using client_service = ClientService; using client_service = ClientService;
using handler_type = Handler;
struct on_timer {}; struct on_timer {};
struct on_disconnect {}; struct on_disconnect {};
@ -39,15 +33,11 @@ private:
static constexpr auto check_interval = std::chrono::seconds(3); static constexpr auto check_interval = std::chrono::seconds(3);
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
executor_type _executor; handler_type _handler;
std::unique_ptr<asio::steady_timer> _sentry_timer;
public: public:
sentry_op( sentry_op(std::shared_ptr<client_service> svc_ptr, Handler&& handler) :
std::shared_ptr<client_service> svc_ptr, executor_type ex _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler))
) :
_svc_ptr(std::move(svc_ptr)), _executor(ex),
_sentry_timer(new asio::steady_timer(_svc_ptr->get_executor()))
{} {}
sentry_op(sentry_op&&) noexcept = default; sentry_op(sentry_op&&) noexcept = default;
@ -56,30 +46,26 @@ public:
sentry_op& operator=(sentry_op&&) noexcept = default; sentry_op& operator=(sentry_op&&) noexcept = default;
sentry_op& operator=(const sentry_op&) = delete; sentry_op& operator=(const sentry_op&) = delete;
using allocator_type = asio::recycling_allocator<void>; using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept { allocator_type get_allocator() const noexcept {
return allocator_type {}; return asio::get_associated_allocator(_handler);
}
using cancellation_slot_type = asio::cancellation_slot;
cancellation_slot_type get_cancellation_slot() const noexcept {
return _svc_ptr->_cancel_sentry.slot();
} }
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _executor; return _svc_ptr->get_executor();
} }
void perform() { void perform() {
_sentry_timer->expires_after(check_interval); _svc_ptr->_sentry_timer.expires_after(check_interval);
_sentry_timer->async_wait( _svc_ptr->_sentry_timer.async_wait(
asio::prepend(std::move(*this), on_timer {}) asio::prepend(std::move(*this), on_timer {})
); );
} }
void operator()(on_timer, error_code ec) { void operator()(on_timer, error_code ec) {
if (ec == asio::error::operation_aborted || !_svc_ptr->is_open()) if (!_svc_ptr->is_open())
return; return complete();
if (_svc_ptr->_replies.any_expired()) { if (_svc_ptr->_replies.any_expired()) {
auto props = disconnect_props {}; auto props = disconnect_props {};
@ -96,8 +82,15 @@ public:
} }
void operator()(on_disconnect, error_code ec) { void operator()(on_disconnect, error_code ec) {
if (!ec) if (ec)
perform(); return complete();
perform();
}
private:
void complete() {
return std::move(_handler)();
} }
}; };

View File

@ -82,9 +82,9 @@ public:
return asio::get_associated_allocator(_handler); return asio::get_associated_allocator(_handler);
} }
using executor_type = asio::associated_executor_t<handler_type>; using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler); return _svc_ptr->get_executor();
} }
void perform( void perform(

View File

@ -81,9 +81,9 @@ public:
return asio::get_associated_allocator(_handler); return asio::get_associated_allocator(_handler);
} }
using executor_type = asio::associated_executor_t<handler_type>; using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler); return _svc_ptr->get_executor();
} }
void perform( void perform(

View File

@ -23,6 +23,7 @@
#include <async_mqtt5/detail/rebind_executor.hpp> #include <async_mqtt5/detail/rebind_executor.hpp>
#include <async_mqtt5/impl/client_service.hpp> #include <async_mqtt5/impl/client_service.hpp>
#include <async_mqtt5/impl/run_op.hpp>
#include <async_mqtt5/impl/publish_send_op.hpp> #include <async_mqtt5/impl/publish_send_op.hpp>
#include <async_mqtt5/impl/re_auth_op.hpp> #include <async_mqtt5/impl/re_auth_op.hpp>
#include <async_mqtt5/impl/subscribe_op.hpp> #include <async_mqtt5/impl/subscribe_op.hpp>

View File

@ -30,9 +30,11 @@
using namespace async_mqtt5; using namespace async_mqtt5;
using strand_type = asio::strand<asio::any_io_executor>;
BOOST_AUTO_TEST_SUITE(executors) BOOST_AUTO_TEST_SUITE(executors)
BOOST_AUTO_TEST_CASE(bind_executor) { void run_test(asio::io_context& ioc, strand_type io_ex, auto bind_async_run, auto bind_async_op) {
using test::after; using test::after;
using namespace std::chrono_literals; using namespace std::chrono_literals;
@ -94,21 +96,16 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
.complete_with(success, after(0ms)) .complete_with(success, after(0ms))
; ;
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>( auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side) ioc, io_ex, std::move(broker_side)
); );
auto strand = asio::make_strand(ioc);
using client_type = mqtt_client<test::test_stream>; using client_type = mqtt_client<test::test_stream>;
client_type c(executor); client_type c(io_ex);
c.brokers("127.0.0.1") c.brokers("127.0.0.1")
.async_run( .async_run(
asio::bind_executor( bind_async_run(
strand, [&](strand_type strand, error_code ec) {
[&](error_code ec) {
BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST(strand.running_in_this_thread()); BOOST_TEST(strand.running_in_this_thread());
++handlers_called; ++handlers_called;
@ -118,9 +115,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
c.async_publish<qos_e::at_most_once>( c.async_publish<qos_e::at_most_once>(
"t_0", "p_0", retain_e::no, {}, "t_0", "p_0", retain_e::no, {},
asio::bind_executor( bind_async_op(
strand, [&](strand_type strand, error_code ec) {
[&](error_code ec) {
BOOST_TEST(!ec); BOOST_TEST(!ec);
BOOST_TEST(strand.running_in_this_thread()); BOOST_TEST(strand.running_in_this_thread());
++handlers_called; ++handlers_called;
@ -130,9 +126,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
c.async_publish<qos_e::at_least_once>( c.async_publish<qos_e::at_least_once>(
"t_1", "p_1", retain_e::no, {}, "t_1", "p_1", retain_e::no, {},
asio::bind_executor( bind_async_op(
strand, [&](strand_type strand, error_code ec, reason_code rc, auto) {
[&](error_code ec, reason_code rc, auto) {
BOOST_TEST(!ec); BOOST_TEST(!ec);
BOOST_TEST(!rc); BOOST_TEST(!rc);
BOOST_TEST(strand.running_in_this_thread()); BOOST_TEST(strand.running_in_this_thread());
@ -143,9 +138,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
c.async_publish<qos_e::exactly_once>( c.async_publish<qos_e::exactly_once>(
"t_2", "p_2", retain_e::no, {}, "t_2", "p_2", retain_e::no, {},
asio::bind_executor( bind_async_op(
strand, [&](strand_type strand, error_code ec, reason_code rc, auto) {
[&](error_code ec, reason_code rc, auto) {
BOOST_TEST(!ec); BOOST_TEST(!ec);
BOOST_TEST(!rc); BOOST_TEST(!rc);
BOOST_TEST(strand.running_in_this_thread()); BOOST_TEST(strand.running_in_this_thread());
@ -156,9 +150,11 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
c.async_subscribe( c.async_subscribe(
subscribe_topic { "t_0", {} }, {}, subscribe_topic { "t_0", {} }, {},
asio::bind_executor( bind_async_op(
strand, [&](
[&](error_code ec, std::vector<reason_code> rcs, auto) { strand_type strand,
error_code ec, std::vector<reason_code> rcs, auto
) {
BOOST_TEST(!ec); BOOST_TEST(!ec);
BOOST_TEST(!rcs[0]); BOOST_TEST(!rcs[0]);
BOOST_TEST(strand.running_in_this_thread()); BOOST_TEST(strand.running_in_this_thread());
@ -168,10 +164,9 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
); );
c.async_receive( c.async_receive(
asio::bind_executor( bind_async_op(
strand,
[&]( [&](
error_code ec, strand_type strand, error_code ec,
std::string rec_topic, std::string rec_payload, std::string rec_topic, std::string rec_payload,
publish_props publish_props
) { ) {
@ -183,18 +178,19 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
c.async_unsubscribe( c.async_unsubscribe(
"t_0", {}, "t_0", {},
asio::bind_executor( bind_async_op(
strand, [&](
[&](error_code ec, std::vector<reason_code> rcs, auto) { strand_type strand,
error_code ec, std::vector<reason_code> rcs, auto
) {
BOOST_TEST(!ec); BOOST_TEST(!ec);
BOOST_TEST(!rcs[0]); BOOST_TEST(!rcs[0]);
BOOST_TEST(strand.running_in_this_thread()); BOOST_TEST(strand.running_in_this_thread());
++handlers_called; ++handlers_called;
c.async_disconnect( c.async_disconnect(
asio::bind_executor( bind_async_op(
strand, [&](strand_type strand, error_code ec) {
[&](error_code ec) {
BOOST_TEST(!ec); BOOST_TEST(!ec);
BOOST_TEST(strand.running_in_this_thread()); BOOST_TEST(strand.running_in_this_thread());
++handlers_called; ++handlers_called;
@ -213,6 +209,34 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
BOOST_TEST(broker.received_all_expected()); BOOST_TEST(broker.received_all_expected());
} }
BOOST_AUTO_TEST_CASE(different_bound_executors) {
asio::io_context ioc;
auto bind_async_op = [&](auto h) {
auto strand = asio::make_strand(ioc);
return asio::bind_executor(
strand,
asio::prepend(std::move(h), strand)
);
};
run_test(ioc, asio::make_strand(ioc), bind_async_op, bind_async_op);
}
BOOST_AUTO_TEST_CASE(default_executor) {
asio::io_context ioc;
auto io_ex = asio::make_strand(ioc);
auto bind_async_run = [&](auto h) {
auto strand = asio::make_strand(ioc);
return asio::bind_executor(
strand,
asio::prepend(std::move(h), strand)
);
};
auto bind_async_op = [&](auto h) {
return asio::prepend(std::move(h), io_ex);
};
run_test(ioc, io_ex, bind_async_run, bind_async_op);
}
BOOST_AUTO_TEST_CASE(immediate_executor_async_publish) { BOOST_AUTO_TEST_CASE(immediate_executor_async_publish) {
constexpr int expected_handlers_called = 1; constexpr int expected_handlers_called = 1;
int handlers_called = 0; int handlers_called = 0;