From fa014fc33725095d86cf33c6dbeb99b5c7297b14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Fri, 19 Jan 2024 13:45:09 +0100 Subject: [PATCH] Add op cancellation tests Summary: related to T12015 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27459 --- include/async_mqtt5/impl/unsubscribe_op.hpp | 2 +- test/integration/cancellation.cpp | 401 +++++++++++++------- test/integration/publish_receive.cpp | 3 +- test/unit/publish_send_op.cpp | 38 -- 4 files changed, 256 insertions(+), 188 deletions(-) diff --git a/include/async_mqtt5/impl/unsubscribe_op.hpp b/include/async_mqtt5/impl/unsubscribe_op.hpp index a38f5e9..354f4bc 100644 --- a/include/async_mqtt5/impl/unsubscribe_op.hpp +++ b/include/async_mqtt5/impl/unsubscribe_op.hpp @@ -126,7 +126,7 @@ public: _svc_ptr->async_wait_reply( control_code_e::unsuback, packet_id, - asio::prepend(std::move(*this), on_unsuback{}, std::move(packet)) + asio::prepend(std::move(*this), on_unsuback {}, std::move(packet)) ); } diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index a7fb964..d917531 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -1,6 +1,8 @@ #include #include +#include +#include #include #include #include @@ -9,10 +11,22 @@ #include +#include "test_common/message_exchange.hpp" +#include "test_common/test_service.hpp" +#include "test_common/test_stream.hpp" + using namespace async_mqtt5; namespace async_mqtt5::test { +enum operation_type { + async_run = 1, + publish, + receive, + subscribe, + unsubscribe +}; + enum cancel_type { ioc_stop = 1, client_cancel, @@ -21,178 +35,227 @@ enum cancel_type { } // end namespace async_mqtt5::test -template -void cancel_async_receive() { - using namespace test; +using stream_type = asio::ip::tcp::socket; +using client_type = mqtt_client; - constexpr int num_handlers = 3; - constexpr int expected_handlers_called = type == ioc_stop ? 0 : num_handlers; - int handlers_called = 0; - - asio::io_context ioc; - - using stream_type = asio::ip::tcp::socket; - using client_type = mqtt_client; - client_type c(ioc, ""); - - c.brokers("127.0.0.1", 1883) - .credentials("test-cli", "", "") - .async_run(asio::detached); - - auto handler = [&handlers_called]( - error_code ec, std::string, std::string, publish_props - ) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - handlers_called++; - }; - - std::vector signals(3); - - for (auto i = 0; i < num_handlers; ++i) - c.async_receive(asio::bind_cancellation_slot( - signals[i].slot(), - std::move(handler) - )); - - asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::milliseconds(10)); - timer.async_wait([&](auto) { - if constexpr (type == ioc_stop) - ioc.stop(); - else if constexpr (type == client_cancel) - c.cancel(); - else if constexpr (type == signal_emit) - std::for_each( - signals.begin(), signals.end(), - [](auto& signal) { - signal.emit(asio::cancellation_type_t::terminal); - } - ); - }); - - ioc.run_for(std::chrono::milliseconds(20)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); +template < + test::operation_type op_type, + std::enable_if_t = true +> +void setup_cancel_op_test_case( + client_type& c, asio::cancellation_signal& signal, int& handlers_called +) { + c.async_run( + asio::bind_cancellation_slot( + signal.slot(), + [&handlers_called](error_code ec) { + handlers_called++; + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + } + ) + ); } -template -void cancel_async_publish() { +template < + test::operation_type op_type, + std::enable_if_t = true +> +void setup_cancel_op_test_case( + client_type& c, asio::cancellation_signal& signal, int& handlers_called +) { + c.async_run(asio::detached); + c.async_publish("topic", "payload", retain_e::no, publish_props {}, + asio::bind_cancellation_slot( + signal.slot(), + [&handlers_called](error_code ec) { + handlers_called++; + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + } + ) + ); +} + +template < + test::operation_type op_type, + std::enable_if_t = true +> +void setup_cancel_op_test_case( + client_type& c, asio::cancellation_signal& signal, int& handlers_called +) { + c.async_run(asio::detached); + c.async_receive( + asio::bind_cancellation_slot( + signal.slot(), + [&handlers_called]( + error_code ec, std::string t, std::string p, publish_props + ) { + handlers_called++; + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + BOOST_CHECK_EQUAL(t, ""); + BOOST_CHECK_EQUAL(p, ""); + } + ) + ); +} + +template < + test::operation_type op_type, + std::enable_if_t = true +> +void setup_cancel_op_test_case( + client_type& c, asio::cancellation_signal& signal, int& handlers_called +) { + c.async_run(asio::detached); + c.async_unsubscribe( + "topic" ,unsubscribe_props {}, + asio::bind_cancellation_slot( + signal.slot(), + [&handlers_called]( + error_code ec, std::vector rcs, unsuback_props + ) { + handlers_called++; + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + // TODO: be consistent with complete_post + //BOOST_ASSERT(rcs.size() == 1); + //BOOST_CHECK(rcs[0] == reason_codes::empty); + } + ) + ); +} + +template < + test::operation_type op_type, + std::enable_if_t = true +> +void setup_cancel_op_test_case( + client_type& c, asio::cancellation_signal& signal, int& handlers_called +) { + c.async_run(asio::detached); + c.async_subscribe( + subscribe_topic { "topic", subscribe_options {} }, subscribe_props {}, + asio::bind_cancellation_slot( + signal.slot(), + [&handlers_called]( + error_code ec, std::vector rcs, suback_props + ) { + handlers_called++; + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + // TODO: be consistent with complete_post + //BOOST_ASSERT(rcs.size() == 1); + //BOOST_CHECK(rcs[0] == reason_codes::empty); + } + ) + ); +} + +template +void run_cancel_op_test() { using namespace test; - constexpr int expected_handlers_called = type == ioc_stop ? 0 : 3; + constexpr int expected_handlers_called = c_type == ioc_stop ? 0 : 1; int handlers_called = 0; asio::io_context ioc; - - using stream_type = asio::ip::tcp::socket; - using client_type = mqtt_client; client_type c(ioc, ""); + c.brokers("127.0.0.1"); - c.brokers("127.0.0.1", 1883) - .credentials("test-cli", "", "") - .async_run(asio::detached); - - std::vector signals(3); - - c.async_publish( - "topic", "payload", retain_e::yes, {}, - asio::bind_cancellation_slot( - signals[0].slot(), - [&handlers_called](error_code ec) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - handlers_called++; - } - ) - ); - - c.async_publish( - "topic", "payload", retain_e::yes, {}, - asio::bind_cancellation_slot( - signals[1].slot(), - [&handlers_called](error_code ec, reason_code rc, puback_props) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - BOOST_CHECK_EQUAL(rc, reason_codes::empty); - handlers_called++; - } - ) - ); - - - c.async_publish( - "topic", "payload", retain_e::yes, {}, - asio::bind_cancellation_slot( - signals[2].slot(), - [&handlers_called](error_code ec, reason_code rc, pubcomp_props) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - BOOST_CHECK_EQUAL(rc, reason_codes::empty); - handlers_called++; - } - ) - ); + asio::cancellation_signal signal; + setup_cancel_op_test_case(c, signal, handlers_called); asio::steady_timer timer(c.get_executor()); timer.expires_after(std::chrono::milliseconds(10)); timer.async_wait([&](auto) { - if constexpr (type == ioc_stop) + if constexpr (c_type == ioc_stop) ioc.stop(); - else if constexpr (type == client_cancel) + else if constexpr (c_type == client_cancel) c.cancel(); - else if constexpr (type == signal_emit) - std::for_each( - signals.begin(), signals.end(), - [](auto& signal) { - signal.emit(asio::cancellation_type_t::terminal); - } - ); + else if constexpr (c_type == signal_emit) + signal.emit(asio::cancellation_type_t::terminal); }); - ioc.run(); + + ioc.run_for(20ms); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_SUITE(cancellation/*, *boost::unit_test::disabled()*/) - -BOOST_AUTO_TEST_CASE(ioc_stop_async_receive) { - cancel_async_receive(); +// hangs +BOOST_AUTO_TEST_CASE(ioc_stop_async_run, *boost::unit_test::disabled()) { + run_cancel_op_test(); } -BOOST_AUTO_TEST_CASE(client_cancel_async_receive) { - cancel_async_receive(); -} - -BOOST_AUTO_TEST_CASE(signal_emit_async_receive) { - cancel_async_receive(); -} - -// passes on debian, hangs on windows in io_context destructor -BOOST_AUTO_TEST_CASE(ioc_stop_async_publish, *boost::unit_test::disabled() ) { - cancel_async_publish(); -} - -BOOST_AUTO_TEST_CASE(client_cancel_async_publish) { - cancel_async_publish(); -} - -BOOST_AUTO_TEST_CASE(signal_emit_async_publish) { - cancel_async_publish(); +BOOST_AUTO_TEST_CASE(client_cancel_async_run) { + run_cancel_op_test(); } BOOST_AUTO_TEST_CASE(signal_emit_async_run) { + run_cancel_op_test(); +} + +// hangs +BOOST_AUTO_TEST_CASE(ioc_stop_async_publish, *boost::unit_test::disabled() ) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(client_cancel_async_publish) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(signal_emit_async_publish) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(ioc_stop_async_receive) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(client_cancel_async_receive) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(signal_emit_async_receive) { + run_cancel_op_test(); +} + +// hangs +BOOST_AUTO_TEST_CASE(ioc_stop_async_subscribe, *boost::unit_test::disabled()) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(client_cancel_async_subscribe) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(signal_emit_async_subscribe) { + run_cancel_op_test(); +} + +// hangs +BOOST_AUTO_TEST_CASE(ioc_stop_async_unsubscribe, *boost::unit_test::disabled()) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(client_cancel_async_unsubscribe) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(signal_emit_async_unsubscribe) { + run_cancel_op_test(); +} + +BOOST_AUTO_TEST_CASE(signal_emit_async_run_cancels_client) { using namespace test; constexpr int expected_handlers_called = 2; int handlers_called = 0; asio::io_context ioc; - - using stream_type = asio::ip::tcp::socket; - using client_type = mqtt_client; client_type c(ioc, ""); asio::cancellation_signal signal; c.brokers("127.0.0.1", 1883) - .credentials("test-cli", "", "") .async_run( asio::bind_cancellation_slot( signal.slot(), @@ -221,40 +284,83 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_run) { BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } +struct shared_test_data { + error_code success {}; + error_code fail = asio::error::not_connected; + + const std::string connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + const std::string connack = encoders::encode_connack( + false, reason_codes::success.value(), {} + ); + + const std::string topic = "topic"; + const std::string payload = "payload"; + + const std::string publish_qos1 = encoders::encode_publish( + 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + + const std::string puback = encoders::encode_puback(1, uint8_t(0x00), {}); +}; + +using test::after; +using namespace std::chrono; + #ifdef BOOST_ASIO_HAS_CO_AWAIT constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable); -BOOST_AUTO_TEST_CASE(rerunning_the_client) { +BOOST_FIXTURE_TEST_CASE(rerunning_the_client, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1) + .complete_with(success, after(1ms)) + .reply_with(puback, after(2ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1) + .complete_with(success, after(1ms)) + .reply_with(puback, after(2ms)); + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); co_spawn(ioc, - [&ioc]() -> asio::awaitable { - using stream_type = asio::ip::tcp::socket; - using client_type = mqtt_client; - client_type c(ioc, ""); - - c.brokers("broker.hivemq.com,broker.hivemq.com", 1883) // to avoid reconnect backoff + [&]() -> asio::awaitable { + mqtt_client c(ioc, ""); + c.brokers("127.0.0.1,127.0.0.1", 1883) // to avoid reconnect backoff .async_run(asio::detached); - auto [ec] = co_await c.async_publish( - "t", "p", retain_e::yes, publish_props {}, use_nothrow_awaitable + auto [ec, rc, props] = co_await c.async_publish( + topic, payload, retain_e::no, publish_props {}, use_nothrow_awaitable ); BOOST_CHECK(!ec); + BOOST_CHECK(!rc); c.cancel(); - auto [cec] = co_await c.async_publish( - "ct", "cp", retain_e::yes, publish_props {}, use_nothrow_awaitable + auto [cec, crc, cprops] = co_await c.async_publish( + topic, payload, retain_e::no, publish_props {}, use_nothrow_awaitable ); - BOOST_CHECK(cec == asio::error::operation_aborted); + BOOST_CHECK_EQUAL(cec, asio::error::operation_aborted); + BOOST_CHECK_EQUAL(crc, reason_codes::empty); c.async_run(asio::detached); - auto [rec] = co_await c.async_publish( - "ct", "cp", retain_e::yes, publish_props {}, use_nothrow_awaitable + auto [rec, rrc, rprops] = co_await c.async_publish( + topic, payload, retain_e::no, publish_props {}, use_nothrow_awaitable ); BOOST_CHECK(!rec); + BOOST_CHECK(!rrc); co_await c.async_disconnect(use_nothrow_awaitable); co_return; @@ -263,6 +369,7 @@ BOOST_AUTO_TEST_CASE(rerunning_the_client) { ); ioc.run(); + BOOST_CHECK(broker.received_all_expected()); } #endif diff --git a/test/integration/publish_receive.cpp b/test/integration/publish_receive.cpp index 8585c50..9c9c741 100644 --- a/test/integration/publish_receive.cpp +++ b/test/integration/publish_receive.cpp @@ -46,7 +46,7 @@ struct shared_test_data { }; using test::after; -using std::chrono_literals::operator ""ms; +using namespace std::chrono; void run_test(test::msg_exchange broker_side) { constexpr int expected_handlers_called = 1; @@ -125,7 +125,6 @@ BOOST_FIXTURE_TEST_CASE(test_receive_publish_qos2, shared_test_data) { run_test(std::move(broker_side)); } - BOOST_FIXTURE_TEST_CASE(fail_to_send_pubrec, shared_test_data) { auto publish_dup = encoders::encode_publish( 1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::yes, {} diff --git a/test/unit/publish_send_op.cpp b/test/unit/publish_send_op.cpp index 9293e76..bf2c9c2 100644 --- a/test/unit/publish_send_op.cpp +++ b/test/unit/publish_send_op.cpp @@ -264,42 +264,4 @@ BOOST_AUTO_TEST_CASE(test_topic_alias_maximum) { BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } -BOOST_AUTO_TEST_CASE(test_publish_cancellation) { - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - asio::io_context ioc; - using client_service_type = test::test_service; - auto svc_ptr = std::make_shared(ioc.get_executor()); - svc_ptr->run([](error_code){}); - asio::cancellation_signal cancel_signal; - - auto h = [&handlers_called](error_code ec, reason_code rc, puback_props) { - ++handlers_called; - BOOST_CHECK(ec == asio::error::operation_aborted); - BOOST_CHECK_EQUAL(rc, reason_codes::empty); - }; - - auto handler = asio::bind_cancellation_slot(cancel_signal.slot(), std::move(h)); - - asio::steady_timer timer(ioc.get_executor()); - timer.expires_after(std::chrono::milliseconds(60)); - timer.async_wait( - [&cancel_signal](error_code) { - cancel_signal.emit(asio::cancellation_type::terminal); - } - ); - - detail::publish_send_op< - client_service_type, decltype(handler), qos_e::at_least_once - > { svc_ptr, std::move(handler) } - .perform( - "test", "payload", retain_e::no, {} - ); - - ioc.run_for(std::chrono::milliseconds(500)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); -} - - BOOST_AUTO_TEST_SUITE_END()