From 05ae2f55623bfa86feabc9b18f8dc516011f83c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Tue, 23 Jan 2024 11:46:00 +0100 Subject: [PATCH] Simplify integration publish tests Summary: related to T12015 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27511 --- test/integration/malformed_packet.cpp | 355 ---------------- test/integration/publish_send.cpp | 302 -------------- ...ublish_receive.cpp => receive_publish.cpp} | 143 ++++++- test/integration/resending.cpp | 312 -------------- test/integration/send_publish.cpp | 394 ++++++++++++++++++ 5 files changed, 522 insertions(+), 984 deletions(-) delete mode 100644 test/integration/malformed_packet.cpp delete mode 100644 test/integration/publish_send.cpp rename test/integration/{publish_receive.cpp => receive_publish.cpp} (53%) delete mode 100644 test/integration/resending.cpp create mode 100644 test/integration/send_publish.cpp diff --git a/test/integration/malformed_packet.cpp b/test/integration/malformed_packet.cpp deleted file mode 100644 index de01aba..0000000 --- a/test/integration/malformed_packet.cpp +++ /dev/null @@ -1,355 +0,0 @@ -#include - -#include - -#include "test_common/message_exchange.hpp" -#include "test_common/test_service.hpp" -#include "test_common/test_stream.hpp" - -using namespace async_mqtt5; - -BOOST_AUTO_TEST_SUITE(malformed_packet/* , *boost::unit_test::disabled()*/) - -struct shared_test_data { - const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - const std::string connack = encoders::encode_connack( - true, reason_codes::success.value(), {} - ); - - std::string topic = "topic"; - std::string payload = "payload"; - - const std::string publish_qos2 = encoders::encode_publish( - 1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::no, {} - ); - const std::string publish_qos2_dup = encoders::encode_publish( - 1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::yes, {} - ); - - const std::string pubrec = encoders::encode_pubrec(1, uint8_t(0x00), {}); - const std::string pubrel = encoders::encode_pubrel(1, uint8_t(0x00), {}); - const std::string pubcomp = encoders::encode_pubcomp(1, uint8_t(0x00), {}); - -}; - -BOOST_FIXTURE_TEST_CASE(test_malformed_publish, shared_test_data) { - using test::after; - using std::chrono_literals::operator ""ms; - - // packets - auto publish = encoders::encode_publish( - 1, topic, payload, static_cast(0b11), retain_e::yes, dup_e::no, {} - ); - - disconnect_props dprops; - dprops[prop::reason_string] = "Malformed PUBLISH received: QoS bits set to 0b11"; - auto disconnect = encoders::encode_disconnect( - reason_codes::malformed_packet.value(), dprops - ); - - test::msg_exchange broker_side; - error_code success {}; - - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .send(publish, after(10ms)) - .expect(disconnect) - .complete_with(success, after(1ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::seconds(2)); - timer.async_wait([&](auto) { c.cancel(); }); - - ioc.run(); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_FIXTURE_TEST_CASE(malformed_puback, shared_test_data) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - // packets - auto publish = encoders::encode_publish( - 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto publish_dup = encoders::encode_publish( - 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::yes, {} - ); - - auto puback = encoders::encode_puback(1, uint8_t(0x00), {}); - auto malformed_puback = encoders::encode_puback(1, uint8_t(0x04), {}); - - disconnect_props dc_props; - dc_props[prop::reason_string] = "Malformed PUBACK: invalid Reason Code"; - auto disconnect = encoders::encode_disconnect( - reason_codes::malformed_packet.value(), dc_props - ); - - test::msg_exchange broker_side; - error_code success {}; - - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(publish) - .complete_with(success, after(0ms)) - .reply_with(malformed_puback, after(0ms)) - .expect(disconnect) - .complete_with(success, after(0ms)) - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(publish_dup) - .complete_with(success, after(0ms)) - .reply_with(puback, after(0ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_publish( - topic, payload, retain_e::no, publish_props {}, - [&](error_code ec, reason_code rc, auto) { - ++handlers_called; - - BOOST_CHECK(!ec); - BOOST_CHECK_EQUAL(rc, reason_codes::success); - - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(2)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - - -BOOST_FIXTURE_TEST_CASE(malformed_pubrel, shared_test_data) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - // packets - auto malformed_pubrel = encoders::encode_pubrel(1, uint8_t(0x04), {}); - - disconnect_props dprops; - dprops[prop::reason_string] = "Malformed PUBREL received: invalid Reason Code"; - auto disconnect = encoders::encode_disconnect( - reason_codes::malformed_packet.value(), dprops - ); - - test::msg_exchange broker_side; - error_code success {}; - - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .send(publish_qos2, after(10ms)) - .expect(pubrec) - .complete_with(success, after(1ms)) - .reply_with(malformed_pubrel, after(2ms)) - .expect(disconnect) - .complete_with(success, after(1ms)) - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .send(pubrel, after(100ms)) - .expect(pubcomp) - .complete_with(success, after(1ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_receive( - [&]( - error_code ec, - std::string rec_topic, std::string rec_payload, - publish_props - ) { - ++handlers_called; - - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_EQUAL(topic, rec_topic); - BOOST_CHECK_EQUAL(payload, rec_payload); - - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(6)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_FIXTURE_TEST_CASE(malformed_pubrec, shared_test_data) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - // packets - auto malformed_pubrec = encoders::encode_pubrec(1, uint8_t(0x04), {}); - - disconnect_props dprops; - dprops[prop::reason_string] = "Malformed PUBREC: invalid Reason Code"; - auto disconnect = encoders::encode_disconnect( - reason_codes::malformed_packet.value(), dprops - ); - - test::msg_exchange broker_side; - error_code success {}; - - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(publish_qos2) - .complete_with(success, after(0ms)) - .reply_with(malformed_pubrec, after(0ms)) - .expect(disconnect) - .complete_with(success, after(0ms)) - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(publish_qos2_dup) - .complete_with(success, after(0ms)) - .reply_with(pubrec, after(0ms)) - .expect(pubrel) - .complete_with(success, after(0ms)) - .reply_with(pubcomp, after(0ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_publish( - topic, payload, retain_e::no, publish_props {}, - [&](error_code ec, reason_code rc, auto) { - ++handlers_called; - - BOOST_CHECK(!ec); - BOOST_CHECK_EQUAL(rc, reason_codes::success); - - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(6)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - - -BOOST_FIXTURE_TEST_CASE(malformed_pubcomp, shared_test_data) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - // packets - auto malformed_pubcomp = encoders::encode_pubcomp(1, uint8_t(0x04), {}); - - disconnect_props dprops; - dprops[prop::reason_string] = "Malformed PUBCOMP: invalid Reason Code"; - auto disconnect = encoders::encode_disconnect( - reason_codes::malformed_packet.value(), dprops - ); - - test::msg_exchange broker_side; - error_code success {}; - - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(publish_qos2) - .complete_with(success, after(0ms)) - .reply_with(pubrec, after(0ms)) - .expect(pubrel) - .complete_with(success, after(0ms)) - .reply_with(malformed_pubcomp, after(0ms)) - .expect(disconnect) - .complete_with(success, after(0ms)) - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(pubrel) - .complete_with(success, after(0ms)) - .reply_with(pubcomp, after(0ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_publish( - topic, payload, retain_e::no, publish_props {}, - [&](error_code ec, reason_code rc, auto) { - ++handlers_called; - - BOOST_CHECK(!ec); - BOOST_CHECK_EQUAL(rc, reason_codes::success); - - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(6)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_AUTO_TEST_SUITE_END(); diff --git a/test/integration/publish_send.cpp b/test/integration/publish_send.cpp deleted file mode 100644 index 5c745c5..0000000 --- a/test/integration/publish_send.cpp +++ /dev/null @@ -1,302 +0,0 @@ -#include - -#include -#include - -#include - -#include "test_common/message_exchange.hpp" -#include "test_common/test_service.hpp" -#include "test_common/test_stream.hpp" - -using namespace async_mqtt5; - -BOOST_AUTO_TEST_SUITE(publish_send/*, *boost::unit_test::disabled()*/) - -BOOST_AUTO_TEST_CASE(ordering_after_reconnect) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 2; - int handlers_called = 0; - - // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack( - false, reason_codes::success.value(), {} - ); - auto publish_1 = encoders::encode_publish( - 1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto publish_1_dup = encoders::encode_publish( - 1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::yes, {} - ); - auto puback = encoders::encode_puback( - 1, reason_codes::success.value(), {} - ); - auto publish_2 = encoders::encode_publish( - 2, "t_2", "p_2", qos_e::exactly_once, retain_e::no, dup_e::no, {} - ); - auto pubrec = encoders::encode_pubrec( - 2, reason_codes::success.value(), {} - ); - auto pubrel = encoders::encode_pubrel( - 2, reason_codes::success.value(), {} - ); - auto pubcomp = encoders::encode_pubcomp( - 2, reason_codes::success.value(), {} - ); - - test::msg_exchange broker_side; - error_code success {}; - error_code fail = asio::error::not_connected; - - broker_side - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(20ms)) - .expect(publish_1, publish_2) - .complete_with(success, after(10ms)) - .reply_with(pubrec, after(20ms)) - .expect(pubrel) - .complete_with(fail, after(10ms)) - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(15ms)) - .expect(pubrel, publish_1_dup) - .complete_with(success, after(10ms)) - .reply_with(pubcomp, puback, after(20ms)); - - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_publish( - "t_1", "p_1", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - ++handlers_called; - - if (handlers_called == expected_handlers_called) - c.cancel(); - } - ); - - c.async_publish( - "t_2", "p_2", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - ++handlers_called; - - if (handlers_called == expected_handlers_called) - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(1)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_AUTO_TEST_CASE(throttling) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 3; - int handlers_called = 0; - - connack_props props; - props[prop::receive_maximum] = int16_t(1); - - //packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack( - false, reason_codes::success.value(), props - ); - auto publish_1 = encoders::encode_publish( - 1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto publish_2 = encoders::encode_publish( - 2, "t_1", "p_2", qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto publish_3 = encoders::encode_publish( - 3, "t_1", "p_3", qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto puback_1 = encoders::encode_puback( - 1, reason_codes::success.value(), {} - ); - auto puback_2 = encoders::encode_puback( - 2, reason_codes::success.value(), {} - ); - auto puback_3 = encoders::encode_puback( - 3, reason_codes::success.value(), {} - ); - - test::msg_exchange broker_side; - error_code success {}; - - broker_side - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(15ms)) - .expect(publish_1) - .complete_with(success, after(10ms)) - .reply_with(puback_1, after(15ms)) - .expect(publish_2) - .complete_with(success, after(10ms)) - .reply_with(puback_2, after(15ms)) - .expect(publish_3) - .complete_with(success, after(10ms)) - .reply_with(puback_3, after(15ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1") - .async_run(asio::detached); - - c.async_publish( - "t_1", "p_1", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - BOOST_CHECK_EQUAL(handlers_called, 0); - ++handlers_called; - } - ); - - - c.async_publish( - "t_1", "p_2", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - BOOST_CHECK_EQUAL(handlers_called, 1); - ++handlers_called; - } - ); - - c.async_publish( - "t_1", "p_3", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - BOOST_CHECK_EQUAL(handlers_called, 2); - ++handlers_called; - - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(1)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - - -BOOST_AUTO_TEST_CASE(cancel_multiple_ops) { - using test::after; - using namespace std::chrono; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - auto begin = high_resolution_clock::now(); - - // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack( - false, reason_codes::success.value(), {} - ); - auto publish_1 = encoders::encode_publish( - 1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto puback_1 = encoders::encode_puback( - 1, reason_codes::success.value(), {} - ); - - auto publish_2 = encoders::encode_publish( - 1, "t_2", "p_2", qos_e::exactly_once, retain_e::no, dup_e::no, {} - ); - auto pubrec_2 = encoders::encode_pubrec( - 1, reason_codes::success.value(), {} - ); - auto pubrel_2 = encoders::encode_pubrel( - 1, reason_codes::success.value(), {} - ); - - test::msg_exchange broker_side; - error_code success{}; - - broker_side - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(20ms)) - .expect(publish_1) - .complete_with(success, after(10ms)) - .reply_with(puback_1, after(10s)) - .send(publish_2, after(200ms)) - .expect(pubrec_2) - .complete_with(success, after(10ms)) - .reply_with(pubrel_2, after(10s)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1") - .async_run(asio::detached); - - c.async_publish( - "t_1", "p_1", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(ec, ec.message()); - BOOST_CHECK_MESSAGE(rc, rc.message()); - ++handlers_called; - } - ); - - asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::seconds(2)); - timer.async_wait([&](auto) { c.cancel(); }); - - ioc.run(); - auto end = high_resolution_clock::now(); - auto duration = duration_cast(end - begin); - - BOOST_CHECK_MESSAGE( - duration <= std::chrono::seconds(3), - "The client did not cancel properly!" - ); - - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - - -BOOST_AUTO_TEST_SUITE_END(); diff --git a/test/integration/publish_receive.cpp b/test/integration/receive_publish.cpp similarity index 53% rename from test/integration/publish_receive.cpp rename to test/integration/receive_publish.cpp index 9c9c741..528d301 100644 --- a/test/integration/publish_receive.cpp +++ b/test/integration/receive_publish.cpp @@ -2,7 +2,6 @@ #include #include -#include #include @@ -12,7 +11,7 @@ using namespace async_mqtt5; -BOOST_AUTO_TEST_SUITE(publish_receive/*, *boost::unit_test::disabled()*/) +BOOST_AUTO_TEST_SUITE(receive_publish/*, *boost::unit_test::disabled()*/) struct shared_test_data { error_code success {}; @@ -63,29 +62,27 @@ void run_test(test::msg_exchange broker_side) { c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff .async_run(asio::detached); - c.async_receive( - [&]( - error_code ec, - std::string rec_topic, std::string rec_payload, - publish_props - ) { + c.async_receive([&handlers_called, &c]( + error_code ec, std::string rec_topic, std::string rec_payload, publish_props + ){ ++handlers_called; + auto data = shared_test_data(); BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_EQUAL(shared_test_data().topic, rec_topic); - BOOST_CHECK_EQUAL(shared_test_data().payload, rec_payload); + BOOST_CHECK_EQUAL(data.topic, rec_topic); + BOOST_CHECK_EQUAL(data.payload, rec_payload); c.cancel(); } ); - ioc.run_for(std::chrono::seconds(10)); + ioc.run_for(3s); BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); BOOST_CHECK(broker.received_all_expected()); } - -BOOST_FIXTURE_TEST_CASE(test_receive_publish_qos0, shared_test_data) { + +BOOST_FIXTURE_TEST_CASE(receive_publish_qos0, shared_test_data) { test::msg_exchange broker_side; broker_side .expect(connect) @@ -96,7 +93,7 @@ BOOST_FIXTURE_TEST_CASE(test_receive_publish_qos0, shared_test_data) { run_test(std::move(broker_side)); } -BOOST_FIXTURE_TEST_CASE(test_receive_publish_qos1, shared_test_data) { +BOOST_FIXTURE_TEST_CASE(receive_publish_qos1, shared_test_data) { test::msg_exchange broker_side; broker_side .expect(connect) @@ -109,7 +106,7 @@ BOOST_FIXTURE_TEST_CASE(test_receive_publish_qos1, shared_test_data) { run_test(std::move(broker_side)); } -BOOST_FIXTURE_TEST_CASE(test_receive_publish_qos2, shared_test_data) { +BOOST_FIXTURE_TEST_CASE(receive_publish_qos2, shared_test_data) { test::msg_exchange broker_side; broker_side .expect(connect) @@ -125,7 +122,123 @@ BOOST_FIXTURE_TEST_CASE(test_receive_publish_qos2, shared_test_data) { run_test(std::move(broker_side)); } +disconnect_props dprops_with_reason_string(const std::string& reason_string) { + disconnect_props dprops; + dprops[prop::reason_string] = reason_string; + return dprops; +} + +BOOST_FIXTURE_TEST_CASE(receive_malformed_publish, shared_test_data) { + // packets + auto malformed_publish = encoders::encode_publish( + 1, "malformed topic", "malformed payload", + static_cast(0b11), retain_e::yes, dup_e::no, {} + ); + + auto disconnect = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), + dprops_with_reason_string("Malformed PUBLISH received: QoS bits set to 0b11") + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(malformed_publish, after(10ms)) + .expect(disconnect) + .complete_with(success, after(1ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(publish_qos0, after(50ms)); + + run_test(std::move(broker_side)); +} + +BOOST_FIXTURE_TEST_CASE(receive_malformed_pubrel, shared_test_data) { + // packets + auto malformed_pubrel = encoders::encode_pubrel(1, uint8_t(0x04), {}); + + auto disconnect = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), + dprops_with_reason_string("Malformed PUBREL received: invalid Reason Code") + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(publish_qos2, after(10ms)) + .expect(pubrec) + .complete_with(success, after(1ms)) + .reply_with(malformed_pubrel, after(2ms)) + .expect(disconnect) + .complete_with(success, after(1ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(pubrel, after(100ms)) + .expect(pubcomp) + .complete_with(success, after(1ms)); + + run_test(std::move(broker_side)); +} + +BOOST_FIXTURE_TEST_CASE(fail_to_send_puback, shared_test_data) { + // packets + auto publish_qos1_dup = encoders::encode_publish( + 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::yes, {} + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(publish_qos1, after(3ms)) + .expect(puback) + .complete_with(fail, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .send(publish_qos1_dup, after(100ms)) + .expect(puback) + .complete_with(success, after(1ms)); + + run_test(std::move(broker_side)); +} + BOOST_FIXTURE_TEST_CASE(fail_to_send_pubrec, shared_test_data) { + // packets + auto publish_qos2_dup = encoders::encode_publish( + 1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::yes, {} + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(publish_qos2, after(3ms)) + .expect(pubrec) + .complete_with(fail, after(1ms)) + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .send(publish_qos2_dup, after(100ms)) + .expect(pubrec) + .complete_with(success, after(1ms)) + .reply_with(pubrel, after(2ms)) + .expect(pubcomp) + .complete_with(success, after(1ms)); + + run_test(std::move(broker_side)); +} + +BOOST_FIXTURE_TEST_CASE(broker_fails_to_receive_pubrec, shared_test_data) { + // packets auto publish_dup = encoders::encode_publish( 1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::yes, {} ); diff --git a/test/integration/resending.cpp b/test/integration/resending.cpp deleted file mode 100644 index 97271be..0000000 --- a/test/integration/resending.cpp +++ /dev/null @@ -1,312 +0,0 @@ -#include - -#include - -#include "test_common/message_exchange.hpp" -#include "test_common/test_service.hpp" -#include "test_common/test_stream.hpp" - -using namespace async_mqtt5; - -BOOST_AUTO_TEST_SUITE(resending/* , *boost::unit_test::disabled()*/) - -BOOST_AUTO_TEST_CASE(resend_multiple_publishes) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 2; - int handlers_called = 0; - - // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack( - false, reason_codes::success.value(), {} - ); - auto publish_1 = encoders::encode_publish( - 1, "t", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto puback_1 = encoders::encode_puback( - 1, reason_codes::success.value(), {} - ); - auto publish_2 = encoders::encode_publish( - 2, "t", "p_2", qos_e::at_least_once, retain_e::no, dup_e::no, {} - ); - auto puback_2 = encoders::encode_puback( - 2, reason_codes::success.value(), {} - ); - - test::msg_exchange broker_side; - error_code success {}; - error_code fail = asio::error::not_connected; - - broker_side - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(10ms)) - .expect(publish_1, publish_2) - .complete_with(fail, after(10ms)) - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(10ms)) - .expect(publish_1, publish_2) - .complete_with(success, after(10ms)) - .reply_with(puback_1, puback_2, after(20ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_publish( - "t", "p_1", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - ++handlers_called; - - if (handlers_called == expected_handlers_called) - c.cancel(); - } - ); - - c.async_publish( - "t", "p_2", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - ++handlers_called; - - if (handlers_called == expected_handlers_called) - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(6)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_AUTO_TEST_CASE(resend_pubrel) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack( - false, reason_codes::success.value(), {} - ); - auto publish = encoders::encode_publish( - 1, "t_1", "p_1", qos_e::exactly_once, retain_e::no, dup_e::no, {} - ); - auto pubrec = encoders::encode_pubrec( - 1, reason_codes::success.value(), {} - ); - auto pubrel = encoders::encode_pubrel( - 1, reason_codes::success.value(), {} - ); - auto pubcomp = encoders::encode_pubcomp( - 1, reason_codes::success.value(), {} - ); - - test::msg_exchange broker_side; - error_code success {}; - error_code fail = asio::error::not_connected; - - broker_side - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(20ms)) - .expect(publish) - .complete_with(success, after(10ms)) - .reply_with(pubrec, after(25ms)) - .expect(pubrel) - .complete_with(success, after(10ms)) - .reply_with(fail, after(10ms)) - .expect(connect) - .complete_with(success, after(10ms)) - .reply_with(connack, after(20ms)) - .expect(pubrel) - .complete_with(success, after(10ms)) - .reply_with(pubcomp, after(20ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_publish( - "t_1", "p_1", retain_e::no, publish_props{}, - [&](error_code ec, reason_code rc, auto) { - BOOST_CHECK_MESSAGE(!ec, ec.message()); - BOOST_CHECK_MESSAGE(!rc, rc.message()); - ++handlers_called; - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(6)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_AUTO_TEST_CASE(resend_subscribe) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - // data - std::vector topics = { - subscribe_topic { "topic", subscribe_options {} } - }; - subscribe_props subscribe_props; - - std::vector rcs = { reason_codes::granted_qos_0.value() }; - suback_props suback_props; - - // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); - - auto subscribe = encoders::encode_subscribe(1, topics, subscribe_props); - auto suback = encoders::encode_suback(1, rcs, suback_props); - - test::msg_exchange broker_side; - error_code success {}; - error_code fail = asio::error::not_connected; - - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(subscribe) - .complete_with(fail, after(0ms)) - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(subscribe) - .complete_with(success, after(0ms)) - .reply_with(suback, after(0ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_subscribe( - topics, subscribe_props, - [&](error_code ec, std::vector rcs, auto) { - handlers_called++; - - BOOST_CHECK(!ec); - BOOST_ASSERT(rcs.size() == 1); - BOOST_CHECK_EQUAL(rcs[0], reason_codes::granted_qos_0); - - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(10)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_AUTO_TEST_CASE(resend_unsubscribe) { - using test::after; - using std::chrono_literals::operator ""ms; - - constexpr int expected_handlers_called = 1; - int handlers_called = 0; - - // data - std::vector topics = { "topic" }; - unsubscribe_props unsubscribe_props; - - std::vector rcs = { reason_codes::success.value() }; - unsuback_props unsuback_props; - - // packets - auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt - ); - auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); - - auto unsubscribe = encoders::encode_unsubscribe(1, topics, unsubscribe_props); - auto unsuback = encoders::encode_unsuback(1, rcs, unsuback_props); - - test::msg_exchange broker_side; - error_code success {}; - error_code fail = asio::error::not_connected; - - broker_side - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(unsubscribe) - .complete_with(fail, after(0ms)) - .expect(connect) - .complete_with(success, after(0ms)) - .reply_with(connack, after(0ms)) - .expect(unsubscribe) - .complete_with(success, after(0ms)) - .reply_with(unsuback, after(0ms)); - - asio::io_context ioc; - auto executor = ioc.get_executor(); - auto& broker = asio::make_service( - ioc, executor, std::move(broker_side) - ); - - using client_type = mqtt_client; - client_type c(executor, ""); - c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .async_run(asio::detached); - - c.async_unsubscribe( - topics, unsubscribe_props, - [&](error_code ec, std::vector rcs, auto) { - handlers_called++; - - BOOST_CHECK(!ec); - BOOST_ASSERT(rcs.size() == 1); - BOOST_CHECK_EQUAL(rcs[0], reason_codes::success); - - c.cancel(); - } - ); - - ioc.run_for(std::chrono::seconds(10)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); - BOOST_CHECK(broker.received_all_expected()); -} - -BOOST_AUTO_TEST_SUITE_END(); diff --git a/test/integration/send_publish.cpp b/test/integration/send_publish.cpp new file mode 100644 index 0000000..7147fe0 --- /dev/null +++ b/test/integration/send_publish.cpp @@ -0,0 +1,394 @@ +#include + +#include + +#include + +#include "test_common/message_exchange.hpp" +#include "test_common/test_service.hpp" +#include "test_common/test_stream.hpp" + +using namespace async_mqtt5; + +BOOST_AUTO_TEST_SUITE(send_publish/*, *boost::unit_test::disabled()*/) + +struct shared_test_data { + error_code success {}; + error_code fail = asio::error::not_connected; + + const std::string connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + const std::string connack = encoders::encode_connack( + false, reason_codes::success.value(), {} + ); + + const std::string topic = "topic"; + const std::string payload = "payload"; + + const std::string publish_qos0 = encoders::encode_publish( + 0, topic, payload, qos_e::at_most_once, retain_e::no, dup_e::no, {} + ); + const std::string publish_qos1 = encoders::encode_publish( + 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + const std::string publish_qos2 = encoders::encode_publish( + 1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::no, {} + ); + + const std::string puback = encoders::encode_puback(1, uint8_t(0x00), {}); + + const std::string pubrec = encoders::encode_pubrec(1, uint8_t(0x00), {}); + const std::string pubrel = encoders::encode_pubrel(1, uint8_t(0x00), {}); + const std::string pubcomp = encoders::encode_pubcomp(1, uint8_t(0x00), {}); +}; + +using test::after; +using namespace std::chrono; + +template +void run_test( + test::msg_exchange broker_side, + asio::any_completion_handler> op_handler +) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff + .async_run(asio::detached); + + auto shared_data = shared_test_data(); + if constexpr (qos == qos_e::at_most_once) + c.async_publish( + shared_data.topic, shared_data.payload, retain_e::no, publish_props {}, + [&handlers_called, &c, h = std::move(op_handler)](error_code ec) mutable { + ++handlers_called; + std::move(h)(ec); + c.cancel(); + } + ); + else + c.async_publish( + shared_data.topic, shared_data.payload, retain_e::no, publish_props {}, + [&handlers_called, &c, h = std::move(op_handler)] + (error_code ec, reason_code rc, detail::on_publish_props_type props) mutable { + ++handlers_called; + std::move(h)(ec, rc, props); + c.cancel(); + } + ); + + ioc.run_for(2s); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + +BOOST_FIXTURE_TEST_CASE(send_publish_qos0, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos0) + .complete_with(success, after(1ms)); + + run_test( + std::move(broker_side), + [](error_code ec) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(send_publish_qos1, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1) + .complete_with(success, after(1ms)) + .reply_with(puback, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, puback_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(send_publish_qos2, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos2) + .complete_with(success, after(1ms)) + .reply_with(pubrec, after(2ms)) + .expect(pubrel) + .complete_with(success, after(1ms)) + .reply_with(pubcomp, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, pubcomp_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(fail_to_send_publish, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1) + .complete_with(fail, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1) + .complete_with(success, after(1ms)) + .reply_with(puback, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, puback_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(fail_to_send_pubrel, shared_test_data) { + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos2) + .complete_with(success, after(1ms)) + .reply_with(pubrec, after(2ms)) + .expect(pubrel) + .complete_with(fail, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(pubrel) + .complete_with(success, after(1ms)) + .reply_with(pubcomp, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, pubcomp_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + } + ); +} + +disconnect_props dprops_with_reason_string(const std::string& reason_string) { + disconnect_props dprops; + dprops[prop::reason_string] = reason_string; + return dprops; +} + +BOOST_FIXTURE_TEST_CASE(receive_malformed_puback, shared_test_data) { + // packets + auto publish_qos1_dup = encoders::encode_publish( + 1, topic, payload, qos_e::at_least_once, retain_e::no, dup_e::yes, {} + ); + auto malformed_puback = encoders::encode_puback(1, uint8_t(0x04), {}); + + auto disconnect = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), + dprops_with_reason_string("Malformed PUBACK: invalid Reason Code") + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1) + .complete_with(success, after(1ms)) + .reply_with(malformed_puback, after(2ms)) + .expect(disconnect) + .complete_with(success, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos1_dup) + .complete_with(success, after(1ms)) + .reply_with(puback, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, puback_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(receive_malformed_pubrec, shared_test_data) { + // packets + auto publish_qos2_dup = encoders::encode_publish( + 1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::yes, {} + ); + + auto malformed_pubrec = encoders::encode_pubrec(1, uint8_t(0x04), {}); + + auto disconnect = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), + dprops_with_reason_string("Malformed PUBREC: invalid Reason Code") + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos2) + .complete_with(success, after(1ms)) + .reply_with(malformed_pubrec, after(2ms)) + .expect(disconnect) + .complete_with(success, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos2_dup) + .complete_with(success, after(1ms)) + .reply_with(pubrec, after(2ms)) + .expect(pubrel) + .complete_with(success, after(1ms)) + .reply_with(pubcomp, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, pubcomp_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(receive_malformed_pubcomp, shared_test_data) { + // packets + auto malformed_pubcomp = encoders::encode_pubcomp(1, uint8_t(0x04), {}); + + auto disconnect = encoders::encode_disconnect( + reason_codes::malformed_packet.value(), + dprops_with_reason_string("Malformed PUBCOMP: invalid Reason Code") + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish_qos2) + .complete_with(success, after(1ms)) + .reply_with(pubrec, after(2ms)) + .expect(pubrel) + .complete_with(success, after(1ms)) + .reply_with(malformed_pubcomp, after(2ms)) + .expect(disconnect) + .complete_with(success, after(1ms)) + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(pubrel) + .complete_with(success, after(1ms)) + .reply_with(pubcomp, after(2ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, pubcomp_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::success); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(receive_pubrec_with_rc, shared_test_data) { + // packets + auto pubrec_with_rc = encoders::encode_pubrec( + 1, reason_codes::unspecified_error.value(), {} + ); + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(publish_qos2) + .complete_with(success, after(0ms)) + .reply_with(pubrec_with_rc, after(0ms)); + + run_test( + std::move(broker_side), + [](error_code ec, reason_code rc, pubcomp_props) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL(rc, reason_codes::unspecified_error); + } + ); +} + +BOOST_FIXTURE_TEST_CASE(cancel_resending_publish, shared_test_data) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff + .async_run(asio::detached); + + asio::cancellation_signal cancel_signal; + c.async_publish( + topic, payload, retain_e::no, publish_props{}, + asio::bind_cancellation_slot( + cancel_signal.slot(), + [&handlers_called, &c](error_code ec, reason_code rc, puback_props) { + ++handlers_called; + + BOOST_CHECK(ec = asio::error::operation_aborted); + BOOST_CHECK_EQUAL(rc, reason_codes::empty); + + c.cancel(); + } + ) + ); + cancel_signal.emit(asio::cancellation_type::total); + + ioc.run_for(1s); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + +BOOST_AUTO_TEST_SUITE_END();