From b2338d4135f6093e3f84209642c6b2b08fa414af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Fri, 10 May 2024 15:12:18 +0200 Subject: [PATCH] async_run's associated ex will not replace mqtt_client's default ex Summary: related to T13767 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D29383 --- doc/qbk/10_executors.qbk | 90 +--------- .../hello_world_in_coro_multithreaded_env.cpp | 15 +- include/async_mqtt5/impl/client_service.hpp | 5 +- include/async_mqtt5/impl/ping_op.hpp | 19 +- include/async_mqtt5/impl/read_message_op.hpp | 15 +- include/async_mqtt5/impl/sentry_op.hpp | 17 +- include/async_mqtt5/mqtt_client.hpp | 9 +- test/integration/executors.cpp | 168 +++++++++--------- 8 files changed, 136 insertions(+), 202 deletions(-) diff --git a/doc/qbk/10_executors.qbk b/doc/qbk/10_executors.qbk index 1d6c8a4..733a1eb 100644 --- a/doc/qbk/10_executors.qbk +++ b/doc/qbk/10_executors.qbk @@ -15,27 +15,19 @@ and will be used for the execution of all the intermediate operations and a fina that have not bound an executor. If an executor is bound to an asynchronous operation, that executor will be used instead. -In this context, the [refmem mqtt_client async_run] operation is particularly important. -It starts the __Client__, which initiates a series of internal asynchronous operations, all of which need an executor. +The [refmem mqtt_client async_run] operation starts the __Client__, which initiates a series of internal asynchronous operations, all of which need an executor. If the [refmem mqtt_client async_run] is called with a completion handler that has an associated executor, -then all the internal asynchronous operations will also be associated with the same executor. -Otherwise, the default executor from the __Client__'s construction will be used instead. - -[note - If the [refmem mqtt_client async_run]'s completion handler has an associated executor, - *the associated executor will become the new default associated executor* instead of the executor provided in the constructor. -] +then all the internal asynchronous operations will associate the same executor. [important - The same executor *must* execute [refmem mqtt_client async_run] and all the subsequent async_xxx operations. + The same executor *must* execute [refmem mqtt_client async_run] and all the subsequent `async_xxx` operations. ] The following examples will demonstrate the previously described interactions. -[heading Example 1: using the constructor's executor as the default associated executor] +[heading Example: using the constructor's executor as the default associated executor] -In this code snippet, the __Client__ is constructed with a strand -without explicitly associating an executor with the completion handler of [refmem mqtt_client async_run]. +In this code snippet, the __Client__ is constructed with a strand. Consequently, the __Client__ adopts the strand as its new default executor, which is used to execute the [refmem mqtt_client async_publish] operation. @@ -66,76 +58,4 @@ int main() { ``` -[heading Example 2: binding an executor to async_run's handler overrides the default associated executor] - -In this code snippet, the __Client__ is constructed with __IOC__'s executor -with explicitly associating an executor (strand) with the completion handler of [refmem mqtt_client async_run]. -Consequently, the __Client__ adopts the strand as its new default executor, -which is used to execute the [refmem mqtt_client async_publish] operation. - -``` -int main() { - boost::asio::io_context ioc; - - // Create the Client with io_context's executor. - async_mqtt5::mqtt_client client(ioc.get_executor()); - - auto strand = boost::asio::make_strand(ioc.get_executor()); - client.brokers("", 1883) - // Bind the strand to async_run's completion handler. - // Strand is now the default associated executor. - .async_run(boost::asio::bind_executor(strand, boost::asio::detached)); - - client.async_publish( - "", "Hello world!", - async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, - [&client, &strand](async_mqtt5::error_code /* ec */) { - assert(strand.running_in_this_thread()); - client.cancel(); - } - ); - - ioc.run(); -} -``` - -[heading Example 3: binding an executor to a async_xxx call] - -In this code snippet, the __Client__ is constructed with __IOC__'s executor -without explicitly associating an executor with the completion handler of [refmem mqtt_client async_run]. -The [refmem mqtt_client async_publish] operation bound the strand as the associated executor. -Therefore, all the intermediate operations and the final completion handler will be executed -on the strand. - -[warning - This example only serves as a demonstration and should *not* be used. - It is *not* recommended that [refmem mqtt_client async_run] and other async_xxx operations execute on different executors! -] - -``` -int main() { - boost::asio::io_context ioc; - - async_mqtt5::mqtt_client client(ioc.get_executor()); - - auto strand = boost::asio::make_strand(ioc.get_executor()); - client.brokers("", 1883) - .async_run(boost::asio::detached); - - client.async_publish( - "", "Hello world!", - async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, - boost::asio::bind_executor( - strand, - [&client, &strand](async_mqtt5::error_code /* ec */) { - assert(strand.running_in_this_thread()); - client.cancel(); - } - ) - ); - - ioc.run(); -} -``` - [endsect] [/executors] diff --git a/example/hello_world_in_coro_multithreaded_env.cpp b/example/hello_world_in_coro_multithreaded_env.cpp index b17d501..7e3f4c6 100644 --- a/example/hello_world_in_coro_multithreaded_env.cpp +++ b/example/hello_world_in_coro_multithreaded_env.cpp @@ -28,15 +28,10 @@ boost::asio::awaitable publish_hello_world( assert(strand.running_in_this_thread()); // All these function calls will be executed by the strand that is executing the coroutine. - // All the completion handler's associated executors will be the same strand - // because the Client was constructed with it as the default associated executor, - // and no executors were associated with the async_run call. - - // Note: you can spawn the coroutine in another strand that is different - // from the one used in constructing the Client. - // However, then you must bind the second strand to async_run's handler. - - client.async_run(boost::asio::detached); + // All the completion handler's associated executors will be that same strand + // because the Client was constructed with it as the default associated executor. + client.brokers("", 1883) + .async_run(boost::asio::detached); auto&& [ec, rc, puback_props] = co_await client.async_publish( "", "Hello world!", async_mqtt5::retain_e::no, @@ -66,8 +61,6 @@ int main() { // Create the Client with the explicit strand as the default associated executor. client_type client(strand); - client.brokers("", 1883); - // Spawn the coroutine. // The executor that executes the coroutine must be the same executor // that is the Client's default associated executor. diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index 9baa9d7..f6e80a1 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -223,10 +223,10 @@ private: template friend class assemble_op; - template + template friend class ping_op; - template + template friend class sentry_op; template @@ -369,7 +369,6 @@ public: template void run(Handler&& handler) { - _executor = asio::get_associated_executor(handler, _executor); _run_handler = std::move(handler); auto slot = asio::get_associated_cancellation_slot(_run_handler); if (slot.is_connected()) { diff --git a/include/async_mqtt5/impl/ping_op.hpp b/include/async_mqtt5/impl/ping_op.hpp index b8abd05..cfc94a4 100644 --- a/include/async_mqtt5/impl/ping_op.hpp +++ b/include/async_mqtt5/impl/ping_op.hpp @@ -20,20 +20,28 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; -template +template class ping_op { +public: + using executor_type = Executor; +private: using client_service = ClientService; + struct on_timer {}; struct on_pingreq {}; std::shared_ptr _svc_ptr; + executor_type _executor; std::unique_ptr _ping_timer; asio::cancellation_state _cancellation_state; public: - ping_op(const std::shared_ptr& svc_ptr) : - _svc_ptr(svc_ptr), - _ping_timer(new asio::steady_timer(svc_ptr->get_executor())), + ping_op( + const std::shared_ptr& svc_ptr, + const executor_type& ex + ) : + _svc_ptr(svc_ptr), _executor(ex), + _ping_timer(new asio::steady_timer(_svc_ptr->get_executor())), _cancellation_state( svc_ptr->_cancel_ping.slot(), asio::enable_total_cancellation {}, @@ -44,9 +52,8 @@ public: ping_op(ping_op&&) noexcept = default; ping_op(const ping_op&) = delete; - using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return _svc_ptr->get_executor(); + return _executor; } using allocator_type = asio::recycling_allocator; diff --git a/include/async_mqtt5/impl/read_message_op.hpp b/include/async_mqtt5/impl/read_message_op.hpp index 87a8543..b5a0ff5 100644 --- a/include/async_mqtt5/impl/read_message_op.hpp +++ b/include/async_mqtt5/impl/read_message_op.hpp @@ -20,26 +20,31 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; -template +template class read_message_op { +public: + using executor_type = Executor; +private: using client_service = ClientService; + struct on_message {}; struct on_disconnect {}; std::shared_ptr _svc_ptr; + executor_type _executor; public: read_message_op( - const std::shared_ptr& svc_ptr + const std::shared_ptr& svc_ptr, + const executor_type& ex ) : - _svc_ptr(svc_ptr) + _svc_ptr(svc_ptr), _executor(ex) {} read_message_op(read_message_op&&) noexcept = default; read_message_op(const read_message_op&) = delete; - using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return _svc_ptr->get_executor(); + return _executor; } using allocator_type = asio::recycling_allocator; diff --git a/include/async_mqtt5/impl/sentry_op.hpp b/include/async_mqtt5/impl/sentry_op.hpp index f13e0f4..01105d9 100644 --- a/include/async_mqtt5/impl/sentry_op.hpp +++ b/include/async_mqtt5/impl/sentry_op.hpp @@ -16,31 +16,36 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; -template +template class sentry_op { +public: + using executor_type = Executor; +private: using client_service = ClientService; + struct on_timer {}; struct on_disconnect {}; static constexpr auto check_interval = std::chrono::seconds(3); std::shared_ptr _svc_ptr; + executor_type _executor; std::unique_ptr _sentry_timer; public: sentry_op( - const std::shared_ptr& svc_ptr + const std::shared_ptr& svc_ptr, + const executor_type& ex ) : - _svc_ptr(svc_ptr), - _sentry_timer(new asio::steady_timer(svc_ptr->get_executor())) + _svc_ptr(svc_ptr), _executor(ex), + _sentry_timer(new asio::steady_timer(_svc_ptr->get_executor())) {} sentry_op(sentry_op&&) noexcept = default; sentry_op(const sentry_op&) = delete; - using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { - return _svc_ptr->get_executor(); + return _executor; } using allocator_type = asio::recycling_allocator; diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index ca2289f..d0bed45 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -180,11 +180,12 @@ public: using Signature = void(error_code); auto initiation = [] (auto handler, const impl_type& impl) { - impl->run(std::move(handler)); + auto ex = asio::get_associated_executor(handler, impl->get_executor()); - detail::ping_op { impl }.perform(); - detail::read_message_op { impl }.perform(); - detail::sentry_op { impl }.perform(); + impl->run(std::move(handler)); + detail::ping_op { impl, ex }.perform(); + detail::read_message_op { impl, ex }.perform(); + detail::sentry_op { impl, ex }.perform(); }; return asio::async_initiate( diff --git a/test/integration/executors.cpp b/test/integration/executors.cpp index b559869..f835ebb 100644 --- a/test/integration/executors.cpp +++ b/test/integration/executors.cpp @@ -17,41 +17,31 @@ using namespace async_mqtt5; BOOST_AUTO_TEST_SUITE(executors) -BOOST_AUTO_TEST_CASE(async_run) { +BOOST_AUTO_TEST_CASE(bind_executor) { using test::after; using namespace std::chrono; - constexpr int expected_handlers_called = 9; + constexpr int expected_handlers_called = 8; int handlers_called = 0; // packets auto connect = encoders::encode_connect( "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt ); - auto connack = encoders::encode_connack( - false, reason_codes::success.value(), {} - ); + auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); auto publish_0 = encoders::encode_publish( 0, "t_0", "p_0", qos_e::at_most_once, retain_e::no, dup_e::no, {} ); auto publish_1 = encoders::encode_publish( 1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {} ); - auto puback = encoders::encode_puback( - 1, reason_codes::success.value(), {} - ); + auto puback = encoders::encode_puback(1, reason_codes::success.value(), {}); auto publish_2 = encoders::encode_publish( 2, "t_2", "p_2", qos_e::exactly_once, retain_e::no, dup_e::no, {} ); - auto pubrec = encoders::encode_pubrec( - 2, reason_codes::success.value(), {} - ); - auto pubrel = encoders::encode_pubrel( - 2, reason_codes::success.value(), {} - ); - auto pubcomp = encoders::encode_pubcomp( - 2, reason_codes::success.value(), {} - ); + auto pubrec = encoders::encode_pubrec(2, reason_codes::success.value(), {}); + auto pubrel = encoders::encode_pubrel(2, reason_codes::success.value(), {}); + auto pubcomp = encoders::encode_pubcomp(2, reason_codes::success.value(), {}); auto subscribe = encoders::encode_subscribe( 3, std::vector { { "t_0", {} } }, {} ); @@ -85,8 +75,6 @@ BOOST_AUTO_TEST_CASE(async_run) { .expect(unsubscribe) .complete_with(success, after(0ms)) .reply_with(unsuback, after(0ms)) - .expect(publish_1) - .complete_with(success, after(0ms)) .expect(disconnect) .complete_with(success, after(0ms)) ; @@ -102,91 +90,107 @@ BOOST_AUTO_TEST_CASE(async_run) { using client_type = mqtt_client; client_type c(executor); c.brokers("127.0.0.1") - .async_run(asio::bind_executor( - strand, - [&](error_code ec) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - } - )); + .async_run( + asio::bind_executor( + strand, + [&](error_code ec) { + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ) + ); c.async_publish( "t_0", "p_0", retain_e::no, {}, - [&](error_code ec) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - } + asio::bind_executor( + strand, + [&](error_code ec) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ) ); c.async_publish( "t_1", "p_1", retain_e::no, {}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - } + asio::bind_executor( + strand, + [&](error_code ec, reason_code rc, auto) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rc, rc.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ) ); c.async_publish( "t_2", "p_2", retain_e::no, {}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - } + asio::bind_executor( + strand, + [&](error_code ec, reason_code rc, auto) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rc, rc.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ) ); c.async_subscribe( subscribe_topic { "t_0", {} }, {}, - [&](error_code ec, std::vector rcs, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - } + asio::bind_executor( + strand, + [&](error_code ec, std::vector rcs, auto) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ) ); c.async_receive( - [&]( - error_code ec, - std::string rec_topic, std::string rec_payload, - publish_props - ) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_EQUAL("t_0", rec_topic); - BOOST_CHECK_EQUAL("p_0", rec_payload); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - c.async_unsubscribe( - "t_0", {}, - [&](error_code ec, std::vector rcs, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - c.async_publish( - "t_1", "p_1", retain_e::no, {}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - BOOST_CHECK_EQUAL(rc, reason_codes::empty); - BOOST_CHECK(strand.running_in_this_thread()); - ++handlers_called; - } - ); - c.async_disconnect( - [&](error_code ec) { + asio::bind_executor( + strand, + [&]( + error_code ec, + std::string rec_topic, std::string rec_payload, + publish_props + ) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL("t_0", rec_topic); + BOOST_CHECK_EQUAL("p_0", rec_payload); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + + c.async_unsubscribe( + "t_0", {}, + asio::bind_executor( + strand, + [&](error_code ec, std::vector rcs, auto) { BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); BOOST_CHECK(strand.running_in_this_thread()); ++handlers_called; + + c.async_disconnect( + asio::bind_executor( + strand, + [&](error_code ec) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ) + ); } - ); - } - ); - } + ) + ); + } + ) ); ioc.run_for(500ms);