diff --git a/test/unit/include/test_common/packet_util.hpp b/test/unit/include/test_common/packet_util.hpp index aa186d3..0e1e81e 100644 --- a/test/unit/include/test_common/packet_util.hpp +++ b/test/unit/include/test_common/packet_util.hpp @@ -58,12 +58,9 @@ inline std::string_view code_to_str(control_code_e code) { case control_code_e::pingresp: return "PINGRESP"; default: return "NO PACKET"; } - return "UNKNOWN"; } -inline std::string to_readable_packet( - std::string packet, error_code ec = {}, bool incoming = false -) { +inline std::string to_readable_packet(std::string packet) { auto control_byte = uint8_t(*packet.data()); auto code = extract_code(control_byte); @@ -72,14 +69,11 @@ inline std::string to_readable_packet( std::ostringstream stream; - if (incoming) - stream << "-> "; - if ( code == control_code_e::connect || code == control_code_e::connack || code == control_code_e::disconnect ) { - stream << code_to_str(code) << (ec ? " ec: " + ec.message() : ""); + stream << code_to_str(code); return stream.str(); } @@ -95,6 +89,7 @@ inline std::string to_readable_packet( auto& [topic, packet_id, flags, props, payload] = *publish; stream << code_to_str(code); stream << (packet_id ? " " + std::to_string(*packet_id) : ""); + stream << "flags: " << flags; return stream.str(); } diff --git a/test/unit/include/test_common/test_broker.hpp b/test/unit/include/test_common/test_broker.hpp index cd8cabd..8d4e1fe 100644 --- a/test/unit/include/test_common/test_broker.hpp +++ b/test/unit/include/test_common/test_broker.hpp @@ -150,7 +150,14 @@ public: for (size_t i = 0; i < num_packets; ++i, ++it) { BOOST_CHECK_EQUAL(it->size(), expected[i].size()); size_t len = std::min(it->size(), expected[i].size()); - BOOST_CHECK(!memcmp(it->data(), expected[i].data(), len)); + if (memcmp(it->data(), expected[i].data(), len)) { + std::ostringstream stream; + stream << "Packet mismatch! Expected: " + << to_readable_packet(expected[i]) + << " Received: " + << to_readable_packet(std::string((const char*)it->data(), len)); + log(stream.str()); + } } } diff --git a/test/unit/include/test_common/test_service.hpp b/test/unit/include/test_common/test_service.hpp index 2988e2f..d2c9a57 100644 --- a/test/unit/include/test_common/test_service.hpp +++ b/test/unit/include/test_common/test_service.hpp @@ -18,9 +18,9 @@ template < typename StreamType, typename TlsContext = std::monostate > -class test_service : public detail::client_service { +class test_service : public async_mqtt5::detail::client_service { using error_code = boost::system::error_code; - using base = detail::client_service; + using base = async_mqtt5::detail::client_service; asio::any_io_executor _ex; connack_props _test_props; diff --git a/test/unit/test/disconnect_op.cpp b/test/unit/test/disconnect_op.cpp new file mode 100644 index 0000000..f6fab96 --- /dev/null +++ b/test/unit/test/disconnect_op.cpp @@ -0,0 +1,127 @@ +#include + +#include +#include +#include +#include + +#include + +#include + +#include + +#include "test_common/message_exchange.hpp" +#include "test_common/test_service.hpp" +#include "test_common/test_stream.hpp" + +using namespace async_mqtt5; + +BOOST_AUTO_TEST_SUITE(disconnect_op/*, *boost::unit_test::disabled()*/) + + +BOOST_AUTO_TEST_CASE(test_malformed_packet) { + std::string malformed_str = std::string{ 0x01 }; + + disconnect_props invalid_user_props; + invalid_user_props[prop::user_property].push_back(malformed_str); + + disconnect_props invalid_reason_string; + invalid_reason_string[prop::reason_string] = malformed_str; + + std::vector testing_props = { + invalid_user_props, invalid_reason_string + }; + + int expected_handlers_called = static_cast(testing_props.size()); + int handlers_called = 0; + + asio::io_context ioc; + using client_service_type = test::test_service; + auto svc_ptr = std::make_shared(ioc.get_executor()); + + for (const auto& props : testing_props) { + auto handler = [&handlers_called](error_code ec) { + ++handlers_called; + BOOST_CHECK(ec == client::error::malformed_packet); + }; + + detail::disconnect_ctx ctx; + ctx.props = props; + + detail::disconnect_op< + client_service_type, detail::disconnect_ctx + > { svc_ptr, std::move(ctx), std::move(handler) } + .perform(); + } + + ioc.run_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(test_omitting_props) { + using test::after; + using std::chrono_literals::operator ""ms; + + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + connack_props co_props; + co_props[prop::maximum_packet_size] = 20; + + // packets + auto connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + auto connack = encoders::encode_connack( + false, reason_codes::success.value(), co_props + ); + + disconnect_props props; + props[prop::user_property].push_back(std::string(50, 'a')); + auto disconnect = encoders::encode_disconnect( + reason_codes::normal_disconnection.value(), props + ); + auto disconnect_no_props = encoders::encode_disconnect( + reason_codes::normal_disconnection.value(), disconnect_props{} + ); + + test::msg_exchange broker_side; + error_code success {}; + + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(disconnect_no_props) + .complete_with(success, after(0ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1") + .run(); + + asio::steady_timer timer(c.get_executor()); + timer.expires_after(std::chrono::milliseconds(200)); + timer.async_wait([&](auto) { + c.async_disconnect( + disconnect_rc_e::normal_disconnection, props, + [&](error_code ec) { + handlers_called++; + BOOST_CHECK(!ec); + } + ); + }); + + ioc.run_for(std::chrono::seconds(5)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/unit/test/publish_send_op.cpp b/test/unit/test/publish_send_op.cpp index 14ece50..81e3527 100644 --- a/test/unit/test/publish_send_op.cpp +++ b/test/unit/test/publish_send_op.cpp @@ -5,12 +5,14 @@ #include #include -#include +#include #include #include +#include "test_common/message_exchange.hpp" #include "test_common/test_service.hpp" +#include "test_common/test_stream.hpp" using namespace async_mqtt5; @@ -52,7 +54,7 @@ BOOST_AUTO_TEST_CASE(test_pid_overrun) { "test", "payload", retain_e::no, {} ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } @@ -80,13 +82,17 @@ BOOST_AUTO_TEST_CASE(test_invalid_topic_names) { .perform(topic, "payload", retain_e::no, {}); } - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_CASE(test_malformed_packet) { + uint16_t topic_alias_max = 10; std::string malformed_str = std::string { 0x01 }; + connack_props cprops; + cprops[prop::topic_alias_maximum] = topic_alias_max; + publish_props malfored_response_topic_props; malfored_response_topic_props[prop::response_topic] = "response#topic"; @@ -99,13 +105,17 @@ BOOST_AUTO_TEST_CASE(test_malformed_packet) { publish_props malformed_content_type_props; malformed_content_type_props[prop::content_type] = malformed_str; + publish_props zero_topic_alias_props; + zero_topic_alias_props[prop::topic_alias] = uint16_t(0); + publish_props out_of_range_subid_props; out_of_range_subid_props[prop::subscription_identifier] = 300'000'000; + std::vector testing_props = { malfored_response_topic_props, utf8_payload_props, invalid_user_props, malformed_content_type_props, - out_of_range_subid_props + zero_topic_alias_props, out_of_range_subid_props }; int expected_handlers_called = static_cast(testing_props.size()); @@ -113,7 +123,8 @@ BOOST_AUTO_TEST_CASE(test_malformed_packet) { asio::io_context ioc; using client_service_type = test::test_service; - auto svc_ptr = std::make_shared(ioc.get_executor()); + auto svc_ptr = std::make_shared(ioc.get_executor(), std::move(cprops)); + BOOST_ASSERT(svc_ptr->connack_property(prop::topic_alias_maximum) == topic_alias_max); for (const auto& props: testing_props) { auto handler = [&handlers_called](error_code ec) { @@ -127,11 +138,13 @@ BOOST_AUTO_TEST_CASE(test_malformed_packet) { .perform("topic", malformed_str, retain_e::no, props); } - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_CASE(test_packet_too_large) { + int max_packet_sz = 10; + connack_props props; props[prop::maximum_packet_size] = 10; @@ -143,6 +156,7 @@ BOOST_AUTO_TEST_CASE(test_packet_too_large) { auto svc_ptr = std::make_shared( ioc.get_executor(), std::move(props) ); + BOOST_ASSERT(svc_ptr->connack_property(prop::maximum_packet_size) == max_packet_sz); auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) { ++handlers_called; @@ -157,13 +171,15 @@ BOOST_AUTO_TEST_CASE(test_packet_too_large) { "test", "payload", retain_e::no, {} ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_CASE(test_qos_not_supported) { + uint8_t max_qos = 0; + connack_props props; - props[prop::maximum_qos] = uint8_t(0); + props[prop::maximum_qos] = max_qos; constexpr int expected_handlers_called = 1; int handlers_called = 0; @@ -173,6 +189,7 @@ BOOST_AUTO_TEST_CASE(test_qos_not_supported) { auto svc_ptr = std::make_shared( ioc.get_executor(), std::move(props) ); + BOOST_ASSERT(svc_ptr->connack_property(prop::maximum_qos) == max_qos); auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) { ++handlers_called; @@ -187,13 +204,15 @@ BOOST_AUTO_TEST_CASE(test_qos_not_supported) { "test", "payload", retain_e::no, {} ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_CASE(test_retain_not_available) { + uint8_t retain = 0; + connack_props props; - props[prop::retain_available] = uint8_t(0); + props[prop::retain_available] = retain; constexpr int expected_handlers_called = 1; int handlers_called = 0; @@ -203,6 +222,7 @@ BOOST_AUTO_TEST_CASE(test_retain_not_available) { auto svc_ptr = std::make_shared( ioc.get_executor(), std::move(props) ); + BOOST_ASSERT(svc_ptr->connack_property(prop::retain_available) == retain); auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) { ++handlers_called; @@ -217,13 +237,15 @@ BOOST_AUTO_TEST_CASE(test_retain_not_available) { "test", "payload", retain_e::yes, {} ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_CASE(test_topic_alias_maximum) { + uint16_t max_topic_alias = 10; + connack_props ta_allowed_props; - ta_allowed_props[prop::topic_alias_maximum] = uint16_t(10); + ta_allowed_props[prop::topic_alias_maximum] = max_topic_alias; std::vector test_props = { ta_allowed_props, connack_props {} /* not allowed */ @@ -256,7 +278,7 @@ BOOST_AUTO_TEST_CASE(test_topic_alias_maximum) { ); } - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } @@ -292,8 +314,184 @@ BOOST_AUTO_TEST_CASE(test_publish_cancellation) { "test", "payload", retain_e::no, {} ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } +BOOST_AUTO_TEST_CASE(test_malformed_puback) { + using test::after; + using std::chrono_literals::operator ""ms; + + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + // packets + auto connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); + + auto publish = encoders::encode_publish( + 1, "t", "p", qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + auto malformed_puback = encoders::encode_puback(1, uint8_t(0x04), {}); + + auto publish_dup = encoders::encode_publish( + 1, "t", "p", qos_e::at_least_once, retain_e::no, dup_e::yes, {} + ); + auto puback = encoders::encode_puback(1, reason_codes::success.value(), {}); + + disconnect_props dc_props; + dc_props[prop::reason_string] = "Malformed PUBACK: invalid Reason Code"; + auto disconnect = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), dc_props + ); + + test::msg_exchange broker_side; + error_code success {}; + + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(publish) + .complete_with(success, after(0ms)) + .reply_with(malformed_puback, after(0ms)) + .expect(disconnect) + .complete_with(success, after(0ms)) + .reply_with(std::string {}, after(0ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(publish_dup) + .complete_with(success, after(0ms)) + .reply_with(puback, after(0ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1") + .run(); + + c.async_publish( + "t", "p", retain_e::no, publish_props {}, + [&](error_code ec, reason_code rc, auto) { + ++handlers_called; + + BOOST_CHECK(!ec); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + + c.cancel(); + } + ); + + ioc.run_for(std::chrono::seconds(10)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + + +BOOST_AUTO_TEST_CASE(test_malformed_pubrec_pubcomp) { + using test::after; + using std::chrono_literals::operator ""ms; + + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + // packets + auto connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); + + auto publish = encoders::encode_publish( + 1, "t", "p", qos_e::exactly_once, retain_e::no, dup_e::no, {} + ); + auto malformed_pubrec = encoders::encode_pubrec(1, uint8_t(0x04), {}); + + auto publish_dup = encoders::encode_publish( + 1, "t", "p", qos_e::exactly_once, retain_e::no, dup_e::yes, {} + ); + auto pubrec = encoders::encode_pubrec(1, reason_codes::success.value(), {}); + + auto pubrel = encoders::encode_pubrel(1, reason_codes::success.value(), {}); + auto malformed_pubcomp = encoders::encode_pubcomp(1, uint8_t(0x04), {}); + auto pubcomp = encoders::encode_pubcomp(1, reason_codes::success.value(), {}); + + disconnect_props dc_props; + dc_props[prop::reason_string] = "Malformed PUBREC: invalid Reason Code"; + auto disconnect_on_pubrec = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), dc_props + ); + + dc_props[prop::reason_string] = "Malformed PUBCOMP: invalid Reason Code"; + auto disconnect_on_pubcomp = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), dc_props + ); + + test::msg_exchange broker_side; + error_code success {}; + + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(publish) + .complete_with(success, after(0ms)) + .reply_with(malformed_pubrec, after(0ms)) + .expect(disconnect_on_pubrec) + .complete_with(success, after(0ms)) + .reply_with(std::string {}, after(0ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(publish_dup) + .complete_with(success, after(0ms)) + .reply_with(pubrec, after(0ms)) + .expect(pubrel) + .complete_with(success, after(0ms)) + .reply_with(malformed_pubcomp, after(0ms)) + .expect(disconnect_on_pubcomp) + .complete_with(success, after(0ms)) + .reply_with(std::string {}, after(0ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(pubrel) + .complete_with(success, after(0ms)) + .reply_with(pubcomp, after(0ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1") + .run(); + + c.async_publish( + "t", "p", retain_e::no, publish_props {}, + [&](error_code ec, reason_code rc, auto) { + ++handlers_called; + + BOOST_CHECK(!ec); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + + c.cancel(); + } + ); + + ioc.run_for(std::chrono::seconds(15)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/unit/test/subscribe_op.cpp b/test/unit/test/subscribe_op.cpp index 9edb8cf..ca88630 100644 --- a/test/unit/test/subscribe_op.cpp +++ b/test/unit/test/subscribe_op.cpp @@ -2,16 +2,59 @@ #include -#include +#include #include +#include "test_common/message_exchange.hpp" #include "test_common/test_service.hpp" +#include "test_common/test_stream.hpp" using namespace async_mqtt5; BOOST_AUTO_TEST_SUITE(subscribe_op/*, *boost::unit_test::disabled()*/) +template < + typename StreamType, + typename TlsContext = std::monostate +> +class overrun_client : public detail::client_service { +public: + overrun_client(const asio::any_io_executor& ex, const std::string& cnf) : + detail::client_service(ex, cnf) + {} + + uint16_t allocate_pid() { + return 0; + } +}; + +BOOST_AUTO_TEST_CASE(test_pid_overrun) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + using client_service_type = overrun_client; + auto svc_ptr = std::make_shared(ioc.get_executor(), ""); + + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { + ++handlers_called; + BOOST_CHECK(ec == client::error::pid_overrun); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); + }; + + detail::subscribe_op< + client_service_type, decltype(handler) + > { svc_ptr, std::move(handler) } + .perform( + { { "topic", { qos_e::exactly_once } } }, subscribe_props {} + ); + + ioc.run(); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); +} + BOOST_AUTO_TEST_CASE(test_invalid_topic_filters) { std::vector invalid_topics = { "", "+topic", "#topic", "some/#/topic", "topic+", @@ -25,9 +68,11 @@ BOOST_AUTO_TEST_CASE(test_invalid_topic_filters) { auto svc_ptr = std::make_shared(ioc.get_executor()); for (const auto& topic: invalid_topics) { - auto handler = [&handlers_called](error_code ec, auto, auto) { + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { ++handlers_called; BOOST_CHECK(ec == client::error::invalid_topic); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); }; detail::subscribe_op< @@ -36,7 +81,42 @@ BOOST_AUTO_TEST_CASE(test_invalid_topic_filters) { .perform({{ topic, { qos_e::exactly_once } }}, subscribe_props {}); } - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); +} + + +BOOST_AUTO_TEST_CASE(test_malformed_packet) { + std::vector test_properties = { + std::string(75000, 'a'), std::string(10, char(0x01)) + }; + + const int expected_handlers_called = static_cast(test_properties.size()); + int handlers_called = 0; + + asio::io_context ioc; + using client_service_type = test::test_service; + auto svc_ptr = std::make_shared(ioc.get_executor()); + + for (const auto& test_prop: test_properties) { + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { + ++handlers_called; + BOOST_CHECK(ec == client::error::malformed_packet); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); + }; + + subscribe_props props; + props[prop::user_property].push_back(test_prop); + + detail::subscribe_op< + client_service_type, decltype(handler) + > { svc_ptr, std::move(handler) } + .perform({ { "topic", { qos_e::exactly_once } } }, props + ); + } + + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } @@ -58,9 +138,11 @@ BOOST_AUTO_TEST_CASE(test_wildcard_subscriptions_not_supported) { BOOST_ASSERT(svc_ptr->connack_property(prop::wildcard_subscription_available) == 0); for (const auto& topic: wildcard_topics) { - auto handler = [&handlers_called](error_code ec, auto, auto) { + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { ++handlers_called; BOOST_CHECK(ec == client::error::wildcard_subscription_not_available); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); }; detail::subscribe_op< @@ -71,7 +153,7 @@ BOOST_AUTO_TEST_CASE(test_wildcard_subscriptions_not_supported) { ); } - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } @@ -89,9 +171,11 @@ BOOST_AUTO_TEST_CASE(test_shared_subscriptions_not_supported) { ); BOOST_ASSERT(svc_ptr->connack_property(prop::shared_subscription_available) == 0); - auto handler = [&handlers_called](error_code ec, auto, auto) { + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { ++handlers_called; BOOST_CHECK(ec == client::error::shared_subscription_not_available); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); }; detail::subscribe_op< @@ -101,13 +185,15 @@ BOOST_AUTO_TEST_CASE(test_shared_subscriptions_not_supported) { {{ "$share/group/topic", { qos_e::exactly_once } }}, subscribe_props {} ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_CASE(test_large_subscription_id) { + uint8_t sub_id_available = 1; + connack_props props; - props[prop::subscription_identifier_available] = uint8_t(1); + props[prop::subscription_identifier_available] = sub_id_available; constexpr int expected_handlers_called = 1; int handlers_called = 0; @@ -117,11 +203,13 @@ BOOST_AUTO_TEST_CASE(test_large_subscription_id) { auto svc_ptr = std::make_shared( ioc.get_executor(), std::move(props) ); - BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == 1); + BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == sub_id_available); - auto handler = [&handlers_called](error_code ec, auto, auto) { + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { ++handlers_called; BOOST_CHECK(ec == client::error::subscription_identifier_not_available); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); }; subscribe_props sub_props_big_id {}; @@ -134,13 +222,15 @@ BOOST_AUTO_TEST_CASE(test_large_subscription_id) { {{ "topic", { qos_e::exactly_once } }}, sub_props_big_id ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } BOOST_AUTO_TEST_CASE(test_subscription_ids_not_supported) { + uint8_t sub_id_available = 0; + connack_props props; - props[prop::subscription_identifier_available] = uint8_t(0); + props[prop::subscription_identifier_available] = sub_id_available; constexpr int expected_handlers_called = 1; int handlers_called = 0; @@ -150,11 +240,13 @@ BOOST_AUTO_TEST_CASE(test_subscription_ids_not_supported) { auto svc_ptr = std::make_shared( ioc.get_executor(), std::move(props) ); - BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == 0); + BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == sub_id_available); - auto handler = [&handlers_called](error_code ec, auto, auto) { + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { ++handlers_called; BOOST_CHECK(ec == client::error::subscription_identifier_not_available); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); }; subscribe_props sub_props {}; @@ -167,8 +259,113 @@ BOOST_AUTO_TEST_CASE(test_subscription_ids_not_supported) { {{ "topic", { qos_e::exactly_once } }}, sub_props ); - ioc.run(); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); } +BOOST_AUTO_TEST_CASE(test_packet_too_large) { + int max_packet_sz = 10; + + connack_props props; + props[prop::maximum_packet_size] = max_packet_sz; + + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + using client_service_type = test::test_service; + auto svc_ptr = std::make_shared( + ioc.get_executor(), std::move(props) + ); + BOOST_ASSERT(svc_ptr->connack_property(prop::maximum_packet_size) == max_packet_sz); + + auto handler = [&handlers_called](error_code ec, std::vector rcs, auto) { + ++handlers_called; + BOOST_CHECK(ec == client::error::packet_too_large); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty); + }; + + detail::subscribe_op< + client_service_type, decltype(handler) + > { svc_ptr, std::move(handler) } + .perform( + { { "topic", { qos_e::exactly_once } } }, subscribe_props {} + ); + + ioc.run_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); +} + +BOOST_AUTO_TEST_CASE(test_resending) { + using test::after; + using std::chrono_literals::operator ""ms; + + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + // data + std::vector topics = { + subscribe_topic { "topic", subscribe_options {} } + }; + subscribe_props subscribe_props; + + std::vector rcs = { reason_codes::granted_qos_0.value() }; + suback_props suback_props; + + // packets + auto connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); + + auto subscribe = encoders::encode_subscribe(1, topics, subscribe_props); + auto suback = encoders::encode_suback(1, rcs, suback_props); + + test::msg_exchange broker_side; + error_code success {}; + error_code fail = asio::error::not_connected; + + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(subscribe) + .complete_with(fail, after(0ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(subscribe) + .complete_with(success, after(0ms)) + .reply_with(suback, after(0ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1") + .run(); + + c.async_subscribe( + topics, subscribe_props, + [&](error_code ec, std::vector rcs, auto) { + handlers_called++; + + BOOST_CHECK(!ec); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::granted_qos_0); + + c.cancel(); + } + ); + + ioc.run_for(std::chrono::seconds(10)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + BOOST_AUTO_TEST_SUITE_END()