diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index 7247279..1f636f5 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -2,7 +2,10 @@ #define ASYNC_MQTT5_DISCONNECT_OP_HPP #include +#include #include +#include +#include #include @@ -45,7 +48,13 @@ public: _svc_ptr(svc_ptr), _context(std::move(context)), _handler(std::move(handler), _svc_ptr->get_executor()) - {} + { + auto slot = asio::get_associated_cancellation_slot(_handler); + if (slot.is_connected()) + slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) { + svc.cancel(); + }); + } disconnect_op(disconnect_op&&) noexcept = default; disconnect_op(const disconnect_op&) = delete; @@ -146,10 +155,88 @@ private: } }; +template +class terminal_disconnect_op { + using client_service = ClientService; + + static constexpr uint8_t seconds = 5; + + std::shared_ptr _svc_ptr; + std::unique_ptr _timer; + + using handler_type = cancellable_handler< + Handler, typename ClientService::executor_type + >; + handler_type _handler; + +public: + terminal_disconnect_op( + const std::shared_ptr& svc_ptr, + Handler&& handler + ) : + _svc_ptr(svc_ptr), + _timer(new asio::steady_timer(_svc_ptr->get_executor())), + _handler(std::move(handler), _svc_ptr->get_executor()) + {} + + terminal_disconnect_op(terminal_disconnect_op&&) noexcept = default; + terminal_disconnect_op(const terminal_disconnect_op&) = delete; + + using executor_type = asio::associated_executor_t; + executor_type get_executor() const noexcept { + return asio::get_associated_executor(_handler); + } + + using allocator_type = asio::associated_allocator_t; + allocator_type get_allocator() const noexcept { + return asio::get_associated_allocator(_handler); + } + + using cancellation_slot_type = asio::associated_cancellation_slot_t; + cancellation_slot_type get_cancellation_slot() const noexcept { + return asio::get_associated_cancellation_slot(_handler); + } + + template + void perform(DisconnectContext&& context) { + namespace asioex = boost::asio::experimental; + + auto init_disconnect = []( + auto handler, disconnect_ctx ctx, + const std::shared_ptr& svc_ptr + ) { + disconnect_op { + svc_ptr, std::move(ctx), std::move(handler) + }.perform(); + }; + + _timer->expires_after(std::chrono::seconds(seconds)); + + auto timed_disconnect = asioex::make_parallel_group( + asio::async_initiate( + init_disconnect, asio::deferred, + std::forward(context), _svc_ptr + ), + _timer->async_wait(asio::deferred) + ); + + timed_disconnect.async_wait( + asioex::wait_for_one(), asio::prepend(std::move(*this)) + ); + } + + void operator()( + std::array /* ord */, + error_code disconnect_ec, error_code /* timer_ec */ + ) { + _handler.complete(disconnect_ec); + } +}; + template decltype(auto) async_disconnect( disconnect_rc_e reason_code, const disconnect_props& props, - bool terminal, const std::shared_ptr& svc_ptr, + const std::shared_ptr& svc_ptr, CompletionToken&& token ) { using Signature = void (error_code); @@ -165,11 +252,34 @@ decltype(auto) async_disconnect( return asio::async_initiate( initiation, token, - disconnect_ctx { reason_code, props, terminal }, + disconnect_ctx { reason_code, props, false }, svc_ptr ); } +template +decltype(auto) async_terminal_disconnect( + disconnect_rc_e reason_code, const disconnect_props& props, + const std::shared_ptr& svc_ptr, + CompletionToken&& token +) { + using Signature = void (error_code); + + auto initiation = []( + auto handler, disconnect_ctx ctx, + const std::shared_ptr& svc_ptr + ) { + terminal_disconnect_op { + svc_ptr, std::move(handler) + }.perform(std::move(ctx)); + }; + + return asio::async_initiate( + initiation, token, + disconnect_ctx { reason_code, props, true }, + svc_ptr + ); +} } // end namespace async_mqtt5::detail diff --git a/include/async_mqtt5/impl/publish_rec_op.hpp b/include/async_mqtt5/impl/publish_rec_op.hpp index 41684de..29900ea 100644 --- a/include/async_mqtt5/impl/publish_rec_op.hpp +++ b/include/async_mqtt5/impl/publish_rec_op.hpp @@ -191,7 +191,7 @@ private: props[prop::reason_string] = reason; return async_disconnect( disconnect_rc_e::malformed_packet, props, - false, _svc_ptr, asio::detached + _svc_ptr, asio::detached ); } diff --git a/include/async_mqtt5/impl/publish_send_op.hpp b/include/async_mqtt5/impl/publish_send_op.hpp index 3e6291a..af8e185 100644 --- a/include/async_mqtt5/impl/publish_send_op.hpp +++ b/include/async_mqtt5/impl/publish_send_op.hpp @@ -400,7 +400,7 @@ private: auto props = disconnect_props {}; props[prop::reason_string] = reason; async_disconnect( - disconnect_rc_e::malformed_packet, props, false, _svc_ptr, + disconnect_rc_e::malformed_packet, props, _svc_ptr, asio::detached ); } diff --git a/include/async_mqtt5/impl/re_auth_op.hpp b/include/async_mqtt5/impl/re_auth_op.hpp index bfbbdc4..e3562d6 100644 --- a/include/async_mqtt5/impl/re_auth_op.hpp +++ b/include/async_mqtt5/impl/re_auth_op.hpp @@ -125,7 +125,7 @@ private: auto props = disconnect_props {}; props[prop::reason_string] = std::move(message); - async_disconnect(reason, props, false, _svc_ptr, asio::detached); + async_disconnect(reason, props, _svc_ptr, asio::detached); } }; diff --git a/include/async_mqtt5/impl/read_message_op.hpp b/include/async_mqtt5/impl/read_message_op.hpp index 6be402f..87a8543 100644 --- a/include/async_mqtt5/impl/read_message_op.hpp +++ b/include/async_mqtt5/impl/read_message_op.hpp @@ -127,7 +127,7 @@ private: auto svc_ptr = _svc_ptr; // copy before this is moved async_disconnect( - disconnect_rc_e::malformed_packet, props, false, svc_ptr, + disconnect_rc_e::malformed_packet, props, svc_ptr, asio::prepend(std::move(*this), on_disconnect {}) ); } diff --git a/include/async_mqtt5/impl/sentry_op.hpp b/include/async_mqtt5/impl/sentry_op.hpp index 14211e4..f13e0f4 100644 --- a/include/async_mqtt5/impl/sentry_op.hpp +++ b/include/async_mqtt5/impl/sentry_op.hpp @@ -70,7 +70,7 @@ public: props[prop::reason_string] = "No reply received within 20 seconds"; auto svc_ptr = _svc_ptr; return async_disconnect( - disconnect_rc_e::unspecified_error, props, false, svc_ptr, + disconnect_rc_e::unspecified_error, props, svc_ptr, asio::prepend(std::move(*this), on_disconnect {}) ); } diff --git a/include/async_mqtt5/impl/subscribe_op.hpp b/include/async_mqtt5/impl/subscribe_op.hpp index 0396033..7829b32 100644 --- a/include/async_mqtt5/impl/subscribe_op.hpp +++ b/include/async_mqtt5/impl/subscribe_op.hpp @@ -255,7 +255,7 @@ private: auto props = disconnect_props {}; props[prop::reason_string] = reason; async_disconnect( - disconnect_rc_e::malformed_packet, props, false, _svc_ptr, + disconnect_rc_e::malformed_packet, props, _svc_ptr, asio::detached ); } diff --git a/include/async_mqtt5/impl/unsubscribe_op.hpp b/include/async_mqtt5/impl/unsubscribe_op.hpp index eef8014..54f6e0d 100644 --- a/include/async_mqtt5/impl/unsubscribe_op.hpp +++ b/include/async_mqtt5/impl/unsubscribe_op.hpp @@ -200,7 +200,7 @@ private: auto props = disconnect_props {}; props[prop::reason_string] = reason; async_disconnect( - disconnect_rc_e::malformed_packet, props, false, _svc_ptr, + disconnect_rc_e::malformed_packet, props, _svc_ptr, asio::detached ); } diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index c8231d3..d817f40 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -806,16 +806,14 @@ public: * \brief Disconnect the Client by sending a \__DISCONNECT\__ packet * with a specified Reason Code. This function has terminal effects. * - * \details Send a \__DISCONNECT\__ packet to the Broker with a Reason Code - * describing the reason for disconnection. + * \details The Client will attempt to send a \__DISCONNECT\__ packet to the Broker + * with a Reason Code describing the reason for disconnection. + * If the \__DISCONNECT\__ packet is successfully transmitted, + * or if `5 seconds` elapsed without a successful send, the Client will terminate the connection. * * \attention This function has terminal effects and will close the Client. * See \ref mqtt_client::cancel. * - * \note If you wish to close the Client regardless of its state, - * prefer calling the \ref cancel function instead. This function will only - * take effect when the connection has been successfully established. - * * \param reason_code Reason Code to notify * the Broker of the reason for disconnection. * \param props An instance of \__DISCONNECT_PROPS\__. @@ -841,7 +839,10 @@ public: * \par Error codes * The list of all possible error codes that this operation can finish with:\n * - `boost::system::errc::errc_t::success`\n - * - `boost::asio::error::operation_aborted`\n + * - `boost::asio::error::operation_aborted`[footnote + This error code can appear if the Client failed to send the \__DISCONNECT\__ packet to the Server. + Regardless, the connection to the Server is terminated, and the Client is cancelled. + ]\n * - \link async_mqtt5::client::error::malformed_packet \endlink * * Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code. @@ -853,10 +854,9 @@ public: ) { auto impl = _impl; _impl = impl->dup(); - return detail::async_disconnect( + return detail::async_terminal_disconnect( detail::disconnect_rc_e(static_cast(reason_code)), - props, true, impl, - std::forward(token) + props, impl, std::forward(token) ); } diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index e1ad2d3..2f7a834 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -24,7 +24,8 @@ enum operation_type { publish, receive, subscribe, - unsubscribe + unsubscribe, + disconnect }; enum cancel_type { @@ -85,13 +86,41 @@ void setup_cancel_op_test_case( c.async_receive( asio::bind_cancellation_slot( signal.slot(), - [&handlers_called]( + [&c, &handlers_called]( error_code ec, std::string t, std::string p, publish_props ) { ++handlers_called; BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(t == ""); BOOST_TEST(p == ""); + + // right now, emitting a terminal signal on async_receive + // does NOT cancel the client + c.cancel(); + } + ) + ); +} + +template < + test::operation_type op_type, + std::enable_if_t = true +> +void setup_cancel_op_test_case( + client_type& c, asio::cancellation_signal& signal, int& handlers_called +) { + c.async_run(asio::detached); + c.async_subscribe( + subscribe_topic { "topic", subscribe_options {} }, subscribe_props {}, + asio::bind_cancellation_slot( + signal.slot(), + [&handlers_called]( + error_code ec, std::vector rcs, suback_props + ) { + ++handlers_called; + BOOST_TEST(ec == asio::error::operation_aborted); + BOOST_TEST_REQUIRE(rcs.size() == 1u); + BOOST_TEST(rcs[0] == reason_codes::empty); } ) ); @@ -123,23 +152,18 @@ void setup_cancel_op_test_case( template < test::operation_type op_type, - std::enable_if_t = true + std::enable_if_t = true > void setup_cancel_op_test_case( client_type& c, asio::cancellation_signal& signal, int& handlers_called ) { c.async_run(asio::detached); - c.async_subscribe( - subscribe_topic { "topic", subscribe_options {} }, subscribe_props {}, + c.async_disconnect( asio::bind_cancellation_slot( signal.slot(), - [&handlers_called]( - error_code ec, std::vector rcs, suback_props - ) { + [&handlers_called](error_code ec) { ++handlers_called; BOOST_TEST(ec == asio::error::operation_aborted); - BOOST_TEST_REQUIRE(rcs.size() == 1u); - BOOST_TEST(rcs[0] == reason_codes::empty); } ) ); @@ -194,8 +218,7 @@ BOOST_AUTO_TEST_CASE(client_cancel_async_receive) { run_cancel_op_test(); } -// hangs -BOOST_AUTO_TEST_CASE(signal_emit_async_receive, *boost::unit_test::disabled()) { +BOOST_AUTO_TEST_CASE(signal_emit_async_receive) { run_cancel_op_test(); } @@ -215,6 +238,10 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_unsubscribe) { run_cancel_op_test(); } +BOOST_AUTO_TEST_CASE(signal_emit_async_disconnect) { + run_cancel_op_test(); +} + struct shared_test_data { error_code success {}; error_code fail = asio::error::not_connected; diff --git a/test/unit/disconnect_op.cpp b/test/unit/disconnect_op.cpp index 6cfdfb4..becc5cc 100644 --- a/test/unit/disconnect_op.cpp +++ b/test/unit/disconnect_op.cpp @@ -42,7 +42,6 @@ void run_malformed_props_test(const disconnect_props& dprops) { BOOST_TEST(handlers_called == expected_handlers_called); } - BOOST_AUTO_TEST_CASE(malformed_reason_string) { disconnect_props dprops; dprops[prop::reason_string] = std::string { 0x01 }; @@ -64,10 +63,182 @@ BOOST_AUTO_TEST_CASE(malformed_user_property_value) { run_malformed_props_test(dprops); } -BOOST_AUTO_TEST_CASE(omit_props) { - using test::after; - using namespace std::chrono; +struct shared_test_data { + error_code success {}; + error_code fail = asio::error::not_connected; + const std::string connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + ); + const std::string connack = encoders::encode_connack( + true, reason_codes::success.value(), {} + ); + const std::string disconnect = encoders::encode_disconnect( + reason_codes::normal_disconnection.value(), disconnect_props {} + ); +}; + +using test::after; +using namespace std::chrono_literals; +using client_type = mqtt_client; + +template +void run_test(test::msg_exchange broker_side, TestCase&& test_case) { + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + asio::steady_timer timer(executor); + client_type c(executor); + c.brokers("127.0.0.1") + .async_run(asio::detached); + + test_case(c, timer); + + ioc.run(); + BOOST_TEST(broker.received_all_expected()); +} + +BOOST_FIXTURE_TEST_CASE(successful_disconnect, shared_test_data) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(disconnect) + .complete_with(success, after(0ms)); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(100ms); + timer.async_wait([&](error_code) { + c.async_disconnect( + [&](error_code ec) { + handlers_called++; + BOOST_TEST(!ec); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(successful_disconnect_in_queue, shared_test_data) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + // packets + auto publish_qos0 = encoders::encode_publish( + 0, "topic", "payload", qos_e::at_most_once, retain_e::no, dup_e::no, {} + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos0) + .complete_with(success, after(1ms)) + .expect(disconnect) + .complete_with(success, after(0ms)); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_publish( + "topic", "payload", retain_e::no, {}, + [&handlers_called](error_code ec) { + BOOST_TEST(handlers_called == 0); + handlers_called++; + BOOST_TEST(!ec); + } + ); + c.async_disconnect( + [&](error_code ec) { + BOOST_TEST(handlers_called == 1); + handlers_called++; + BOOST_TEST(!ec); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(disconnect_on_disconnected_client, shared_test_data) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_disconnect( + [&](error_code ec) { + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(disconnect_in_queue_on_disconnected_client, shared_test_data) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_publish( + "topic", "payload", retain_e::no, {}, + [&handlers_called](error_code ec) { + BOOST_TEST(handlers_called == 1); + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + } + ); + c.async_disconnect( + [&](error_code ec) { + BOOST_TEST(handlers_called == 0); + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(omit_props, shared_test_data) { constexpr int expected_handlers_called = 1; int handlers_called = 0; @@ -75,58 +246,41 @@ BOOST_AUTO_TEST_CASE(omit_props) { co_props[prop::maximum_packet_size] = 20; // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack( + auto connack_with_max_packet = encoders::encode_connack( false, reason_codes::success.value(), co_props ); disconnect_props props; props[prop::reason_string] = std::string(50, 'a'); - auto disconnect = encoders::encode_disconnect( + auto big_disconnect = encoders::encode_disconnect( reason_codes::normal_disconnection.value(), props ); - auto disconnect_no_props = encoders::encode_disconnect( - reason_codes::normal_disconnection.value(), disconnect_props {} - ); - - error_code success {}; test::msg_exchange broker_side; broker_side .expect(connect) .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(disconnect_no_props) + .reply_with(connack_with_max_packet, after(0ms)) + .expect(disconnect) .complete_with(success, after(0ms)); - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_disconnect( + disconnect_rc_e::normal_disconnection, props, + [&](error_code ec) { + handlers_called++; + BOOST_TEST(!ec); + } + ); + }); + } ); - using client_type = mqtt_client; - client_type c(executor); - c.brokers("127.0.0.1") - .async_run(asio::detached); - - asio::steady_timer timer(c.get_executor()); - timer.expires_after(50ms); - timer.async_wait([&](error_code) { - c.async_disconnect( - disconnect_rc_e::normal_disconnection, props, - [&](error_code ec) { - handlers_called++; - BOOST_TEST(!ec); - } - ); - }); - - ioc.run_for(2s); BOOST_TEST(handlers_called == expected_handlers_called); - BOOST_TEST(broker.received_all_expected()); } BOOST_AUTO_TEST_SUITE_END()