diff --git a/include/async_mqtt5/detail/cancellable_handler.hpp b/include/async_mqtt5/detail/cancellable_handler.hpp index cbfee05..6188750 100644 --- a/include/async_mqtt5/detail/cancellable_handler.hpp +++ b/include/async_mqtt5/detail/cancellable_handler.hpp @@ -11,8 +11,9 @@ #include #include #include +#include #include -#include +#include #include #include @@ -41,11 +42,6 @@ public: cancellable_handler(cancellable_handler&&) = default; cancellable_handler(const cancellable_handler&) = delete; - using executor_type = tracking_type; - executor_type get_executor() const noexcept { - return _handler_ex; - } - using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); @@ -56,7 +52,20 @@ public: return _cancellation_state.slot(); } - asio::cancellation_type_t cancelled() const noexcept { + using executor_type = tracking_type; + executor_type get_executor() const noexcept { + return _handler_ex; + } + + using immediate_executor_type = + asio::associated_immediate_executor_t; + immediate_executor_type get_immediate_executor() const noexcept { + // get_associated_immediate_executor will require asio::execution::blocking.never + // on the default executor. + return asio::get_associated_immediate_executor(_handler, _executor); + } + + asio::cancellation_type_t cancelled() const { return _cancellation_state.cancelled(); } @@ -67,10 +76,11 @@ public: } template - void complete_post(Args&&... args) { + void complete_immediate(Args&&... args) { asio::get_associated_cancellation_slot(_handler).clear(); - asio::post( - _executor, + auto ex = get_immediate_executor(); + asio::dispatch( + ex, asio::prepend(std::move(_handler), std::forward(args)...) ); } diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index d325446..8da1595 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -8,6 +8,9 @@ #ifndef ASYNC_MQTT5_DISCONNECT_OP_HPP #define ASYNC_MQTT5_DISCONNECT_OP_HPP +#include +#include +#include #include #include #include @@ -65,20 +68,23 @@ public: disconnect_op(disconnect_op&&) = default; disconnect_op(const disconnect_op&) = delete; - using executor_type = asio::associated_executor_t; - executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); - } + disconnect_op& operator=(disconnect_op&&) noexcept = default; + disconnect_op& operator=(const disconnect_op&) = delete; using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } + using executor_type = asio::associated_executor_t; + executor_type get_executor() const noexcept { + return asio::get_associated_executor(_handler); + } + void perform() { error_code ec = validate_disconnect(_context.props); if (ec) - return complete_post(ec); + return complete_immediate(ec); auto disconnect = control_packet::of( no_pid, get_allocator(), @@ -162,8 +168,8 @@ private: _handler.complete(ec); } - void complete_post(error_code ec) { - _handler.complete_post(ec); + void complete_immediate(error_code ec) { + _handler.complete_immediate(ec); } }; @@ -194,10 +200,8 @@ public: terminal_disconnect_op(terminal_disconnect_op&&) = default; terminal_disconnect_op(const terminal_disconnect_op&) = delete; - using executor_type = asio::associated_executor_t; - executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); - } + terminal_disconnect_op& operator=(terminal_disconnect_op&&) noexcept = default; + terminal_disconnect_op& operator=(const terminal_disconnect_op&) = delete; using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { @@ -209,6 +213,11 @@ public: return asio::get_associated_cancellation_slot(_handler); } + using executor_type = asio::associated_executor_t; + executor_type get_executor() const noexcept { + return asio::get_associated_executor(_handler); + } + template void perform(DisconnectContext&& context) { namespace asioex = boost::asio::experimental; diff --git a/include/async_mqtt5/impl/publish_send_op.hpp b/include/async_mqtt5/impl/publish_send_op.hpp index b7f9333..9b26506 100644 --- a/include/async_mqtt5/impl/publish_send_op.hpp +++ b/include/async_mqtt5/impl/publish_send_op.hpp @@ -8,6 +8,8 @@ #ifndef ASYNC_MQTT5_PUBLISH_SEND_OP_HPP #define ASYNC_MQTT5_PUBLISH_SEND_OP_HPP +#include +#include #include #include @@ -86,19 +88,22 @@ public: }); } - publish_send_op(publish_send_op&& other) = default; + publish_send_op(publish_send_op&&) = default; publish_send_op(const publish_send_op&) = delete; - using executor_type = asio::associated_executor_t; - executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); - } + publish_send_op& operator=(publish_send_op&&) noexcept = default; + publish_send_op& operator=(const publish_send_op&) = delete; using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } + using executor_type = asio::associated_executor_t; + executor_type get_executor() const noexcept { + return asio::get_associated_executor(_handler); + } + void perform( std::string topic, std::string payload, retain_e retain, const publish_props& props @@ -107,12 +112,12 @@ public: if constexpr (qos_type != qos_e::at_most_once) { packet_id = _svc_ptr->allocate_pid(); if (packet_id == 0) - return complete_post(client::error::pid_overrun, packet_id); + return complete_immediate(client::error::pid_overrun, packet_id); } auto ec = validate_publish(topic, payload, retain, props); if (ec) - return complete_post(ec, packet_id); + return complete_immediate(ec, packet_id); _serial_num = _svc_ptr->next_serial_num(); @@ -126,7 +131,7 @@ public: auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size) .value_or(default_max_send_size); if (publish.size() > max_packet_size) - return complete_post(client::error::packet_too_large, packet_id); + return complete_immediate(client::error::packet_too_large, packet_id); send_publish(std::move(publish)); } @@ -429,8 +434,8 @@ private: qos_e q = qos_type, std::enable_if_t = true > - void complete_post(error_code ec, uint16_t) { - _handler.complete_post(ec); + void complete_immediate(error_code ec, uint16_t) { + _handler.complete_immediate(ec); } template < @@ -457,10 +462,10 @@ private: bool > = true > - void complete_post(error_code ec, uint16_t packet_id) { + void complete_immediate(error_code ec, uint16_t packet_id) { if (packet_id != 0) _svc_ptr->free_pid(packet_id, false); - _handler.complete_post(ec, reason_codes::empty, Props {}); + _handler.complete_immediate(ec, reason_codes::empty, Props {}); } }; diff --git a/include/async_mqtt5/impl/subscribe_op.hpp b/include/async_mqtt5/impl/subscribe_op.hpp index aa3c6e0..71027cd 100644 --- a/include/async_mqtt5/impl/subscribe_op.hpp +++ b/include/async_mqtt5/impl/subscribe_op.hpp @@ -11,6 +11,8 @@ #include #include +#include +#include #include #include @@ -66,16 +68,19 @@ public: subscribe_op(subscribe_op&&) = default; subscribe_op(const subscribe_op&) = delete; - using executor_type = asio::associated_executor_t; - executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); - } + subscribe_op& operator=(subscribe_op&&) noexcept = default; + subscribe_op& operator=(const subscribe_op&) = delete; using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } + using executor_type = asio::associated_executor_t; + executor_type get_executor() const noexcept { + return asio::get_associated_executor(_handler); + } + void perform( const std::vector& topics, const subscribe_props& props @@ -84,14 +89,14 @@ public: uint16_t packet_id = _svc_ptr->allocate_pid(); if (packet_id == 0) - return complete_post(client::error::pid_overrun, packet_id); + return complete_immediate(client::error::pid_overrun, packet_id); if (_num_topics == 0) - return complete_post(client::error::invalid_topic, packet_id); + return complete_immediate(client::error::invalid_topic, packet_id); auto ec = validate_subscribe(topics, props); if (ec) - return complete_post(ec, packet_id); + return complete_immediate(ec, packet_id); auto subscribe = control_packet::of( with_pid, get_allocator(), @@ -102,7 +107,7 @@ public: auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size) .value_or(default_max_send_size); if (subscribe.size() > max_packet_size) - return complete_post(client::error::packet_too_large, packet_id); + return complete_immediate(client::error::packet_too_large, packet_id); send_subscribe(std::move(subscribe)); } @@ -267,10 +272,10 @@ private: ); } - void complete_post(error_code ec, uint16_t packet_id) { + void complete_immediate(error_code ec, uint16_t packet_id) { if (packet_id != 0) _svc_ptr->free_pid(packet_id); - _handler.complete_post( + _handler.complete_immediate( ec, std::vector(_num_topics, reason_codes::empty), suback_props {} ); diff --git a/include/async_mqtt5/impl/unsubscribe_op.hpp b/include/async_mqtt5/impl/unsubscribe_op.hpp index eba579a..e1fdf93 100644 --- a/include/async_mqtt5/impl/unsubscribe_op.hpp +++ b/include/async_mqtt5/impl/unsubscribe_op.hpp @@ -9,6 +9,7 @@ #define ASYNC_MQTT5_UNSUBSCRIBE_OP_HPP #include +#include #include #include @@ -61,16 +62,19 @@ public: unsubscribe_op(unsubscribe_op&&) = default; unsubscribe_op(const unsubscribe_op&) = delete; - using executor_type = asio::associated_executor_t; - executor_type get_executor() const noexcept { - return asio::get_associated_executor(_handler); - } + unsubscribe_op& operator=(unsubscribe_op&&) noexcept = default; + unsubscribe_op& operator=(const unsubscribe_op&) = delete; using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } + using executor_type = asio::associated_executor_t; + executor_type get_executor() const noexcept { + return asio::get_associated_executor(_handler); + } + void perform( const std::vector& topics, const unsubscribe_props& props @@ -79,14 +83,14 @@ public: uint16_t packet_id = _svc_ptr->allocate_pid(); if (packet_id == 0) - return complete_post(client::error::pid_overrun, packet_id); + return complete_immediate(client::error::pid_overrun, packet_id); if (_num_topics == 0) - return complete_post(client::error::invalid_topic, packet_id); + return complete_immediate(client::error::invalid_topic, packet_id); auto ec = validate_unsubscribe(topics, props); if (ec) - return complete_post(ec, packet_id); + return complete_immediate(ec, packet_id); auto unsubscribe = control_packet::of( with_pid, get_allocator(), @@ -97,7 +101,7 @@ public: auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size) .value_or(default_max_send_size); if (unsubscribe.size() > max_packet_size) - return complete_post(client::error::packet_too_large, packet_id); + return complete_immediate(client::error::packet_too_large, packet_id); send_unsubscribe(std::move(unsubscribe)); } @@ -212,10 +216,10 @@ private: ); } - void complete_post(error_code ec, uint16_t packet_id) { + void complete_immediate(error_code ec, uint16_t packet_id) { if (packet_id != 0) _svc_ptr->free_pid(packet_id); - _handler.complete_post( + _handler.complete_immediate( ec, std::vector(_num_topics, reason_codes::empty), unsuback_props {} ); diff --git a/test/include/test_common/test_stream.hpp b/test/include/test_common/test_stream.hpp index 0ebb1e0..2425406 100644 --- a/test/include/test_common/test_stream.hpp +++ b/test/include/test_common/test_stream.hpp @@ -8,6 +8,7 @@ #ifndef ASYNC_MQTT5_TEST_TEST_STREAM_HPP #define ASYNC_MQTT5_TEST_TEST_STREAM_HPP +#include #include #include #include @@ -48,7 +49,13 @@ private: friend class write_op; public: - test_stream_impl(executor_type ex) : _ex(std::move(ex)) {} + explicit test_stream_impl(executor_type ex) : _ex(std::move(ex)) {} + + test_stream_impl(test_stream_impl&&) = default; + test_stream_impl(const test_stream_impl&) = delete; + + test_stream_impl& operator=(test_stream_impl&&) = default; + test_stream_impl& operator=(const test_stream_impl&) = delete; executor_type get_executor() const noexcept { return _ex; @@ -128,20 +135,23 @@ public: read_op(read_op&&) = default; read_op(const read_op&) = delete; - using executor_type = test_stream_impl::executor_type; - executor_type get_executor() const noexcept { - return _stream_impl->get_executor(); - } + read_op& operator=(read_op&&) noexcept = default; + read_op& operator=(const read_op&) = delete; using allocator_type = asio::recycling_allocator; allocator_type get_allocator() const noexcept { return allocator_type {}; } + using executor_type = test_stream_impl::executor_type; + executor_type get_executor() const noexcept { + return _stream_impl->get_executor(); + } + template void perform(const BufferType& buffer) { if (!_stream_impl->is_open() || !_stream_impl->is_connected()) - return complete_post(asio::error::not_connected, 0); + return complete_immediate(asio::error::not_connected, 0); _stream_impl->_test_broker->read_from_network( buffer, @@ -156,8 +166,8 @@ public: } private: - void complete_post(error_code ec, size_t bytes_read) { - _handler.complete_post(ec, bytes_read); + void complete_immediate(error_code ec, size_t bytes_read) { + _handler.complete_immediate(ec, bytes_read); } void complete(error_code ec, size_t bytes_read) { @@ -183,16 +193,19 @@ public: write_op(write_op&&) = default; write_op(const write_op&) = delete; - using executor_type = test_stream_impl::executor_type; - executor_type get_executor() const noexcept { - return _stream_impl->get_executor(); - } + write_op& operator=(write_op&&) noexcept = default; + write_op& operator=(const write_op&) = delete; using allocator_type = asio::recycling_allocator; allocator_type get_allocator() const noexcept { return allocator_type {}; } + using executor_type = test_stream_impl::executor_type; + executor_type get_executor() const noexcept { + return _stream_impl->get_executor(); + } + template void perform(const BufferType& buffers) { if (!_stream_impl->is_open() || !_stream_impl->is_connected()) diff --git a/test/integration/executors.cpp b/test/integration/executors.cpp index b35ee24..c2ad328 100644 --- a/test/integration/executors.cpp +++ b/test/integration/executors.cpp @@ -10,9 +10,13 @@ #include #include +#include +#include #include #include +#include + #include #include @@ -101,8 +105,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) { asio::bind_executor( strand, [&](error_code ec) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(ec == asio::error::operation_aborted); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; } ) @@ -113,8 +117,8 @@ BOOST_AUTO_TEST_CASE(bind_executor) { asio::bind_executor( strand, [&](error_code ec) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(!ec); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; } ) @@ -125,9 +129,9 @@ BOOST_AUTO_TEST_CASE(bind_executor) { asio::bind_executor( strand, [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(!ec); + BOOST_TEST(!rc); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; } ) @@ -138,9 +142,9 @@ BOOST_AUTO_TEST_CASE(bind_executor) { asio::bind_executor( strand, [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(!ec); + BOOST_TEST(!rc); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; } ) @@ -151,9 +155,9 @@ BOOST_AUTO_TEST_CASE(bind_executor) { asio::bind_executor( strand, [&](error_code ec, std::vector rcs, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(!ec); + BOOST_TEST(!rcs[0]); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; } ) @@ -167,10 +171,10 @@ BOOST_AUTO_TEST_CASE(bind_executor) { std::string rec_topic, std::string rec_payload, publish_props ) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_EQUAL("t_0", rec_topic); - BOOST_CHECK_EQUAL("p_0", rec_payload); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(!ec); + BOOST_TEST("t_0" == rec_topic); + BOOST_TEST("p_0" == rec_payload); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; c.async_unsubscribe( @@ -178,17 +182,17 @@ BOOST_AUTO_TEST_CASE(bind_executor) { asio::bind_executor( strand, [&](error_code ec, std::vector rcs, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(!ec); + BOOST_TEST(!rcs[0]); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; c.async_disconnect( asio::bind_executor( strand, [&](error_code ec) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK(strand.running_in_this_thread()); + BOOST_TEST(!ec); + BOOST_TEST(strand.running_in_this_thread()); ++handlers_called; } ) @@ -201,8 +205,94 @@ BOOST_AUTO_TEST_CASE(bind_executor) { ); ioc.run_for(500ms); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); + BOOST_TEST(handlers_called == expected_handlers_called); + BOOST_TEST(broker.received_all_expected()); } +BOOST_AUTO_TEST_CASE(immediate_executor_async_publish) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + + using client_type = mqtt_client; + client_type c(ioc.get_executor()); + + auto strand = asio::make_strand(ioc); + + auto handler = asio::bind_immediate_executor( + strand, + [&handlers_called, &strand](error_code ec) { + ++handlers_called; + BOOST_TEST(strand.running_in_this_thread()); + BOOST_TEST(ec); + } + ); + c.async_publish( + "invalid/#", "", retain_e::no, publish_props {}, std::move(handler) + ); + + ioc.run(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(immediate_executor_async_subscribe) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + + using client_type = mqtt_client; + client_type c(ioc.get_executor()); + + auto strand = asio::make_strand(ioc); + + auto handler = asio::bind_immediate_executor( + strand, + [&handlers_called, &strand]( + error_code ec, std::vector, suback_props + ) { + ++handlers_called; + BOOST_TEST(strand.running_in_this_thread()); + BOOST_TEST(ec); + } + ); + c.async_subscribe( + { "+topic" }, subscribe_props {}, std::move(handler) + ); + + ioc.run(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(immediate_executor_async_unsubscribe) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + + using client_type = mqtt_client; + client_type c(ioc.get_executor()); + + auto strand = asio::make_strand(ioc); + + auto handler = asio::bind_immediate_executor( + strand, + [&handlers_called, &strand]( + error_code ec, std::vector, unsuback_props + ) { + ++handlers_called; + BOOST_TEST(strand.running_in_this_thread()); + BOOST_TEST(ec); + } + ); + c.async_unsubscribe( + "some/topic#", unsubscribe_props {}, std::move(handler) + ); + + ioc.run(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + + BOOST_AUTO_TEST_SUITE_END()