async_run's associated ex will not replace mqtt_client's default ex

Summary: related to T13767

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Differential Revision: https://repo.mireo.local/D29383
This commit is contained in:
Korina Šimičević
2024-05-10 15:12:18 +02:00
parent 794f48e915
commit b2338d4135
8 changed files with 136 additions and 202 deletions

View File

@ -15,27 +15,19 @@ and will be used for the execution of all the intermediate operations and a fina
that have not bound an executor. that have not bound an executor.
If an executor is bound to an asynchronous operation, that executor will be used instead. If an executor is bound to an asynchronous operation, that executor will be used instead.
In this context, the [refmem mqtt_client async_run] operation is particularly important. The [refmem mqtt_client async_run] operation starts the __Client__, which initiates a series of internal asynchronous operations, all of which need an executor.
It starts the __Client__, which initiates a series of internal asynchronous operations, all of which need an executor.
If the [refmem mqtt_client async_run] is called with a completion handler that has an associated executor, If the [refmem mqtt_client async_run] is called with a completion handler that has an associated executor,
then all the internal asynchronous operations will also be associated with the same executor. then all the internal asynchronous operations will associate the same executor.
Otherwise, the default executor from the __Client__'s construction will be used instead.
[note
If the [refmem mqtt_client async_run]'s completion handler has an associated executor,
*the associated executor will become the new default associated executor* instead of the executor provided in the constructor.
]
[important [important
The same executor *must* execute [refmem mqtt_client async_run] and all the subsequent async_xxx operations. The same executor *must* execute [refmem mqtt_client async_run] and all the subsequent `async_xxx` operations.
] ]
The following examples will demonstrate the previously described interactions. The following examples will demonstrate the previously described interactions.
[heading Example 1: using the constructor's executor as the default associated executor] [heading Example: using the constructor's executor as the default associated executor]
In this code snippet, the __Client__ is constructed with a strand In this code snippet, the __Client__ is constructed with a strand.
without explicitly associating an executor with the completion handler of [refmem mqtt_client async_run].
Consequently, the __Client__ adopts the strand as its new default executor, Consequently, the __Client__ adopts the strand as its new default executor,
which is used to execute the [refmem mqtt_client async_publish] operation. which is used to execute the [refmem mqtt_client async_publish] operation.
@ -66,76 +58,4 @@ int main() {
``` ```
[heading Example 2: binding an executor to async_run's handler overrides the default associated executor]
In this code snippet, the __Client__ is constructed with __IOC__'s executor
with explicitly associating an executor (strand) with the completion handler of [refmem mqtt_client async_run].
Consequently, the __Client__ adopts the strand as its new default executor,
which is used to execute the [refmem mqtt_client async_publish] operation.
```
int main() {
boost::asio::io_context ioc;
// Create the Client with io_context's executor.
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc.get_executor());
auto strand = boost::asio::make_strand(ioc.get_executor());
client.brokers("<your-mqtt-broker>", 1883)
// Bind the strand to async_run's completion handler.
// Strand is now the default associated executor.
.async_run(boost::asio::bind_executor(strand, boost::asio::detached));
client.async_publish<async_mqtt5::qos_e::at_most_once>(
"<topic>", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
[&client, &strand](async_mqtt5::error_code /* ec */) {
assert(strand.running_in_this_thread());
client.cancel();
}
);
ioc.run();
}
```
[heading Example 3: binding an executor to a async_xxx call]
In this code snippet, the __Client__ is constructed with __IOC__'s executor
without explicitly associating an executor with the completion handler of [refmem mqtt_client async_run].
The [refmem mqtt_client async_publish] operation bound the strand as the associated executor.
Therefore, all the intermediate operations and the final completion handler will be executed
on the strand.
[warning
This example only serves as a demonstration and should *not* be used.
It is *not* recommended that [refmem mqtt_client async_run] and other async_xxx operations execute on different executors!
]
```
int main() {
boost::asio::io_context ioc;
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc.get_executor());
auto strand = boost::asio::make_strand(ioc.get_executor());
client.brokers("<your-mqtt-broker>", 1883)
.async_run(boost::asio::detached);
client.async_publish<async_mqtt5::qos_e::at_most_once>(
"<topic>", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
boost::asio::bind_executor(
strand,
[&client, &strand](async_mqtt5::error_code /* ec */) {
assert(strand.running_in_this_thread());
client.cancel();
}
)
);
ioc.run();
}
```
[endsect] [/executors] [endsect] [/executors]

View File

@ -28,15 +28,10 @@ boost::asio::awaitable<void> publish_hello_world(
assert(strand.running_in_this_thread()); assert(strand.running_in_this_thread());
// All these function calls will be executed by the strand that is executing the coroutine. // All these function calls will be executed by the strand that is executing the coroutine.
// All the completion handler's associated executors will be the same strand // All the completion handler's associated executors will be that same strand
// because the Client was constructed with it as the default associated executor, // because the Client was constructed with it as the default associated executor.
// and no executors were associated with the async_run call. client.brokers("<your-mqtt-broker>", 1883)
.async_run(boost::asio::detached);
// Note: you can spawn the coroutine in another strand that is different
// from the one used in constructing the Client.
// However, then you must bind the second strand to async_run's handler.
client.async_run(boost::asio::detached);
auto&& [ec, rc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>( auto&& [ec, rc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"<your-mqtt-topic>", "Hello world!", async_mqtt5::retain_e::no, "<your-mqtt-topic>", "Hello world!", async_mqtt5::retain_e::no,
@ -66,8 +61,6 @@ int main() {
// Create the Client with the explicit strand as the default associated executor. // Create the Client with the explicit strand as the default associated executor.
client_type client(strand); client_type client(strand);
client.brokers("<your-mqtt-broker>", 1883);
// Spawn the coroutine. // Spawn the coroutine.
// The executor that executes the coroutine must be the same executor // The executor that executes the coroutine must be the same executor
// that is the Client's default associated executor. // that is the Client's default associated executor.

View File

@ -223,10 +223,10 @@ private:
template <typename ClientService, typename Handler> template <typename ClientService, typename Handler>
friend class assemble_op; friend class assemble_op;
template <typename ClientService> template <typename ClientService, typename Executor>
friend class ping_op; friend class ping_op;
template <typename ClientService> template <typename ClientService, typename Executor>
friend class sentry_op; friend class sentry_op;
template <typename ClientService> template <typename ClientService>
@ -369,7 +369,6 @@ public:
template <typename Handler> template <typename Handler>
void run(Handler&& handler) { void run(Handler&& handler) {
_executor = asio::get_associated_executor(handler, _executor);
_run_handler = std::move(handler); _run_handler = std::move(handler);
auto slot = asio::get_associated_cancellation_slot(_run_handler); auto slot = asio::get_associated_cancellation_slot(_run_handler);
if (slot.is_connected()) { if (slot.is_connected()) {

View File

@ -20,20 +20,28 @@ namespace async_mqtt5::detail {
namespace asio = boost::asio; namespace asio = boost::asio;
template <typename ClientService> template <typename ClientService, typename Executor>
class ping_op { class ping_op {
public:
using executor_type = Executor;
private:
using client_service = ClientService; using client_service = ClientService;
struct on_timer {}; struct on_timer {};
struct on_pingreq {}; struct on_pingreq {};
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
executor_type _executor;
std::unique_ptr<asio::steady_timer> _ping_timer; std::unique_ptr<asio::steady_timer> _ping_timer;
asio::cancellation_state _cancellation_state; asio::cancellation_state _cancellation_state;
public: public:
ping_op(const std::shared_ptr<client_service>& svc_ptr) : ping_op(
_svc_ptr(svc_ptr), const std::shared_ptr<client_service>& svc_ptr,
_ping_timer(new asio::steady_timer(svc_ptr->get_executor())), const executor_type& ex
) :
_svc_ptr(svc_ptr), _executor(ex),
_ping_timer(new asio::steady_timer(_svc_ptr->get_executor())),
_cancellation_state( _cancellation_state(
svc_ptr->_cancel_ping.slot(), svc_ptr->_cancel_ping.slot(),
asio::enable_total_cancellation {}, asio::enable_total_cancellation {},
@ -44,9 +52,8 @@ public:
ping_op(ping_op&&) noexcept = default; ping_op(ping_op&&) noexcept = default;
ping_op(const ping_op&) = delete; ping_op(const ping_op&) = delete;
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _svc_ptr->get_executor(); return _executor;
} }
using allocator_type = asio::recycling_allocator<void>; using allocator_type = asio::recycling_allocator<void>;

View File

@ -20,26 +20,31 @@ namespace async_mqtt5::detail {
namespace asio = boost::asio; namespace asio = boost::asio;
template <typename ClientService> template <typename ClientService, typename Executor>
class read_message_op { class read_message_op {
public:
using executor_type = Executor;
private:
using client_service = ClientService; using client_service = ClientService;
struct on_message {}; struct on_message {};
struct on_disconnect {}; struct on_disconnect {};
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
executor_type _executor;
public: public:
read_message_op( read_message_op(
const std::shared_ptr<client_service>& svc_ptr const std::shared_ptr<client_service>& svc_ptr,
const executor_type& ex
) : ) :
_svc_ptr(svc_ptr) _svc_ptr(svc_ptr), _executor(ex)
{} {}
read_message_op(read_message_op&&) noexcept = default; read_message_op(read_message_op&&) noexcept = default;
read_message_op(const read_message_op&) = delete; read_message_op(const read_message_op&) = delete;
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _svc_ptr->get_executor(); return _executor;
} }
using allocator_type = asio::recycling_allocator<void>; using allocator_type = asio::recycling_allocator<void>;

View File

@ -16,31 +16,36 @@ namespace async_mqtt5::detail {
namespace asio = boost::asio; namespace asio = boost::asio;
template <typename ClientService> template <typename ClientService, typename Executor>
class sentry_op { class sentry_op {
public:
using executor_type = Executor;
private:
using client_service = ClientService; using client_service = ClientService;
struct on_timer {}; struct on_timer {};
struct on_disconnect {}; struct on_disconnect {};
static constexpr auto check_interval = std::chrono::seconds(3); static constexpr auto check_interval = std::chrono::seconds(3);
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
executor_type _executor;
std::unique_ptr<asio::steady_timer> _sentry_timer; std::unique_ptr<asio::steady_timer> _sentry_timer;
public: public:
sentry_op( sentry_op(
const std::shared_ptr<client_service>& svc_ptr const std::shared_ptr<client_service>& svc_ptr,
const executor_type& ex
) : ) :
_svc_ptr(svc_ptr), _svc_ptr(svc_ptr), _executor(ex),
_sentry_timer(new asio::steady_timer(svc_ptr->get_executor())) _sentry_timer(new asio::steady_timer(_svc_ptr->get_executor()))
{} {}
sentry_op(sentry_op&&) noexcept = default; sentry_op(sentry_op&&) noexcept = default;
sentry_op(const sentry_op&) = delete; sentry_op(const sentry_op&) = delete;
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _svc_ptr->get_executor(); return _executor;
} }
using allocator_type = asio::recycling_allocator<void>; using allocator_type = asio::recycling_allocator<void>;

View File

@ -180,11 +180,12 @@ public:
using Signature = void(error_code); using Signature = void(error_code);
auto initiation = [] (auto handler, const impl_type& impl) { auto initiation = [] (auto handler, const impl_type& impl) {
impl->run(std::move(handler)); auto ex = asio::get_associated_executor(handler, impl->get_executor());
detail::ping_op { impl }.perform(); impl->run(std::move(handler));
detail::read_message_op { impl }.perform(); detail::ping_op { impl, ex }.perform();
detail::sentry_op { impl }.perform(); detail::read_message_op { impl, ex }.perform();
detail::sentry_op { impl, ex }.perform();
}; };
return asio::async_initiate<CompletionToken, Signature>( return asio::async_initiate<CompletionToken, Signature>(

View File

@ -17,41 +17,31 @@ using namespace async_mqtt5;
BOOST_AUTO_TEST_SUITE(executors) BOOST_AUTO_TEST_SUITE(executors)
BOOST_AUTO_TEST_CASE(async_run) { BOOST_AUTO_TEST_CASE(bind_executor) {
using test::after; using test::after;
using namespace std::chrono; using namespace std::chrono;
constexpr int expected_handlers_called = 9; constexpr int expected_handlers_called = 8;
int handlers_called = 0; int handlers_called = 0;
// packets // packets
auto connect = encoders::encode_connect( auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
); );
auto connack = encoders::encode_connack( auto connack = encoders::encode_connack(false, reason_codes::success.value(), {});
false, reason_codes::success.value(), {}
);
auto publish_0 = encoders::encode_publish( auto publish_0 = encoders::encode_publish(
0, "t_0", "p_0", qos_e::at_most_once, retain_e::no, dup_e::no, {} 0, "t_0", "p_0", qos_e::at_most_once, retain_e::no, dup_e::no, {}
); );
auto publish_1 = encoders::encode_publish( auto publish_1 = encoders::encode_publish(
1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {} 1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {}
); );
auto puback = encoders::encode_puback( auto puback = encoders::encode_puback(1, reason_codes::success.value(), {});
1, reason_codes::success.value(), {}
);
auto publish_2 = encoders::encode_publish( auto publish_2 = encoders::encode_publish(
2, "t_2", "p_2", qos_e::exactly_once, retain_e::no, dup_e::no, {} 2, "t_2", "p_2", qos_e::exactly_once, retain_e::no, dup_e::no, {}
); );
auto pubrec = encoders::encode_pubrec( auto pubrec = encoders::encode_pubrec(2, reason_codes::success.value(), {});
2, reason_codes::success.value(), {} auto pubrel = encoders::encode_pubrel(2, reason_codes::success.value(), {});
); auto pubcomp = encoders::encode_pubcomp(2, reason_codes::success.value(), {});
auto pubrel = encoders::encode_pubrel(
2, reason_codes::success.value(), {}
);
auto pubcomp = encoders::encode_pubcomp(
2, reason_codes::success.value(), {}
);
auto subscribe = encoders::encode_subscribe( auto subscribe = encoders::encode_subscribe(
3, std::vector<subscribe_topic> { { "t_0", {} } }, {} 3, std::vector<subscribe_topic> { { "t_0", {} } }, {}
); );
@ -85,8 +75,6 @@ BOOST_AUTO_TEST_CASE(async_run) {
.expect(unsubscribe) .expect(unsubscribe)
.complete_with(success, after(0ms)) .complete_with(success, after(0ms))
.reply_with(unsuback, after(0ms)) .reply_with(unsuback, after(0ms))
.expect(publish_1)
.complete_with(success, after(0ms))
.expect(disconnect) .expect(disconnect)
.complete_with(success, after(0ms)) .complete_with(success, after(0ms))
; ;
@ -102,91 +90,107 @@ BOOST_AUTO_TEST_CASE(async_run) {
using client_type = mqtt_client<test::test_stream>; using client_type = mqtt_client<test::test_stream>;
client_type c(executor); client_type c(executor);
c.brokers("127.0.0.1") c.brokers("127.0.0.1")
.async_run(asio::bind_executor( .async_run(
strand, asio::bind_executor(
[&](error_code ec) { strand,
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); [&](error_code ec) {
BOOST_CHECK(strand.running_in_this_thread()); BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
++handlers_called; BOOST_CHECK(strand.running_in_this_thread());
} ++handlers_called;
)); }
)
);
c.async_publish<qos_e::at_most_once>( c.async_publish<qos_e::at_most_once>(
"t_0", "p_0", retain_e::no, {}, "t_0", "p_0", retain_e::no, {},
[&](error_code ec) { asio::bind_executor(
BOOST_CHECK_MESSAGE(!ec, ec.message()); strand,
BOOST_CHECK(strand.running_in_this_thread()); [&](error_code ec) {
++handlers_called; BOOST_CHECK_MESSAGE(!ec, ec.message());
} BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
)
); );
c.async_publish<qos_e::at_least_once>( c.async_publish<qos_e::at_least_once>(
"t_1", "p_1", retain_e::no, {}, "t_1", "p_1", retain_e::no, {},
[&](error_code ec, reason_code rc, auto) { asio::bind_executor(
BOOST_CHECK_MESSAGE(!ec, ec.message()); strand,
BOOST_CHECK_MESSAGE(!rc, rc.message()); [&](error_code ec, reason_code rc, auto) {
BOOST_CHECK(strand.running_in_this_thread()); BOOST_CHECK_MESSAGE(!ec, ec.message());
++handlers_called; BOOST_CHECK_MESSAGE(!rc, rc.message());
} BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
)
); );
c.async_publish<qos_e::exactly_once>( c.async_publish<qos_e::exactly_once>(
"t_2", "p_2", retain_e::no, {}, "t_2", "p_2", retain_e::no, {},
[&](error_code ec, reason_code rc, auto) { asio::bind_executor(
BOOST_CHECK_MESSAGE(!ec, ec.message()); strand,
BOOST_CHECK_MESSAGE(!rc, rc.message()); [&](error_code ec, reason_code rc, auto) {
BOOST_CHECK(strand.running_in_this_thread()); BOOST_CHECK_MESSAGE(!ec, ec.message());
++handlers_called; BOOST_CHECK_MESSAGE(!rc, rc.message());
} BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
)
); );
c.async_subscribe( c.async_subscribe(
subscribe_topic { "t_0", {} }, {}, subscribe_topic { "t_0", {} }, {},
[&](error_code ec, std::vector<reason_code> rcs, auto) { asio::bind_executor(
BOOST_CHECK_MESSAGE(!ec, ec.message()); strand,
BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); [&](error_code ec, std::vector<reason_code> rcs, auto) {
BOOST_CHECK(strand.running_in_this_thread()); BOOST_CHECK_MESSAGE(!ec, ec.message());
++handlers_called; BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message());
} BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
)
); );
c.async_receive( c.async_receive(
[&]( asio::bind_executor(
error_code ec, strand,
std::string rec_topic, std::string rec_payload, [&](
publish_props error_code ec,
) { std::string rec_topic, std::string rec_payload,
BOOST_CHECK_MESSAGE(!ec, ec.message()); publish_props
BOOST_CHECK_EQUAL("t_0", rec_topic); ) {
BOOST_CHECK_EQUAL("p_0", rec_payload); BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK(strand.running_in_this_thread()); BOOST_CHECK_EQUAL("t_0", rec_topic);
++handlers_called; BOOST_CHECK_EQUAL("p_0", rec_payload);
c.async_unsubscribe( BOOST_CHECK(strand.running_in_this_thread());
"t_0", {}, ++handlers_called;
[&](error_code ec, std::vector<reason_code> rcs, auto) {
BOOST_CHECK_MESSAGE(!ec, ec.message()); c.async_unsubscribe(
BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); "t_0", {},
BOOST_CHECK(strand.running_in_this_thread()); asio::bind_executor(
++handlers_called; strand,
c.async_publish<qos_e::at_least_once>( [&](error_code ec, std::vector<reason_code> rcs, auto) {
"t_1", "p_1", retain_e::no, {},
[&](error_code ec, reason_code rc, auto) {
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
BOOST_CHECK_EQUAL(rc, reason_codes::empty);
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
);
c.async_disconnect(
[&](error_code ec) {
BOOST_CHECK_MESSAGE(!ec, ec.message()); BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message());
BOOST_CHECK(strand.running_in_this_thread()); BOOST_CHECK(strand.running_in_this_thread());
++handlers_called; ++handlers_called;
c.async_disconnect(
asio::bind_executor(
strand,
[&](error_code ec) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
)
);
} }
); )
} );
); }
} )
); );
ioc.run_for(500ms); ioc.run_for(500ms);