From 0de02e3c53fc42f321a0c90a6ab863039165f217 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Mon, 5 Feb 2024 12:11:35 +0100 Subject: [PATCH] Add async mutex unit tests Summary: related to T12015 - relax coroutine tests - add async mutex unit tests Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27719 --- include/async_mqtt5/detail/async_mutex.hpp | 2 +- test/integration/coroutine.cpp | 30 ++-- test/integration/send_publish.cpp | 28 ++++ test/unit/async_mutex.cpp | 172 +++++++++++++++++++++ test/unit/error.cpp | 19 ++- 5 files changed, 228 insertions(+), 23 deletions(-) create mode 100644 test/unit/async_mutex.cpp diff --git a/include/async_mqtt5/detail/async_mutex.hpp b/include/async_mqtt5/detail/async_mutex.hpp index ca5865f..d43fcbd 100644 --- a/include/async_mqtt5/detail/async_mutex.hpp +++ b/include/async_mqtt5/detail/async_mutex.hpp @@ -183,7 +183,7 @@ private: }); } - // Executes operation immediatelly if mutex is not locked + // Executes operation immediately if mutex is not locked // or queues it for later execution otherwise. In both cases // the operation will be executed in a manner equivalent // to asio::post to avoid recursion. diff --git a/test/integration/coroutine.cpp b/test/integration/coroutine.cpp index f6c0d32..6fc0a71 100644 --- a/test/integration/coroutine.cpp +++ b/test/integration/coroutine.cpp @@ -59,27 +59,27 @@ asio::awaitable sanity_check(mqtt_client& c) { "test/mqtt-test", "hello world with qos0!", retain_e::yes, publish_props {}, use_nothrow_awaitable ); - BOOST_CHECK(!ec_0); + BOOST_TEST_WARN(!ec_0); auto [ec_1, puback_rc, puback_props] = co_await c.template async_publish( "test/mqtt-test", "hello world with qos1!", retain_e::yes, publish_props {}, use_nothrow_awaitable ); - BOOST_CHECK(!ec_1); - BOOST_CHECK(!puback_rc); + BOOST_TEST_WARN(!ec_1); + BOOST_TEST_WARN(!puback_rc); auto [ec_2, pubcomp_rc, pubcomp_props] = co_await c.template async_publish( "test/mqtt-test", "hello world with qos2!", retain_e::yes, publish_props {}, use_nothrow_awaitable ); - BOOST_CHECK(!ec_2); - BOOST_CHECK(!pubcomp_rc); + BOOST_TEST_WARN(!ec_2); + BOOST_TEST_WARN(!pubcomp_rc); subscribe_topic sub_topic = subscribe_topic { "test/mqtt-test", async_mqtt5::subscribe_options { - qos_e::exactly_once, + qos_e::at_least_once, no_local_e::no, retain_as_published_e::retain, retain_handling_e::send @@ -89,16 +89,16 @@ asio::awaitable sanity_check(mqtt_client& c) { auto [sub_ec, sub_codes, sub_props] = co_await c.async_subscribe( sub_topic, subscribe_props {}, use_nothrow_awaitable ); - BOOST_CHECK(!sub_ec); - BOOST_CHECK(!sub_codes[0]); - auto [rec, topic, payload, publish_props] = co_await c.async_receive(use_nothrow_awaitable); + BOOST_TEST_WARN(!sub_ec); + if (!sub_codes[0]) + auto [rec, topic, payload, publish_props] = co_await c.async_receive(use_nothrow_awaitable); auto [unsub_ec, unsub_codes, unsub_props] = co_await c.async_unsubscribe( "test/mqtt-test", unsubscribe_props {}, use_nothrow_awaitable ); - BOOST_CHECK(!unsub_ec); - BOOST_CHECK(!unsub_codes[0]); + BOOST_TEST_WARN(!unsub_ec); + BOOST_TEST_WARN(!unsub_codes[0]); co_await c.async_disconnect(use_nothrow_awaitable); co_return; @@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) { timer.async_wait( [&](boost::system::error_code ec) { - BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!"); + BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); ioc.stop(); } @@ -157,7 +157,7 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) { timer.async_wait( [&](boost::system::error_code ec) { - BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!"); + BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); ioc.stop(); } @@ -193,7 +193,7 @@ BOOST_AUTO_TEST_CASE(openssl_tls_client_check) { timer.async_wait( [&](boost::system::error_code ec) { - BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!"); + BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); ioc.stop(); } @@ -231,7 +231,7 @@ BOOST_AUTO_TEST_CASE(websocket_tls_client_check) { timer.async_wait( [&](boost::system::error_code ec) { - BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!"); + BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); ioc.stop(); } diff --git a/test/integration/send_publish.cpp b/test/integration/send_publish.cpp index ada76b6..386ee60 100644 --- a/test/integration/send_publish.cpp +++ b/test/integration/send_publish.cpp @@ -201,6 +201,34 @@ BOOST_FIXTURE_TEST_CASE(fail_to_send_pubrel, shared_test_data) { ); } +BOOST_FIXTURE_TEST_CASE(fail_to_receive_pubcomp, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos2) + .complete_with(success, after(1ms)) + .reply_with(pubrec, after(2ms)) + .expect(pubrel) + .complete_with(success, after(1ms)) + .send(fail, after(100ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(pubrel) + .complete_with(success, after(1ms)) + .reply_with(pubcomp, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, pubcomp_props) { + BOOST_TEST(!ec); + BOOST_TEST(rc == reason_codes::success); + } + ); +} + BOOST_FIXTURE_TEST_CASE(receive_malformed_puback, shared_test_data) { // packets auto publish_qos1_dup = encoders::encode_publish( diff --git a/test/unit/async_mutex.cpp b/test/unit/async_mutex.cpp new file mode 100644 index 0000000..6114ed8 --- /dev/null +++ b/test/unit/async_mutex.cpp @@ -0,0 +1,172 @@ +#include + +#include +#include +#include +#include + +#include + +using namespace async_mqtt5; +using error_code = boost::system::error_code; +using async_mutex = detail::async_mutex; + +BOOST_AUTO_TEST_SUITE(async_mutex_unit/*, *boost::unit_test::disabled()*/) + +BOOST_AUTO_TEST_CASE(lock_mutex) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::thread_pool tp(1); + async_mutex mutex(tp.executor()); + + mutex.lock([&mutex, &handlers_called](error_code ec) { + ++handlers_called; + BOOST_TEST(!ec); + BOOST_TEST(mutex.is_locked()); + mutex.unlock(); + BOOST_TEST(!mutex.is_locked()); + }); + + tp.wait(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(get_executor) { + asio::thread_pool tp(1); + auto ex = tp.get_executor(); + async_mutex mutex(ex); + BOOST_CHECK(mutex.get_executor() == ex); +} + +BOOST_AUTO_TEST_CASE(bind_executor) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + asio::thread_pool tp(1); + + async_mutex mutex(tp.get_executor()); + auto s1 = asio::make_strand(tp.get_executor()); + auto s2 = asio::make_strand(tp.get_executor()); + + mutex.lock( + asio::bind_executor( + s1, + [&](error_code ec) mutable { + ++handlers_called; + BOOST_TEST(!ec); + BOOST_TEST(s1.running_in_this_thread()); + BOOST_TEST(!s2.running_in_this_thread()); + mutex.unlock(); + } + ) + ); + + mutex.lock( + asio::bind_executor( + s2, + [&](error_code ec) mutable { + ++handlers_called; + BOOST_TEST(!ec); + BOOST_TEST(!s1.running_in_this_thread()); + BOOST_TEST(s2.running_in_this_thread()); + mutex.unlock(); + } + ) + ); + + tp.wait(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(per_op_cancellation) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + asio::io_context ioc; + asio::cancellation_signal cs; + + async_mutex mutex(asio::make_strand(ioc.get_executor())); + + // mutex must be locked in order to cancel a pending operation + mutex.lock( + [&mutex, &handlers_called](error_code ec) { + ++handlers_called; + BOOST_TEST(!ec); + mutex.unlock(); + } + ); + + mutex.lock( + asio::bind_cancellation_slot( + cs.slot(), + [&handlers_called](error_code ec) { + ++handlers_called; + BOOST_TEST(ec == asio::error::operation_aborted); + } + ) + ); + cs.emit(asio::cancellation_type_t::terminal); + cs.slot().clear(); + + ioc.run(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(cancel_ops_by_destructor) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + asio::io_context ioc; + + { + async_mutex mutex(ioc.get_executor()); + + auto op = [&handlers_called](error_code ec) { + handlers_called++; + BOOST_TEST(!ec); + }; + + auto cancelled_op = [&handlers_called](error_code ec) { + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + }; + + mutex.lock(std::move(op)); // will be immediately posted with ec = success + mutex.lock(std::move(cancelled_op)); + } + + ioc.run(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(cancel_ops) { + constexpr int expected_handlers_called = 5; + int handlers_called = 0; + + asio::io_context ioc; + async_mutex mutex(ioc.get_executor()); + + auto op = [&mutex, &handlers_called](error_code ec) { + handlers_called++; + BOOST_TEST(!ec); + mutex.unlock(); + }; + + auto cancelled_op = [&handlers_called](error_code ec) { + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + }; + + mutex.lock(std::move(op)); + + // pending operations that will be cancelled + for (int i = 0; i < expected_handlers_called - 1; ++i) + mutex.lock(cancelled_op); + + mutex.cancel(); + ioc.run(); + BOOST_TEST(handlers_called == expected_handlers_called); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/unit/error.cpp b/test/unit/error.cpp index 24df10c..fdd0352 100644 --- a/test/unit/error.cpp +++ b/test/unit/error.cpp @@ -27,16 +27,21 @@ BOOST_AUTO_TEST_CASE(client_ec_to_string) { }; const client::client_ec_category& cat = client::get_error_code_category(); - BOOST_CHECK_NO_THROW(cat.name()); + BOOST_TEST(cat.name()); constexpr auto default_output = "Unknown client error."; for (auto ec : ecs) - BOOST_CHECK(cat.message(static_cast(ec)) != default_output); + BOOST_TEST(cat.message(static_cast(ec)) != default_output); // default branch - BOOST_CHECK(cat.message(1) == default_output); + BOOST_TEST(cat.message(1) == default_output); } +BOOST_AUTO_TEST_CASE(client_ec_to_stream) { + std::ostringstream stream; + stream << client::error::invalid_topic; + BOOST_TEST(stream.str() == client_error_to_string(client::error::invalid_topic)); +} BOOST_AUTO_TEST_CASE(reason_code_to_string) { // Ensure that all branches of the switch/case are covered @@ -62,14 +67,14 @@ BOOST_AUTO_TEST_CASE(reason_code_to_string) { wildcard_subscriptions_not_supported }; - BOOST_CHECK_EQUAL(rcs.size(), 46u); + BOOST_TEST(rcs.size() == 46u); constexpr auto default_output = "Invalid reason code"; for (const auto& rc: rcs) - BOOST_CHECK(rc.message() != "Invalid reason code"); + BOOST_TEST(rc.message() != "Invalid reason code"); // default branch - BOOST_CHECK( + BOOST_TEST( reason_code(0x05, reason_codes::category::suback).message() == default_output ); } @@ -77,7 +82,7 @@ BOOST_AUTO_TEST_CASE(reason_code_to_string) { BOOST_AUTO_TEST_CASE(reason_code_to_stream) { std::ostringstream stream; stream << reason_codes::success; - BOOST_CHECK_EQUAL(stream.str(), reason_codes::success.message()); + BOOST_TEST(stream.str() == reason_codes::success.message()); } BOOST_AUTO_TEST_SUITE_END();