diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index 1f636f5..979677f 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -98,14 +98,17 @@ public: _svc_ptr->async_send( wire_data, no_serial, send_flag::terminal, - asio::consign( - asio::prepend(std::move(*this), on_disconnect {}), - std::move(disconnect) + asio::prepend( + std::move(*this), + on_disconnect {}, std::move(disconnect) ) ); } - void operator()(on_disconnect, error_code ec) { + void operator()( + on_disconnect, + control_packet disconnect, error_code ec + ) { // The connection must be closed even // if we failed to send the DISCONNECT packet // with Reason Code of 0x80 or greater. @@ -116,14 +119,17 @@ public: ) return complete(asio::error::operation_aborted); + if (ec == asio::error::try_again) { + if (_context.terminal) + return send_disconnect(std::move(disconnect)); + return complete(error_code {}); + } + if (_context.terminal) { _svc_ptr->cancel(); return complete(error_code {}); } - if (ec == asio::error::try_again) - return complete(error_code {}); - _svc_ptr->close_stream(); _svc_ptr->open_stream(); diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index 655af62..ca2289f 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -872,8 +872,8 @@ public: * * \par Completion condition * The asynchronous operation will complete when one of the following conditions is true:\n - * - The Client has attempted to send a \__DISCONNECT\__ packet, regardless of whether - * the sending was successful or not.\n + * - The Client has sent a \__DISCONNECT\__ packet.\n + * - 5 seconds have elapsed without a successful send.\n * - An error occurred. This is indicated by an associated \__ERROR_CODE\__ in the handler.\n * * \par Error codes diff --git a/test/integration/disconnect.cpp b/test/integration/disconnect.cpp new file mode 100644 index 0000000..60af5ea --- /dev/null +++ b/test/integration/disconnect.cpp @@ -0,0 +1,304 @@ +#include + +#include +#include +#include + +#include + +#include "test_common/message_exchange.hpp" +#include "test_common/test_broker.hpp" +#include "test_common/test_stream.hpp" + +using namespace async_mqtt5; + +BOOST_AUTO_TEST_SUITE(disconnect/*, *boost::unit_test::disabled()*/) + +struct shared_test_data { + error_code success {}; + error_code fail = asio::error::not_connected; + + const std::string connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + ); + const std::string connack = encoders::encode_connack( + true, reason_codes::success.value(), {} + ); + const std::string disconnect = encoders::encode_disconnect( + reason_codes::normal_disconnection.value(), disconnect_props {} + ); +}; + +using test::after; +using namespace std::chrono_literals; +using client_type = mqtt_client; + +template +void run_test(test::msg_exchange broker_side, TestCase&& test_case) { + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + asio::steady_timer timer(executor); + client_type c(executor); + c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff + .async_run(asio::detached); + + test_case(c, timer); + + ioc.run(); + BOOST_TEST(broker.received_all_expected()); +} + +BOOST_FIXTURE_TEST_CASE(successful_disconnect, shared_test_data) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(disconnect) + .complete_with(success, after(0ms)); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(100ms); + timer.async_wait([&](error_code) { + c.async_disconnect( + [&](error_code ec) { + handlers_called++; + BOOST_TEST(!ec); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(successful_disconnect_in_queue, shared_test_data) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + // packets + auto publish_qos0 = encoders::encode_publish( + 0, "topic", "payload", qos_e::at_most_once, retain_e::no, dup_e::no, {} + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos0) + .complete_with(success, after(1ms)) + .expect(disconnect) + .complete_with(success, after(0ms)); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_publish( + "topic", "payload", retain_e::no, {}, + [&handlers_called](error_code ec) { + BOOST_TEST(handlers_called == 0); + handlers_called++; + BOOST_TEST(!ec); + } + ); + c.async_disconnect( + [&](error_code ec) { + BOOST_TEST(handlers_called == 1); + handlers_called++; + BOOST_TEST(!ec); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(disconnect_on_disconnected_client, shared_test_data) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .expect(connect); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_disconnect( + [&](error_code ec) { + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(disconnect_in_queue_on_disconnected_client, shared_test_data) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .expect(connect); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_publish( + "topic", "payload", retain_e::no, {}, + [&handlers_called](error_code ec) { + BOOST_TEST(handlers_called == 1); + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + } + ); + c.async_disconnect( + [&](error_code ec) { + BOOST_TEST(handlers_called == 0); + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(resend_terminal_disconnect, shared_test_data) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(disconnect) + .complete_with(success, after(0ms)); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer&) { + c.async_disconnect( + [&](error_code ec) { + handlers_called++; + BOOST_TEST(!ec); + } + ); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_FIXTURE_TEST_CASE(dont_resend_non_terminal_disconnect, shared_test_data) { + auto malformed_publish_1 = encoders::encode_publish( + 1, "malformed topic", "malformed payload", + static_cast(0b11), retain_e::yes, dup_e::no, {} + ); + auto malformed_publish_2 = encoders::encode_publish( + 2, "malformed topic", "malformed payload", + static_cast(0b11), retain_e::yes, dup_e::no, {} + ); + + auto disconnect_malformed_publish = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), + test::dprops_with_reason_string("Malformed PUBLISH received: QoS bits set to 0b11") + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(malformed_publish_1, malformed_publish_2, after(10ms)) + .expect(disconnect_malformed_publish) + .complete_with(success, after(0ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.cancel(); + }); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(omit_props, shared_test_data) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + connack_props co_props; + co_props[prop::maximum_packet_size] = 20; + + // packets + auto connack_with_max_packet = encoders::encode_connack( + false, reason_codes::success.value(), co_props + ); + + disconnect_props props; + props[prop::reason_string] = std::string(50, 'a'); + auto big_disconnect = encoders::encode_disconnect( + reason_codes::normal_disconnection.value(), props + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack_with_max_packet, after(0ms)) + .expect(disconnect) + .complete_with(success, after(0ms)); + + run_test( + std::move(broker_side), + [&](client_type& c, asio::steady_timer& timer) { + timer.expires_after(50ms); + timer.async_wait([&](error_code) { + c.async_disconnect( + disconnect_rc_e::normal_disconnection, props, + [&](error_code ec) { + handlers_called++; + BOOST_TEST(!ec); + } + ); + }); + } + ); + + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/unit/disconnect_op.cpp b/test/unit/disconnect_op.cpp index becc5cc..46db1cc 100644 --- a/test/unit/disconnect_op.cpp +++ b/test/unit/disconnect_op.cpp @@ -1,17 +1,10 @@ #include #include -#include - -#include #include -#include - -#include "test_common/message_exchange.hpp" #include "test_common/test_service.hpp" -#include "test_common/test_stream.hpp" using namespace async_mqtt5; @@ -63,224 +56,4 @@ BOOST_AUTO_TEST_CASE(malformed_user_property_value) { run_malformed_props_test(dprops); } -struct shared_test_data { - error_code success {}; - error_code fail = asio::error::not_connected; - - const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt - ); - const std::string connack = encoders::encode_connack( - true, reason_codes::success.value(), {} - ); - const std::string disconnect = encoders::encode_disconnect( - reason_codes::normal_disconnection.value(), disconnect_props {} - ); -}; - -using test::after; -using namespace std::chrono_literals; -using client_type = mqtt_client; - -template -void run_test(test::msg_exchange broker_side, TestCase&& test_case) { - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - asio::steady_timer timer(executor); - client_type c(executor); - c.brokers("127.0.0.1") - .async_run(asio::detached); - - test_case(c, timer); - - ioc.run(); - BOOST_TEST(broker.received_all_expected()); -} - -BOOST_FIXTURE_TEST_CASE(successful_disconnect, shared_test_data) { - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - test::msg_exchange broker_side; - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(disconnect) - .complete_with(success, after(0ms)); - - run_test( - std::move(broker_side), - [&](client_type& c, asio::steady_timer& timer) { - timer.expires_after(100ms); - timer.async_wait([&](error_code) { - c.async_disconnect( - [&](error_code ec) { - handlers_called++; - BOOST_TEST(!ec); - } - ); - }); - } - ); - - BOOST_TEST(handlers_called == expected_handlers_called); -} - -BOOST_FIXTURE_TEST_CASE(successful_disconnect_in_queue, shared_test_data) { - constexpr int expected_handlers_called = 2; - int handlers_called = 0; - - // packets - auto publish_qos0 = encoders::encode_publish( - 0, "topic", "payload", qos_e::at_most_once, retain_e::no, dup_e::no, {} - ); - - test::msg_exchange broker_side; - broker_side - .expect(connect) - .complete_with(success, after(1ms)) - .reply_with(connack, after(2ms)) - .expect(publish_qos0) - .complete_with(success, after(1ms)) - .expect(disconnect) - .complete_with(success, after(0ms)); - - run_test( - std::move(broker_side), - [&](client_type& c, asio::steady_timer& timer) { - timer.expires_after(50ms); - timer.async_wait([&](error_code) { - c.async_publish( - "topic", "payload", retain_e::no, {}, - [&handlers_called](error_code ec) { - BOOST_TEST(handlers_called == 0); - handlers_called++; - BOOST_TEST(!ec); - } - ); - c.async_disconnect( - [&](error_code ec) { - BOOST_TEST(handlers_called == 1); - handlers_called++; - BOOST_TEST(!ec); - } - ); - }); - } - ); - - BOOST_TEST(handlers_called == expected_handlers_called); -} - -BOOST_FIXTURE_TEST_CASE(disconnect_on_disconnected_client, shared_test_data) { - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - test::msg_exchange broker_side; - broker_side - .expect(connect); - - run_test( - std::move(broker_side), - [&](client_type& c, asio::steady_timer& timer) { - timer.expires_after(50ms); - timer.async_wait([&](error_code) { - c.async_disconnect( - [&](error_code ec) { - handlers_called++; - BOOST_TEST(ec == asio::error::operation_aborted); - } - ); - }); - } - ); - - BOOST_TEST(handlers_called == expected_handlers_called); -} - -BOOST_FIXTURE_TEST_CASE(disconnect_in_queue_on_disconnected_client, shared_test_data) { - constexpr int expected_handlers_called = 2; - int handlers_called = 0; - - test::msg_exchange broker_side; - broker_side - .expect(connect); - - run_test( - std::move(broker_side), - [&](client_type& c, asio::steady_timer& timer) { - timer.expires_after(50ms); - timer.async_wait([&](error_code) { - c.async_publish( - "topic", "payload", retain_e::no, {}, - [&handlers_called](error_code ec) { - BOOST_TEST(handlers_called == 1); - handlers_called++; - BOOST_TEST(ec == asio::error::operation_aborted); - } - ); - c.async_disconnect( - [&](error_code ec) { - BOOST_TEST(handlers_called == 0); - handlers_called++; - BOOST_TEST(ec == asio::error::operation_aborted); - } - ); - }); - } - ); - - BOOST_TEST(handlers_called == expected_handlers_called); -} - -BOOST_FIXTURE_TEST_CASE(omit_props, shared_test_data) { - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - connack_props co_props; - co_props[prop::maximum_packet_size] = 20; - - // packets - auto connack_with_max_packet = encoders::encode_connack( - false, reason_codes::success.value(), co_props - ); - - disconnect_props props; - props[prop::reason_string] = std::string(50, 'a'); - auto big_disconnect = encoders::encode_disconnect( - reason_codes::normal_disconnection.value(), props - ); - - test::msg_exchange broker_side; - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack_with_max_packet, after(0ms)) - .expect(disconnect) - .complete_with(success, after(0ms)); - - run_test( - std::move(broker_side), - [&](client_type& c, asio::steady_timer& timer) { - timer.expires_after(50ms); - timer.async_wait([&](error_code) { - c.async_disconnect( - disconnect_rc_e::normal_disconnection, props, - [&](error_code ec) { - handlers_called++; - BOOST_TEST(!ec); - } - ); - }); - } - ); - - BOOST_TEST(handlers_called == expected_handlers_called); -} - BOOST_AUTO_TEST_SUITE_END()