diff --git a/test/integration/async_sender.cpp b/test/integration/async_sender.cpp new file mode 100644 index 0000000..394dea2 --- /dev/null +++ b/test/integration/async_sender.cpp @@ -0,0 +1,185 @@ +#include + +#include + +#include + +#include "test_common/message_exchange.hpp" +#include "test_common/test_service.hpp" +#include "test_common/test_stream.hpp" + +using namespace async_mqtt5; + +BOOST_AUTO_TEST_SUITE(async_sender/*, *boost::unit_test::disabled()*/) + +struct shared_test_data { + error_code success {}; + error_code fail = asio::error::not_connected; + + const std::string connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + const std::string connack = encoders::encode_connack( + false, reason_codes::success.value(), {} + ); + + const std::string topic = "topic"; + const std::string payload = "payload"; +}; + +using test::after; +using namespace std::chrono; + +BOOST_FIXTURE_TEST_CASE(ordering_after_reconnect, shared_test_data) { + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + // packets + auto publish_qos1 = encoders::encode_publish( + 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + auto publish_qos1_dup = encoders::encode_publish( + 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::yes, {} + ); + + auto publish_qos2 = encoders::encode_publish( + 2, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::no, {} + ); + + auto puback = encoders::encode_puback(1, uint8_t(0x00), {}); + auto pubrec = encoders::encode_pubrec(2, uint8_t(0x00), {}); + auto pubrel = encoders::encode_pubrel(2, uint8_t(0x00), {}); + auto pubcomp = encoders::encode_pubcomp(2, uint8_t(0x00), {}); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1, publish_qos2) + .complete_with(success, after(1ms)) + .reply_with(pubrec, after(2ms)) + .expect(pubrel) + .complete_with(fail, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(pubrel, publish_qos1_dup) + .complete_with(success, after(1ms)) + .reply_with(pubcomp, puback, after(2ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff + .async_run(asio::detached); + + c.async_publish( + topic, payload, retain_e::no, publish_props {}, + [&](error_code ec, reason_code rc, puback_props) { + ++handlers_called; + + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rc, rc.message()); + + if (handlers_called == expected_handlers_called) + c.cancel(); + } + ); + + c.async_publish( + topic, payload, retain_e::no, publish_props{}, + [&](error_code ec, reason_code rc, pubcomp_props) { + ++handlers_called; + + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rc, rc.message()); + + if (handlers_called == expected_handlers_called) + c.cancel(); + } + ); + + ioc.run_for(1s); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + +BOOST_FIXTURE_TEST_CASE(throttling, shared_test_data) { + constexpr int expected_handlers_called = 3; + int handlers_called = 0; + + connack_props props; + props[prop::receive_maximum] = int16_t(1); + + //packets + auto connack_rm = encoders::encode_connack( + false, reason_codes::success.value(), props + ); + + auto publish_1 = encoders::encode_publish( + 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + auto publish_2 = encoders::encode_publish( + 2, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + auto publish_3 = encoders::encode_publish( + 3, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + + auto puback_1 = encoders::encode_puback(1, uint8_t(0x00), {}); + auto puback_2 = encoders::encode_puback(2, uint8_t(0x00), {}); + auto puback_3 = encoders::encode_puback(3, uint8_t(0x00), {}); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack_rm, after(2ms)) + .expect(publish_1) + .complete_with(success, after(1ms)) + .reply_with(puback_1, after(2ms)) + .expect(publish_2) + .complete_with(success, after(1ms)) + .reply_with(puback_2, after(2ms)) + .expect(publish_3) + .complete_with(success, after(1ms)) + .reply_with(puback_3, after(2ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1") + .async_run(asio::detached); + + for (auto i = 0; i < 3; i++) + c.async_publish( + topic, payload, retain_e::no, publish_props {}, + [&c, &handlers_called, i](error_code ec, reason_code rc, puback_props) { + ++handlers_called; + + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rc, rc.message()); + BOOST_CHECK_EQUAL(handlers_called, i + 1); + + if (i == 2) + c.cancel(); + } + ); + + ioc.run_for(1s); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + +BOOST_AUTO_TEST_SUITE_END(); diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index 4d02dea..a6dc7d3 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -244,46 +244,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_unsubscribe) { run_cancel_op_test(); } -BOOST_AUTO_TEST_CASE(signal_emit_async_run_cancels_client) { - using namespace test; - - constexpr int expected_handlers_called = 2; - int handlers_called = 0; - - asio::io_context ioc; - client_type c(ioc, ""); - - asio::cancellation_signal signal; - - c.brokers("127.0.0.1", 1883) - .async_run( - asio::bind_cancellation_slot( - signal.slot(), - [&handlers_called](error_code ec) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - handlers_called++; - } - ) - ); - - c.async_publish( - "topic", "payload", retain_e::yes, {}, - [&handlers_called](error_code ec) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - handlers_called++; - } - ); - - asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::milliseconds(10)); - timer.async_wait([&](auto) { - signal.emit(asio::cancellation_type::terminal); - }); - - ioc.run_for(std::chrono::seconds(1)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); -} - struct shared_test_data { error_code success {}; error_code fail = asio::error::not_connected; diff --git a/test/integration/executors.cpp b/test/integration/executors.cpp index 1debc9f..d1fee9f 100644 --- a/test/integration/executors.cpp +++ b/test/integration/executors.cpp @@ -19,7 +19,7 @@ BOOST_AUTO_TEST_SUITE(executors) BOOST_AUTO_TEST_CASE(async_run) { using test::after; - using std::chrono_literals::operator ""ms; + using namespace std::chrono; constexpr int expected_handlers_called = 9; int handlers_called = 0; diff --git a/test/integration/read_message.cpp b/test/integration/read_message.cpp index f87254e..03139a8 100644 --- a/test/integration/read_message.cpp +++ b/test/integration/read_message.cpp @@ -57,8 +57,8 @@ void test_receive_malformed_packet( .async_run(asio::detached); asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::milliseconds(100)); - timer.async_wait([&](auto) { c.cancel(); }); + timer.expires_after(100ms); + timer.async_wait([&c](error_code) { c.cancel(); }); ioc.run(); BOOST_CHECK(broker.received_all_expected()); @@ -110,16 +110,9 @@ struct shared_test_data { BOOST_FIXTURE_TEST_CASE(receive_disconnect, shared_test_data) { // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); - auto disconnect = encoders::encode_disconnect(0x00, {}); test::msg_exchange broker_side; - error_code success {}; - broker_side .expect(connect) .complete_with(success, after(0ms)) @@ -142,8 +135,8 @@ BOOST_FIXTURE_TEST_CASE(receive_disconnect, shared_test_data) { .async_run(asio::detached); asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::milliseconds(100)); - timer.async_wait([&](auto) { c.cancel(); }); + timer.expires_after(100ms); + timer.async_wait([&c](error_code) { c.cancel(); }); ioc.run(); BOOST_CHECK(broker.received_all_expected()); @@ -152,23 +145,15 @@ BOOST_FIXTURE_TEST_CASE(receive_disconnect, shared_test_data) { BOOST_FIXTURE_TEST_CASE(receive_pingresp, shared_test_data) { // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); - auto pingresp = encoders::encode_pingresp(); test::msg_exchange broker_side; - error_code success {}; - broker_side .expect(connect) .complete_with(success, after(0ms)) .reply_with(connack, after(0ms)) .send(pingresp, after(10ms)); - asio::io_context ioc; auto executor = ioc.get_executor(); auto& broker = asio::make_service( @@ -181,14 +166,13 @@ BOOST_FIXTURE_TEST_CASE(receive_pingresp, shared_test_data) { .async_run(asio::detached); asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::milliseconds(100)); - timer.async_wait([&](auto) { c.cancel(); }); + timer.expires_after(100ms); + timer.async_wait([&c](error_code) { c.cancel(); }); ioc.run(); BOOST_CHECK(broker.received_all_expected()); } - BOOST_FIXTURE_TEST_CASE(receive_byte_by_byte, shared_test_data) { constexpr int expected_handlers_called = 1; int handlers_called = 0; @@ -198,18 +182,11 @@ BOOST_FIXTURE_TEST_CASE(receive_byte_by_byte, shared_test_data) { std::string payload = "payload"; // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); - auto publish = encoders::encode_publish( 0, topic, payload, qos_e::at_most_once, retain_e::no, dup_e::no, {} ); test::msg_exchange broker_side; - error_code success {}; - broker_side .expect(connect) .complete_with(success, after(0ms)) @@ -243,7 +220,7 @@ BOOST_FIXTURE_TEST_CASE(receive_byte_by_byte, shared_test_data) { c.cancel(); }); - ioc.run_for(std::chrono::milliseconds(100)); + ioc.run_for(100ms); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); BOOST_CHECK(broker.received_all_expected()); } diff --git a/test/integration/sub_unsub.cpp b/test/integration/sub_unsub.cpp new file mode 100644 index 0000000..5bc716a --- /dev/null +++ b/test/integration/sub_unsub.cpp @@ -0,0 +1,182 @@ +#include + +#include + +#include + +#include "test_common/message_exchange.hpp" +#include "test_common/test_service.hpp" +#include "test_common/test_stream.hpp" + +using namespace async_mqtt5; + +namespace async_mqtt5::test { + +enum operation_type { + subscribe = 1, + unsubscribe +}; + +} // end namespace async_mqtt5::test + +BOOST_AUTO_TEST_SUITE(sub_unsub/*, *boost::unit_test::disabled()*/) + +struct shared_test_data { + error_code success {}; + error_code fail = asio::error::not_connected; + + const std::string connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + const std::string connack = encoders::encode_connack( + false, reason_codes::success.value(), {} + ); + + std::vector sub_topics = { + subscribe_topic { "topic", subscribe_options {} } + }; + std::vector unsub_topics = { "topic" }; + std::vector rcs = { uint8_t(0x00) }; + + const std::string subscribe = encoders::encode_subscribe( + 1, sub_topics, subscribe_props {} + ); + const std::string suback = encoders::encode_suback(1, rcs, suback_props {}); + + const std::string unsubscribe = encoders::encode_unsubscribe( + 1, unsub_topics, unsubscribe_props {} + ); + const std::string unsuback = encoders::encode_unsuback(1, rcs, unsuback_props {}); +}; + +using test::after; +using namespace std::chrono; + +template +void run_test(test::msg_exchange broker_side) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff + .async_run(asio::detached); + + auto data = shared_test_data(); + if constexpr (op_type == test::operation_type::subscribe) + c.async_subscribe( + data.sub_topics, subscribe_props {}, + [&handlers_called, &c](error_code ec, std::vector rcs, suback_props) { + ++handlers_called; + + BOOST_CHECK(!ec); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::granted_qos_0); + + c.cancel(); + } + ); + else + c.async_unsubscribe( + data.unsub_topics, unsubscribe_props {}, + [&handlers_called, &c](error_code ec, std::vector rcs, unsuback_props) { + ++handlers_called; + + BOOST_CHECK(!ec); + BOOST_ASSERT(rcs.size() == 1); + BOOST_CHECK_EQUAL(rcs[0], reason_codes::success); + + c.cancel(); + } + ); + + ioc.run_for(5s); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + +// subscribe + +BOOST_FIXTURE_TEST_CASE(fail_to_send_subscribe, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(subscribe) + .complete_with(fail, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(subscribe) + .complete_with(success, after(1ms)) + .reply_with(suback, after(2ms)); + + run_test(std::move(broker_side)); +} + +BOOST_FIXTURE_TEST_CASE(fail_to_receive_suback, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(subscribe) + .complete_with(success, after(1ms)) + .send(fail, after(15ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(subscribe) + .complete_with(success, after(1ms)) + .reply_with(suback, after(2ms)); + + run_test(std::move(broker_side)); +} + +// unsubscribe + +BOOST_FIXTURE_TEST_CASE(fail_to_send_unsubscribe, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(unsubscribe) + .complete_with(fail, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(unsubscribe) + .complete_with(success, after(1ms)) + .reply_with(unsuback, after(2ms)); + + run_test(std::move(broker_side)); +} + +BOOST_FIXTURE_TEST_CASE(fail_to_receive_unsuback, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(unsubscribe) + .complete_with(success, after(1ms)) + .send(fail, after(15ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(unsubscribe) + .complete_with(success, after(1ms)) + .reply_with(unsuback, after(2ms)); + + run_test(std::move(broker_side)); +} + +BOOST_AUTO_TEST_SUITE_END();