diff --git a/doc/qbk/10_executors.qbk b/doc/qbk/10_executors.qbk index fe6bf5b..6033686 100644 --- a/doc/qbk/10_executors.qbk +++ b/doc/qbk/10_executors.qbk @@ -19,15 +19,7 @@ Upon creating an instance of the __Client__, it is necessary to provide an execu The specified executor (or __ExecutionContext__'s executor) will become the default executor associated with the __Client__ and will be used for the execution of all the intermediate operations and a final completion handler for all asynchronous operations that have not bound an executor. -If an executor is bound to an asynchronous operation, that executor will be used instead. - -The [refmem mqtt_client async_run] operation starts the __Client__, which initiates a series of internal asynchronous operations, all of which need an executor. -If the [refmem mqtt_client async_run] is called with a completion handler that has an associated executor, -then all the internal asynchronous operations will associate the same executor. - -[important - The same executor *must* execute [refmem mqtt_client async_run] and all the subsequent `async_xxx` operations. -] +If an executor is bound to an asynchronous operation, that executor will be used for the final completion handler instead. The following examples will demonstrate the previously described interactions. diff --git a/doc/qbk/11_multithreading.qbk b/doc/qbk/11_multithreading.qbk index d8063bb..3ee1073 100644 --- a/doc/qbk/11_multithreading.qbk +++ b/doc/qbk/11_multithreading.qbk @@ -68,16 +68,15 @@ or explicit strand. Specifically, use __POST__ or __DISPATCH__ to delegate a function call to a strand, or __CO_SPAWN__ to spawn the coroutine into the strand. For asynchronous functions, this will ensure that the initiation code is executed within the strand in a thread-safe manner. -The associated executor of all `async_xxx`'s completion handlers must be the same strand. +__Client__'s executor must be that same strand. This will guarantee that the entire sequence of operations -- from the initiation code through any intermediate operations to the execution of the completion handler - +- the initiation code and any intermediate operation - is carried out within the strand, thereby ensuring thread safety. [important To conclude, to achieve thread safety, all the member functions of the __Client__ *must* be executed in *the same strand*. - This strand *must* be the associated executor of all the completion handlers across - all `async_xxx` invocations. + This strand must be given in the __Client__ constructor. ] The examples below demonstrate how to publish a "Hello World" Application Message diff --git a/include/async_mqtt5/detail/cancellable_handler.hpp b/include/async_mqtt5/detail/cancellable_handler.hpp index 0386b15..16379d4 100644 --- a/include/async_mqtt5/detail/cancellable_handler.hpp +++ b/include/async_mqtt5/detail/cancellable_handler.hpp @@ -75,7 +75,10 @@ public: template void complete(Args&&... args) { asio::get_associated_cancellation_slot(_handler).clear(); - std::move(_handler)(std::forward(args)...); + asio::dispatch( + _handler_ex, + asio::prepend(std::move(_handler), std::forward(args)...) + ); } template diff --git a/include/async_mqtt5/impl/assemble_op.hpp b/include/async_mqtt5/impl/assemble_op.hpp index 8832090..33c9b0c 100644 --- a/include/async_mqtt5/impl/assemble_op.hpp +++ b/include/async_mqtt5/impl/assemble_op.hpp @@ -91,9 +91,9 @@ public: return asio::get_associated_allocator(_handler); } - using executor_type = typename client_service::executor_type; + using executor_type = asio::associated_executor_t; executor_type get_executor() const noexcept { - return _svc.get_executor(); + return asio::get_associated_executor(_handler); } template @@ -111,6 +111,7 @@ public: if (cc(error_code {}, 0) == 0 && _data_span.size()) { return asio::post( + _svc.get_executor(), asio::prepend( std::move(*this), on_read {}, error_code {}, 0, std::move(cc) diff --git a/include/async_mqtt5/impl/async_sender.hpp b/include/async_mqtt5/impl/async_sender.hpp index 471f000..b669c9a 100644 --- a/include/async_mqtt5/impl/async_sender.hpp +++ b/include/async_mqtt5/impl/async_sender.hpp @@ -79,14 +79,6 @@ public: return !_handler; } - auto get_executor() { - return asio::get_associated_executor(_handler); - } - - auto get_allocator() { - return asio::get_associated_allocator(_handler); - } - bool throttled() const { return _flags & send_flag::throttled; } @@ -148,6 +140,11 @@ public: return allocator_type {}; } + using executor_type = typename client_service::executor_type; + executor_type get_executor() const noexcept { + return _svc.get_executor(); + } + serial_num_t next_serial_num() { return _last_serial_num = write_req::next_serial_num(_last_serial_num); } @@ -293,17 +290,9 @@ private: _svc._replies.clear_fast_replies(); - auto alloc = write_queue.front().get_allocator(); - auto ex = write_queue.front().get_executor(); _svc._stream.async_write( buffers, - asio::bind_allocator( - alloc, - asio::bind_executor( - ex, - asio::prepend(std::ref(*this), std::move(write_queue)) - ) - ) + asio::prepend(std::ref(*this), std::move(write_queue)) ); } diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index f3aeacd..4a2393f 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include @@ -28,10 +27,7 @@ #include #include #include -#include -#include #include -#include namespace async_mqtt5::detail { @@ -235,16 +231,19 @@ private: void (error_code, std::string, std::string, publish_props) >; + template + friend class run_op; + template friend class async_sender; template friend class assemble_op; - template + template friend class ping_op; - template + template friend class sentry_op; template @@ -264,10 +263,8 @@ private: receive_channel _rec_channel; - asio::cancellation_signal _cancel_ping; - asio::cancellation_signal _cancel_sentry; - - asio::any_completion_handler _run_handler; + asio::steady_timer _ping_timer; + asio::steady_timer _sentry_timer; client_service(const client_service& other) : _executor(other._executor), _stream_context(other._stream_context), @@ -275,7 +272,9 @@ private: _replies(_executor), _async_sender(*this), _active_span(_read_buff.cend(), _read_buff.cend()), - _rec_channel(_executor, std::numeric_limits::max()) + _rec_channel(_executor, std::numeric_limits::max()), + _ping_timer(_executor), + _sentry_timer(_executor) { _stream.clone_endpoints(other._stream); } @@ -292,7 +291,9 @@ public: _replies(ex), _async_sender(*this), _active_span(_read_buff.cend(), _read_buff.cend()), - _rec_channel(ex, std::numeric_limits::max()) + _rec_channel(ex, std::numeric_limits::max()), + _ping_timer(ex), + _sentry_timer(ex) {} executor_type get_executor() const noexcept { @@ -385,21 +386,6 @@ public: return _stream_context.connack_properties(); } - template - void run(Handler&& handler) { - _run_handler = std::move(handler); - auto slot = asio::get_associated_cancellation_slot(_run_handler); - if (slot.is_connected()) { - using c_t = asio::cancellation_type_t; - slot.assign([&svc = *this](c_t c) { - if ((c & c_t::terminal) != c_t::none) - svc.cancel(); - }); - } - _stream.open(); - _rec_channel.reset(); - } - void open_stream() { _stream.open(); } @@ -413,25 +399,16 @@ public: } void cancel() { - if (!_run_handler) return; + if (!_stream.is_open()) return; - _cancel_ping.emit(asio::cancellation_type::terminal); - _cancel_sentry.emit(asio::cancellation_type::terminal); + _ping_timer.cancel(); + _sentry_timer.cancel(); _rec_channel.close(); _replies.cancel_unanswered(); _async_sender.cancel(); _stream.cancel(); _stream.close(); - - asio::get_associated_cancellation_slot(_run_handler).clear(); - asio::post( - get_executor(), - asio::prepend( - std::move(_run_handler), - asio::error::operation_aborted - ) - ); } uint16_t allocate_pid() { @@ -469,7 +446,7 @@ public: } } - _cancel_ping.emit(asio::cancellation_type::total); + _ping_timer.cancel(); } bool channel_store(decoders::publish_message message) { @@ -532,31 +509,6 @@ public: }; -template -class initiate_async_run { - std::shared_ptr _svc_ptr; -public: - explicit initiate_async_run(std::shared_ptr svc_ptr) : - _svc_ptr(std::move(svc_ptr)) - {} - - using executor_type = typename ClientService::executor_type; - executor_type get_executor() const noexcept { - return _svc_ptr->get_executor(); - } - - template - void operator()(Handler&& handler) { - auto ex = asio::get_associated_executor(handler, get_executor()); - - _svc_ptr->run(std::move(handler)); - detail::ping_op { _svc_ptr, ex }.perform(); - detail::read_message_op { _svc_ptr, ex }.perform(); - detail::sentry_op { _svc_ptr, ex }.perform(); - } -}; - - } // namespace async_mqtt5::detail #endif // !ASYNC_MQTT5_CLIENT_SERVICE_HPP diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index 47d2897..778f4a6 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -82,9 +82,9 @@ public: return asio::get_associated_allocator(_handler); } - using executor_type = asio::associated_executor_t; + using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); + return _svc_ptr->get_executor(); } void perform() { @@ -188,9 +188,7 @@ class terminal_disconnect_op { std::shared_ptr _svc_ptr; std::unique_ptr _timer; - using handler_type = cancellable_handler< - Handler, typename ClientService::executor_type - >; + using handler_type = Handler; handler_type _handler; public: @@ -200,7 +198,7 @@ public: ) : _svc_ptr(std::move(svc_ptr)), _timer(new asio::steady_timer(_svc_ptr->get_executor())), - _handler(std::move(handler), _svc_ptr->get_executor()) + _handler(std::move(handler)) {} terminal_disconnect_op(terminal_disconnect_op&&) = default; @@ -230,10 +228,10 @@ public: auto init_disconnect = []( auto handler, disconnect_ctx ctx, - const std::shared_ptr& svc_ptr + std::shared_ptr svc_ptr ) { disconnect_op { - svc_ptr, std::move(ctx), std::move(handler) + std::move(svc_ptr), std::move(ctx), std::move(handler) }.perform(); }; @@ -248,7 +246,7 @@ public: ); timed_disconnect.async_wait( - asioex::wait_for_one(), asio::prepend(std::move(*this)) + asioex::wait_for_one(), std::move(*this) ); } @@ -256,7 +254,7 @@ public: std::array /* ord */, error_code disconnect_ec, error_code /* timer_ec */ ) { - _handler.complete(disconnect_ec); + std::move(_handler)(disconnect_ec); } }; @@ -291,12 +289,12 @@ public: template decltype(auto) async_disconnect( disconnect_rc_e reason_code, const disconnect_props& props, - const std::shared_ptr& svc_ptr, + std::shared_ptr svc_ptr, CompletionToken&& token ) { using Signature = void (error_code); return asio::async_initiate( - initiate_async_disconnect(svc_ptr), token, + initiate_async_disconnect(std::move(svc_ptr)), token, reason_code, props ); } @@ -304,12 +302,12 @@ decltype(auto) async_disconnect( template decltype(auto) async_terminal_disconnect( disconnect_rc_e reason_code, const disconnect_props& props, - const std::shared_ptr& svc_ptr, + std::shared_ptr svc_ptr, CompletionToken&& token ) { using Signature = void (error_code); return asio::async_initiate( - initiate_async_disconnect(svc_ptr), token, + initiate_async_disconnect(std::move(svc_ptr)), token, reason_code, props ); } diff --git a/include/async_mqtt5/impl/ping_op.hpp b/include/async_mqtt5/impl/ping_op.hpp index b5252b5..c2f0f45 100644 --- a/include/async_mqtt5/impl/ping_op.hpp +++ b/include/async_mqtt5/impl/ping_op.hpp @@ -10,13 +10,10 @@ #include #include -#include -#include +#include #include #include -#include -#include #include #include @@ -27,33 +24,20 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; -template +template class ping_op { -public: - using executor_type = Executor; -private: using client_service = ClientService; + using handler_type = Handler; struct on_timer {}; struct on_pingreq {}; std::shared_ptr _svc_ptr; - executor_type _executor; - std::unique_ptr _ping_timer; - asio::cancellation_state _cancellation_state; + handler_type _handler; public: - ping_op( - std::shared_ptr svc_ptr, - executor_type ex - ) : - _svc_ptr(std::move(svc_ptr)), _executor(ex), - _ping_timer(new asio::steady_timer(_svc_ptr->get_executor())), - _cancellation_state( - _svc_ptr->_cancel_ping.slot(), - asio::enable_total_cancellation {}, - asio::enable_total_cancellation {} - ) + ping_op(std::shared_ptr svc_ptr, Handler&& handler) : + _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler)) {} ping_op(ping_op&&) noexcept = default; @@ -62,37 +46,28 @@ public: ping_op& operator=(ping_op&&) noexcept = default; ping_op& operator=(const ping_op&) = delete; - using allocator_type = asio::recycling_allocator; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { - return allocator_type {}; - } - - using cancellation_slot_type = asio::cancellation_slot; - cancellation_slot_type get_cancellation_slot() const noexcept { - return _cancellation_state.slot(); + return asio::get_associated_allocator(_handler); } + using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return _executor; + return _svc_ptr->get_executor(); } void perform() { - _ping_timer->expires_after(compute_wait_time()); - _ping_timer->async_wait( + _svc_ptr->_ping_timer.expires_after(compute_wait_time()); + _svc_ptr->_ping_timer.async_wait( asio::prepend(std::move(*this), on_timer {}) ); } - void operator()(on_timer, error_code) { - if ( - _cancellation_state.cancelled() == asio::cancellation_type::terminal || - !_svc_ptr->is_open() - ) - return; - else if (_cancellation_state.cancelled() == asio::cancellation_type::total) { - _cancellation_state.clear(); + void operator()(on_timer, error_code ec) { + if (!_svc_ptr->is_open()) + return complete(); + else if (ec == asio::error::operation_aborted) return perform(); - } auto pingreq = control_packet::of( no_pid, get_allocator(), encoders::encode_pingreq @@ -110,14 +85,10 @@ public: } void operator()(on_pingreq, error_code ec) { - if ( - _cancellation_state.cancelled() == asio::cancellation_type::terminal || - ec == asio::error::no_recovery - ) - return; + if (!ec || ec == asio::error::try_again) + return perform(); - _cancellation_state.clear(); - perform(); + complete(); } private: @@ -127,6 +98,10 @@ private: std::chrono::seconds(negotiated_ka) : duration(std::numeric_limits::max()); } + + void complete() { + return std::move(_handler)(); + } }; diff --git a/include/async_mqtt5/impl/publish_send_op.hpp b/include/async_mqtt5/impl/publish_send_op.hpp index 26c4941..de9eaaa 100644 --- a/include/async_mqtt5/impl/publish_send_op.hpp +++ b/include/async_mqtt5/impl/publish_send_op.hpp @@ -106,9 +106,9 @@ public: return asio::get_associated_allocator(_handler); } - using executor_type = asio::associated_executor_t; + using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); + return _svc_ptr->get_executor(); } void perform( diff --git a/include/async_mqtt5/impl/read_message_op.hpp b/include/async_mqtt5/impl/read_message_op.hpp index 7be4893..06ffe0e 100644 --- a/include/async_mqtt5/impl/read_message_op.hpp +++ b/include/async_mqtt5/impl/read_message_op.hpp @@ -8,7 +8,6 @@ #ifndef ASYNC_MQTT5_READ_MESSAGE_OP_HPP #define ASYNC_MQTT5_READ_MESSAGE_OP_HPP -#include #include #include @@ -30,24 +29,20 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; -template +template class read_message_op { -public: - using executor_type = Executor; -private: using client_service = ClientService; + using handler_type = Handler; struct on_message {}; struct on_disconnect {}; std::shared_ptr _svc_ptr; - executor_type _executor; + handler_type _handler; + public: - read_message_op( - std::shared_ptr svc_ptr, - executor_type ex - ) : - _svc_ptr(std::move(svc_ptr)), _executor(ex) + read_message_op(std::shared_ptr svc_ptr, Handler&& handler) + : _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler)) {} read_message_op(read_message_op&&) noexcept = default; @@ -56,13 +51,14 @@ public: read_message_op& operator=(read_message_op&&) noexcept = default; read_message_op& operator=(const read_message_op&) = delete; - using allocator_type = asio::recycling_allocator; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { - return allocator_type {}; + return asio::get_associated_allocator(_handler); } + using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return _executor; + return _svc_ptr->get_executor(); } void perform() { @@ -82,17 +78,19 @@ public: ); if (ec == asio::error::no_recovery) - return _svc_ptr->cancel(); + _svc_ptr->cancel(); - if (ec == asio::error::operation_aborted) - return; + if (ec) + return complete(); dispatch(control_code, first, last); } void operator()(on_disconnect, error_code ec) { - if (!ec) - perform(); + if (ec) + return complete(); + + perform(); } private: @@ -150,6 +148,9 @@ private: ); } + void complete() { + return std::move(_handler)(); + } }; diff --git a/include/async_mqtt5/impl/replies.hpp b/include/async_mqtt5/impl/replies.hpp index 984de52..4dd5280 100644 --- a/include/async_mqtt5/impl/replies.hpp +++ b/include/async_mqtt5/impl/replies.hpp @@ -62,7 +62,7 @@ private: error_code ec, byte_citer first = byte_citer {}, byte_citer last = byte_citer {} ) { - asio::dispatch(asio::prepend(std::move(_handler), ec, first, last)); + std::move(_handler)(ec, first, last); } void complete_post(const executor_type& ex, error_code ec) { diff --git a/include/async_mqtt5/impl/run_op.hpp b/include/async_mqtt5/impl/run_op.hpp new file mode 100644 index 0000000..fb19196 --- /dev/null +++ b/include/async_mqtt5/impl/run_op.hpp @@ -0,0 +1,142 @@ +// +// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef ASYNC_MQTT5_RUN_OP_HPP +#define ASYNC_MQTT5_RUN_OP_HPP + +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace async_mqtt5::detail { + +namespace asio = boost::asio; + +template +class run_op { + using client_service = ClientService; + + std::shared_ptr _svc_ptr; + + using handler_type = cancellable_handler< + Handler, + typename client_service::executor_type + >; + handler_type _handler; + +public: + run_op( + std::shared_ptr svc_ptr, + Handler&& handler + ) : + _svc_ptr(std::move(svc_ptr)), + _handler(std::move(handler), _svc_ptr->get_executor()) + { + auto slot = asio::get_associated_cancellation_slot(_handler); + if (slot.is_connected()) + slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) { + svc.cancel(); + }); + } + + run_op(run_op&&) noexcept = default; + run_op(const run_op&) = delete; + + run_op& operator=(run_op&&) noexcept = default; + run_op& operator=(const run_op&) = delete; + + using allocator_type = asio::associated_allocator_t; + allocator_type get_allocator() const noexcept { + return asio::get_associated_allocator(_handler); + } + + using executor_type = typename client_service::executor_type; + executor_type get_executor() const noexcept { + return _svc_ptr->get_executor(); + } + + void perform() { + namespace asioex = boost::asio::experimental; + + _svc_ptr->_stream.open(); + _svc_ptr->_rec_channel.reset(); + + auto init_read_message_op = []( + auto handler, std::shared_ptr svc_ptr + ) { + return read_message_op { std::move(svc_ptr), std::move(handler) } + .perform(); + }; + + auto init_ping_op = []( + auto handler, std::shared_ptr svc_ptr + ) { + return ping_op { std::move(svc_ptr), std::move(handler) } + .perform(); + }; + + auto init_senty_op = []( + auto handler, std::shared_ptr svc_ptr + ) { + return sentry_op { std::move(svc_ptr), std::move(handler) } + .perform(); + }; + + asioex::make_parallel_group( + asio::async_initiate( + init_read_message_op, asio::deferred, _svc_ptr + ), + asio::async_initiate( + init_ping_op, asio::deferred, _svc_ptr + ), + asio::async_initiate( + init_senty_op, asio::deferred, _svc_ptr + ) + ).async_wait(asioex::wait_for_all(), std::move(*this)); + } + + void operator()(std::array /* ord */) { + _handler.complete(asio::error::operation_aborted); + } +}; + +template +class initiate_async_run { + std::shared_ptr _svc_ptr; +public: + explicit initiate_async_run(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)) + {} + + using executor_type = typename ClientService::executor_type; + executor_type get_executor() const noexcept { + return _svc_ptr->get_executor(); + } + + template + void operator()(Handler&& handler) { + run_op { + _svc_ptr, std::move(handler) + }.perform(); + } +}; + + +} // end namespace async_mqtt5::detail + +#endif // !ASYNC_MQTT5_RUN_OP_HPP diff --git a/include/async_mqtt5/impl/sentry_op.hpp b/include/async_mqtt5/impl/sentry_op.hpp index abc7e23..228e12d 100644 --- a/include/async_mqtt5/impl/sentry_op.hpp +++ b/include/async_mqtt5/impl/sentry_op.hpp @@ -11,11 +11,7 @@ #include #include -#include -#include #include -#include -#include #include #include @@ -26,12 +22,10 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; -template +template class sentry_op { -public: - using executor_type = Executor; -private: using client_service = ClientService; + using handler_type = Handler; struct on_timer {}; struct on_disconnect {}; @@ -39,15 +33,11 @@ private: static constexpr auto check_interval = std::chrono::seconds(3); std::shared_ptr _svc_ptr; - executor_type _executor; - std::unique_ptr _sentry_timer; + handler_type _handler; public: - sentry_op( - std::shared_ptr svc_ptr, executor_type ex - ) : - _svc_ptr(std::move(svc_ptr)), _executor(ex), - _sentry_timer(new asio::steady_timer(_svc_ptr->get_executor())) + sentry_op(std::shared_ptr svc_ptr, Handler&& handler) : + _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler)) {} sentry_op(sentry_op&&) noexcept = default; @@ -56,30 +46,26 @@ public: sentry_op& operator=(sentry_op&&) noexcept = default; sentry_op& operator=(const sentry_op&) = delete; - using allocator_type = asio::recycling_allocator; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { - return allocator_type {}; - } - - using cancellation_slot_type = asio::cancellation_slot; - cancellation_slot_type get_cancellation_slot() const noexcept { - return _svc_ptr->_cancel_sentry.slot(); + return asio::get_associated_allocator(_handler); } + using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return _executor; + return _svc_ptr->get_executor(); } void perform() { - _sentry_timer->expires_after(check_interval); - _sentry_timer->async_wait( + _svc_ptr->_sentry_timer.expires_after(check_interval); + _svc_ptr->_sentry_timer.async_wait( asio::prepend(std::move(*this), on_timer {}) ); } void operator()(on_timer, error_code ec) { - if (ec == asio::error::operation_aborted || !_svc_ptr->is_open()) - return; + if (!_svc_ptr->is_open()) + return complete(); if (_svc_ptr->_replies.any_expired()) { auto props = disconnect_props {}; @@ -96,8 +82,15 @@ public: } void operator()(on_disconnect, error_code ec) { - if (!ec) - perform(); + if (ec) + return complete(); + + perform(); + } + +private: + void complete() { + return std::move(_handler)(); } }; diff --git a/include/async_mqtt5/impl/subscribe_op.hpp b/include/async_mqtt5/impl/subscribe_op.hpp index a7b0575..f912bda 100644 --- a/include/async_mqtt5/impl/subscribe_op.hpp +++ b/include/async_mqtt5/impl/subscribe_op.hpp @@ -82,9 +82,9 @@ public: return asio::get_associated_allocator(_handler); } - using executor_type = asio::associated_executor_t; + using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); + return _svc_ptr->get_executor(); } void perform( diff --git a/include/async_mqtt5/impl/unsubscribe_op.hpp b/include/async_mqtt5/impl/unsubscribe_op.hpp index 580243e..9ad86dd 100644 --- a/include/async_mqtt5/impl/unsubscribe_op.hpp +++ b/include/async_mqtt5/impl/unsubscribe_op.hpp @@ -81,9 +81,9 @@ public: return asio::get_associated_allocator(_handler); } - using executor_type = asio::associated_executor_t; + using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); + return _svc_ptr->get_executor(); } void perform( diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index dd7edac..adaf7c4 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include diff --git a/test/integration/executors.cpp b/test/integration/executors.cpp index 7c9dc13..88870d5 100644 --- a/test/integration/executors.cpp +++ b/test/integration/executors.cpp @@ -30,9 +30,11 @@ using namespace async_mqtt5; +using strand_type = asio::strand; + BOOST_AUTO_TEST_SUITE(executors) -BOOST_AUTO_TEST_CASE(bind_executor) { +void run_test(asio::io_context& ioc, strand_type io_ex, auto bind_async_run, auto bind_async_op) { using test::after; using namespace std::chrono_literals; @@ -94,21 +96,16 @@ BOOST_AUTO_TEST_CASE(bind_executor) { .complete_with(success, after(0ms)) ; - asio::io_context ioc; - auto executor = ioc.get_executor(); auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) + ioc, io_ex, std::move(broker_side) ); - auto strand = asio::make_strand(ioc); - using client_type = mqtt_client; - client_type c(executor); + client_type c(io_ex); c.brokers("127.0.0.1") .async_run( - asio::bind_executor( - strand, - [&](error_code ec) { + bind_async_run( + [&](strand_type strand, error_code ec) { BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; @@ -118,9 +115,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) { c.async_publish( "t_0", "p_0", retain_e::no, {}, - asio::bind_executor( - strand, - [&](error_code ec) { + bind_async_op( + [&](strand_type strand, error_code ec) { BOOST_TEST(!ec); BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; @@ -130,9 +126,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) { c.async_publish( "t_1", "p_1", retain_e::no, {}, - asio::bind_executor( - strand, - [&](error_code ec, reason_code rc, auto) { + bind_async_op( + [&](strand_type strand, error_code ec, reason_code rc, auto) { BOOST_TEST(!ec); BOOST_TEST(!rc); BOOST_TEST(strand.running_in_this_thread()); @@ -143,9 +138,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) { c.async_publish( "t_2", "p_2", retain_e::no, {}, - asio::bind_executor( - strand, - [&](error_code ec, reason_code rc, auto) { + bind_async_op( + [&](strand_type strand, error_code ec, reason_code rc, auto) { BOOST_TEST(!ec); BOOST_TEST(!rc); BOOST_TEST(strand.running_in_this_thread()); @@ -156,9 +150,11 @@ BOOST_AUTO_TEST_CASE(bind_executor) { c.async_subscribe( subscribe_topic { "t_0", {} }, {}, - asio::bind_executor( - strand, - [&](error_code ec, std::vector rcs, auto) { + bind_async_op( + [&]( + strand_type strand, + error_code ec, std::vector rcs, auto + ) { BOOST_TEST(!ec); BOOST_TEST(!rcs[0]); BOOST_TEST(strand.running_in_this_thread()); @@ -168,10 +164,9 @@ BOOST_AUTO_TEST_CASE(bind_executor) { ); c.async_receive( - asio::bind_executor( - strand, + bind_async_op( [&]( - error_code ec, + strand_type strand, error_code ec, std::string rec_topic, std::string rec_payload, publish_props ) { @@ -183,18 +178,19 @@ BOOST_AUTO_TEST_CASE(bind_executor) { c.async_unsubscribe( "t_0", {}, - asio::bind_executor( - strand, - [&](error_code ec, std::vector rcs, auto) { + bind_async_op( + [&]( + strand_type strand, + error_code ec, std::vector rcs, auto + ) { BOOST_TEST(!ec); BOOST_TEST(!rcs[0]); BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; c.async_disconnect( - asio::bind_executor( - strand, - [&](error_code ec) { + bind_async_op( + [&](strand_type strand, error_code ec) { BOOST_TEST(!ec); BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; @@ -213,6 +209,34 @@ BOOST_AUTO_TEST_CASE(bind_executor) { BOOST_TEST(broker.received_all_expected()); } +BOOST_AUTO_TEST_CASE(different_bound_executors) { + asio::io_context ioc; + auto bind_async_op = [&](auto h) { + auto strand = asio::make_strand(ioc); + return asio::bind_executor( + strand, + asio::prepend(std::move(h), strand) + ); + }; + run_test(ioc, asio::make_strand(ioc), bind_async_op, bind_async_op); +} + +BOOST_AUTO_TEST_CASE(default_executor) { + asio::io_context ioc; + auto io_ex = asio::make_strand(ioc); + auto bind_async_run = [&](auto h) { + auto strand = asio::make_strand(ioc); + return asio::bind_executor( + strand, + asio::prepend(std::move(h), strand) + ); + }; + auto bind_async_op = [&](auto h) { + return asio::prepend(std::move(h), io_ex); + }; + run_test(ioc, io_ex, bind_async_run, bind_async_op); +} + BOOST_AUTO_TEST_CASE(immediate_executor_async_publish) { constexpr int expected_handlers_called = 1; int handlers_called = 0;