Add async mutex unit tests

Summary:
related to T12015
- relax coroutine tests
- add async mutex unit tests

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Differential Revision: https://repo.mireo.local/D27719
This commit is contained in:
Korina Šimičević
2024-02-05 12:11:35 +01:00
parent 5c7f0bc29d
commit 0de02e3c53
5 changed files with 228 additions and 23 deletions

View File

@ -183,7 +183,7 @@ private:
});
}
// Executes operation immediatelly if mutex is not locked
// Executes operation immediately if mutex is not locked
// or queues it for later execution otherwise. In both cases
// the operation will be executed in a manner equivalent
// to asio::post to avoid recursion.

View File

@ -59,27 +59,27 @@ asio::awaitable<void> sanity_check(mqtt_client<StreamType, TlsContext>& c) {
"test/mqtt-test", "hello world with qos0!", retain_e::yes, publish_props {},
use_nothrow_awaitable
);
BOOST_CHECK(!ec_0);
BOOST_TEST_WARN(!ec_0);
auto [ec_1, puback_rc, puback_props] = co_await c.template async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
retain_e::yes, publish_props {},
use_nothrow_awaitable
);
BOOST_CHECK(!ec_1);
BOOST_CHECK(!puback_rc);
BOOST_TEST_WARN(!ec_1);
BOOST_TEST_WARN(!puback_rc);
auto [ec_2, pubcomp_rc, pubcomp_props] = co_await c.template async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
retain_e::yes, publish_props {},
use_nothrow_awaitable
);
BOOST_CHECK(!ec_2);
BOOST_CHECK(!pubcomp_rc);
BOOST_TEST_WARN(!ec_2);
BOOST_TEST_WARN(!pubcomp_rc);
subscribe_topic sub_topic = subscribe_topic {
"test/mqtt-test", async_mqtt5::subscribe_options {
qos_e::exactly_once,
qos_e::at_least_once,
no_local_e::no,
retain_as_published_e::retain,
retain_handling_e::send
@ -89,16 +89,16 @@ asio::awaitable<void> sanity_check(mqtt_client<StreamType, TlsContext>& c) {
auto [sub_ec, sub_codes, sub_props] = co_await c.async_subscribe(
sub_topic, subscribe_props {}, use_nothrow_awaitable
);
BOOST_CHECK(!sub_ec);
BOOST_CHECK(!sub_codes[0]);
auto [rec, topic, payload, publish_props] = co_await c.async_receive(use_nothrow_awaitable);
BOOST_TEST_WARN(!sub_ec);
if (!sub_codes[0])
auto [rec, topic, payload, publish_props] = co_await c.async_receive(use_nothrow_awaitable);
auto [unsub_ec, unsub_codes, unsub_props] = co_await c.async_unsubscribe(
"test/mqtt-test", unsubscribe_props {},
use_nothrow_awaitable
);
BOOST_CHECK(!unsub_ec);
BOOST_CHECK(!unsub_codes[0]);
BOOST_TEST_WARN(!unsub_ec);
BOOST_TEST_WARN(!unsub_codes[0]);
co_await c.async_disconnect(use_nothrow_awaitable);
co_return;
@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) {
timer.async_wait(
[&](boost::system::error_code ec) {
BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!");
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}
@ -157,7 +157,7 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) {
timer.async_wait(
[&](boost::system::error_code ec) {
BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!");
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}
@ -193,7 +193,7 @@ BOOST_AUTO_TEST_CASE(openssl_tls_client_check) {
timer.async_wait(
[&](boost::system::error_code ec) {
BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!");
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}
@ -231,7 +231,7 @@ BOOST_AUTO_TEST_CASE(websocket_tls_client_check) {
timer.async_wait(
[&](boost::system::error_code ec) {
BOOST_CHECK_MESSAGE(ec, "Failed to receive all the expected replies!");
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}

View File

@ -201,6 +201,34 @@ BOOST_FIXTURE_TEST_CASE(fail_to_send_pubrel, shared_test_data) {
);
}
BOOST_FIXTURE_TEST_CASE(fail_to_receive_pubcomp, shared_test_data) {
test::msg_exchange broker_side;
broker_side
.expect(connect)
.complete_with(success, after(1ms))
.reply_with(connack, after(2ms))
.expect(publish_qos2)
.complete_with(success, after(1ms))
.reply_with(pubrec, after(2ms))
.expect(pubrel)
.complete_with(success, after(1ms))
.send(fail, after(100ms))
.expect(connect)
.complete_with(success, after(1ms))
.reply_with(connack, after(2ms))
.expect(pubrel)
.complete_with(success, after(1ms))
.reply_with(pubcomp, after(2ms));
run_test<qos_e::exactly_once>(
std::move(broker_side),
[](error_code ec, reason_code rc, pubcomp_props) {
BOOST_TEST(!ec);
BOOST_TEST(rc == reason_codes::success);
}
);
}
BOOST_FIXTURE_TEST_CASE(receive_malformed_puback, shared_test_data) {
// packets
auto publish_qos1_dup = encoders::encode_publish(

172
test/unit/async_mutex.cpp Normal file
View File

@ -0,0 +1,172 @@
#include <boost/test/unit_test.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <async_mqtt5/detail/async_mutex.hpp>
using namespace async_mqtt5;
using error_code = boost::system::error_code;
using async_mutex = detail::async_mutex;
BOOST_AUTO_TEST_SUITE(async_mutex_unit/*, *boost::unit_test::disabled()*/)
BOOST_AUTO_TEST_CASE(lock_mutex) {
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
asio::thread_pool tp(1);
async_mutex mutex(tp.executor());
mutex.lock([&mutex, &handlers_called](error_code ec) {
++handlers_called;
BOOST_TEST(!ec);
BOOST_TEST(mutex.is_locked());
mutex.unlock();
BOOST_TEST(!mutex.is_locked());
});
tp.wait();
BOOST_TEST(handlers_called == expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(get_executor) {
asio::thread_pool tp(1);
auto ex = tp.get_executor();
async_mutex mutex(ex);
BOOST_CHECK(mutex.get_executor() == ex);
}
BOOST_AUTO_TEST_CASE(bind_executor) {
constexpr int expected_handlers_called = 2;
int handlers_called = 0;
asio::thread_pool tp(1);
async_mutex mutex(tp.get_executor());
auto s1 = asio::make_strand(tp.get_executor());
auto s2 = asio::make_strand(tp.get_executor());
mutex.lock(
asio::bind_executor(
s1,
[&](error_code ec) mutable {
++handlers_called;
BOOST_TEST(!ec);
BOOST_TEST(s1.running_in_this_thread());
BOOST_TEST(!s2.running_in_this_thread());
mutex.unlock();
}
)
);
mutex.lock(
asio::bind_executor(
s2,
[&](error_code ec) mutable {
++handlers_called;
BOOST_TEST(!ec);
BOOST_TEST(!s1.running_in_this_thread());
BOOST_TEST(s2.running_in_this_thread());
mutex.unlock();
}
)
);
tp.wait();
BOOST_TEST(handlers_called == expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(per_op_cancellation) {
constexpr int expected_handlers_called = 2;
int handlers_called = 0;
asio::io_context ioc;
asio::cancellation_signal cs;
async_mutex mutex(asio::make_strand(ioc.get_executor()));
// mutex must be locked in order to cancel a pending operation
mutex.lock(
[&mutex, &handlers_called](error_code ec) {
++handlers_called;
BOOST_TEST(!ec);
mutex.unlock();
}
);
mutex.lock(
asio::bind_cancellation_slot(
cs.slot(),
[&handlers_called](error_code ec) {
++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted);
}
)
);
cs.emit(asio::cancellation_type_t::terminal);
cs.slot().clear();
ioc.run();
BOOST_TEST(handlers_called == expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(cancel_ops_by_destructor) {
constexpr int expected_handlers_called = 2;
int handlers_called = 0;
asio::io_context ioc;
{
async_mutex mutex(ioc.get_executor());
auto op = [&handlers_called](error_code ec) {
handlers_called++;
BOOST_TEST(!ec);
};
auto cancelled_op = [&handlers_called](error_code ec) {
handlers_called++;
BOOST_TEST(ec == asio::error::operation_aborted);
};
mutex.lock(std::move(op)); // will be immediately posted with ec = success
mutex.lock(std::move(cancelled_op));
}
ioc.run();
BOOST_TEST(handlers_called == expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(cancel_ops) {
constexpr int expected_handlers_called = 5;
int handlers_called = 0;
asio::io_context ioc;
async_mutex mutex(ioc.get_executor());
auto op = [&mutex, &handlers_called](error_code ec) {
handlers_called++;
BOOST_TEST(!ec);
mutex.unlock();
};
auto cancelled_op = [&handlers_called](error_code ec) {
handlers_called++;
BOOST_TEST(ec == asio::error::operation_aborted);
};
mutex.lock(std::move(op));
// pending operations that will be cancelled
for (int i = 0; i < expected_handlers_called - 1; ++i)
mutex.lock(cancelled_op);
mutex.cancel();
ioc.run();
BOOST_TEST(handlers_called == expected_handlers_called);
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -27,16 +27,21 @@ BOOST_AUTO_TEST_CASE(client_ec_to_string) {
};
const client::client_ec_category& cat = client::get_error_code_category();
BOOST_CHECK_NO_THROW(cat.name());
BOOST_TEST(cat.name());
constexpr auto default_output = "Unknown client error.";
for (auto ec : ecs)
BOOST_CHECK(cat.message(static_cast<int>(ec)) != default_output);
BOOST_TEST(cat.message(static_cast<int>(ec)) != default_output);
// default branch
BOOST_CHECK(cat.message(1) == default_output);
BOOST_TEST(cat.message(1) == default_output);
}
BOOST_AUTO_TEST_CASE(client_ec_to_stream) {
std::ostringstream stream;
stream << client::error::invalid_topic;
BOOST_TEST(stream.str() == client_error_to_string(client::error::invalid_topic));
}
BOOST_AUTO_TEST_CASE(reason_code_to_string) {
// Ensure that all branches of the switch/case are covered
@ -62,14 +67,14 @@ BOOST_AUTO_TEST_CASE(reason_code_to_string) {
wildcard_subscriptions_not_supported
};
BOOST_CHECK_EQUAL(rcs.size(), 46u);
BOOST_TEST(rcs.size() == 46u);
constexpr auto default_output = "Invalid reason code";
for (const auto& rc: rcs)
BOOST_CHECK(rc.message() != "Invalid reason code");
BOOST_TEST(rc.message() != "Invalid reason code");
// default branch
BOOST_CHECK(
BOOST_TEST(
reason_code(0x05, reason_codes::category::suback).message() == default_output
);
}
@ -77,7 +82,7 @@ BOOST_AUTO_TEST_CASE(reason_code_to_string) {
BOOST_AUTO_TEST_CASE(reason_code_to_stream) {
std::ostringstream stream;
stream << reason_codes::success;
BOOST_CHECK_EQUAL(stream.str(), reason_codes::success.message());
BOOST_TEST(stream.str() == reason_codes::success.message());
}
BOOST_AUTO_TEST_SUITE_END();