From 4bf59cc18c4579474a3d0062c177121a21fa8394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Thu, 25 Jan 2024 13:43:59 +0100 Subject: [PATCH] Better test packet logging & validation Summary: related to T12015, T11798 - improved packet logging in test broker for easier debugging - test broker will fail the test if a packet is sent after all the messages were exchanged - fixed a bug in assemble_op where read buffer was not properly cleared if an error occurred (for example, malformed packet cases) Reviewers: ivica Reviewed By: ivica Subscribers: iljazovic, miljen Differential Revision: https://repo.mireo.local/D27561 --- include/async_mqtt5/impl/assemble_op.hpp | 6 +- test/include/test_common/message_exchange.hpp | 4 + test/include/test_common/packet_util.hpp | 322 +++++++++++++++--- test/include/test_common/test_broker.hpp | 33 +- test/integration/cancellation.cpp | 6 +- test/integration/read_message.cpp | 3 +- test/src/run_tests.cpp | 2 +- 7 files changed, 307 insertions(+), 69 deletions(-) diff --git a/include/async_mqtt5/impl/assemble_op.hpp b/include/async_mqtt5/impl/assemble_op.hpp index d5ca02a..c94cdf8 100644 --- a/include/async_mqtt5/impl/assemble_op.hpp +++ b/include/async_mqtt5/impl/assemble_op.hpp @@ -197,8 +197,8 @@ private: return perform(wait_for, asio::transfer_at_least(0)); bool is_reply = code != control_code_e::publish && - code != control_code_e::auth && - code != control_code_e::disconnect; + code != control_code_e::auth && + code != control_code_e::disconnect; if (is_reply) { auto packet_id = decoders::decode_packet_id(first).value(); @@ -213,6 +213,8 @@ private: error_code ec, uint8_t control_code, byte_citer first, byte_citer last ) { + if (ec) + _data_span = { _read_buff.cend(), _read_buff.cend() }; std::move(_handler)(ec, control_code, first, last); } }; diff --git a/test/include/test_common/message_exchange.hpp b/test/include/test_common/message_exchange.hpp index aa5999a..4e6637c 100644 --- a/test/include/test_common/message_exchange.hpp +++ b/test/include/test_common/message_exchange.hpp @@ -216,6 +216,10 @@ public: return ret; } + bool has_remaining_messages() const { + return !_to_broker.empty() || !_from_broker.empty(); + } + private: template diff --git a/test/include/test_common/packet_util.hpp b/test/include/test_common/packet_util.hpp index e02a5ec..e51c7d4 100644 --- a/test/include/test_common/packet_util.hpp +++ b/test/include/test_common/packet_util.hpp @@ -4,6 +4,10 @@ #include #include +#include + +#include + #include #include @@ -11,7 +15,14 @@ namespace async_mqtt5::test { -using error_code = boost::system::error_code; +template +std::string concat_strings(Strings&&... strings) { + std::ostringstream stream; + (stream << ... << std::forward(strings)); + return stream.str(); +} + +namespace detail { inline qos_e extract_qos(uint8_t flags) { auto byte = (flags & 0b0110) >> 1; @@ -68,23 +79,233 @@ inline std::string to_readable_props(Props props) { if constexpr (is_optional) if (v.has_value()) stream << *v << " "; + if constexpr (is_vector) + stream << boost::algorithm::join(v, ","); return true; }); return stream.str(); } - + +using byte_citer = std::string::const_iterator; + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + auto connect = decoders::decode_connect(remain_length, it); + if (!connect.has_value()) + return "Cannot decode Connect packet!"; + auto& [cli_id, uname, pwd, keep_alive, clean_start, props, will] = *connect; + + return concat_strings( + code_to_str(code), + " uname: ", uname.value_or(""), " pwd: ", pwd.value_or(""), + " keep_alive: ", keep_alive, " clean_start: ", clean_start, + " props: ", to_readable_props(props) + ); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + auto connack = decoders::decode_connack(remain_length, it); + if (!connack.has_value()) + return "Cannot decode Connack packet!"; + auto& [session_present, reason_code, props] = *connack; + + return concat_strings( + code_to_str(code), + " session_present: ", session_present, " reason_code: ", reason_code, + " props: ", to_readable_props(props) + ); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + auto disconnect = decoders::decode_disconnect(remain_length, it); + if (!disconnect.has_value()) + return "Cannot decode Disconnect packet!"; + auto& [reason_code, props] = *disconnect; + + return concat_strings( + code_to_str(code), + " reason_code: ", std::to_string(uint8_t(reason_code)), + " props: ", to_readable_props(props) + ); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string( + uint8_t control_byte, uint32_t remain_length, byte_citer& it +) { + auto publish = decoders::decode_publish(control_byte, remain_length, it); + if (!publish.has_value()) + return "Cannot decode Publish packet!"; + auto& [topic, packet_id, flags, props, payload] = *publish; + + return concat_strings( + code_to_str(code), (packet_id ? " " + std::to_string(*packet_id) : ""), + " flags: ", std::bitset<8>(flags), + " topic: ", topic, " payload: ", payload, + " props: ", to_readable_props(props) + ); +} + +template < + control_code_e code, + std::enable_if_t< + code == control_code_e::puback || code == control_code_e::pubrec || + code == control_code_e::pubrel || code == control_code_e::pubcomp, + bool> = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + const auto packet_id = decoders::decode_packet_id(it).value_or(0); + remain_length -= sizeof(uint16_t); + uint8_t reason_code = remain_length == 0 ? 0 : uint8_t(*it); + return concat_strings( + code_to_str(code), + " packet_id: ", packet_id, " reason_code: ", std::to_string(reason_code) + ); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + auto auth = decoders::decode_auth(remain_length, it); + if (!auth.has_value()) + return "Cannot decode Auth packet!"; + auto& [reason_code, props] = *auth; + + return concat_strings( + code_to_str(code), + " reason_code: ", std::to_string(uint8_t(reason_code)), + " props: ", to_readable_props(props) + ); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + const auto packet_id = decoders::decode_packet_id(it).value_or(0); + remain_length -= sizeof(uint16_t); + auto subscribe = decoders::decode_subscribe(remain_length, it); + if (!subscribe.has_value()) + return "Cannot decode Subscribe packet!"; + auto& [props, topics] = *subscribe; + + std::vector topics_str; + topics_str.resize(topics.size()); + boost::transform( + boost::make_iterator_range(topics.cbegin(), topics.cend()), + topics_str.begin(), + [](const auto& tuple) { + auto& [topic, options] = tuple; + return concat_strings(topic, " ", std::bitset<8>(options)); + } + ); + return concat_strings( + code_to_str(code), + " packet_id: ", packet_id, + " topics: ", boost::algorithm::join(topics_str, ","), + " props: ", to_readable_props(props) + ); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + const auto packet_id = decoders::decode_packet_id(it).value_or(0); + remain_length -= sizeof(uint16_t); + auto unsubscribe = decoders::decode_unsubscribe(remain_length, it); + if (!unsubscribe.has_value()) + return "Cannot decode Unsubscribe packet!"; + auto& [props, topics] = *unsubscribe; + + return concat_strings( + code_to_str(code), + " packet_id: ", packet_id, + " topics: ", boost::algorithm::join(topics, ","), + " props: ", to_readable_props(props) + ); +} + +inline std::string reason_codes_to_string(const std::vector& rcs) { + std::vector rcs_str; + rcs_str.resize(rcs.size()); + boost::transform( + boost::make_iterator_range(rcs.cbegin(), rcs.cend()), + rcs_str.begin(), + [](const auto& rc) { return std::to_string(rc); } + ); + return boost::algorithm::join(rcs_str, ","); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + const auto packet_id = decoders::decode_packet_id(it).value_or(0); + remain_length -= sizeof(uint16_t); + auto suback = decoders::decode_suback(remain_length, it); + if (!suback.has_value()) + return "Cannot decode Suback packet!"; + auto& [props, reason_codes] = *suback; + + return concat_strings( + code_to_str(code), + " packet_id: ", packet_id, + " reason_codes: ", reason_codes_to_string(reason_codes), + " props: ", to_readable_props(props) + ); +} + +template < + control_code_e code, + std::enable_if_t = true +> +inline std::string to_string(uint32_t remain_length, byte_citer& it) { + const auto packet_id = decoders::decode_packet_id(it).value_or(0); + remain_length -= sizeof(uint16_t); + auto unsuback = decoders::decode_unsuback(remain_length, it); + if (!unsuback.has_value()) + return "Cannot decode Unuback packet!"; + auto& [props, reason_codes] = *unsuback; + + return concat_strings( + code_to_str(code), + " packet_id: ", packet_id, + " reason_codes: ", reason_codes_to_string(reason_codes), + " props: ", to_readable_props(props) + ); +} + +} // end namespace detail + inline std::string to_readable_packet(std::string packet) { auto control_byte = uint8_t(*packet.data()); - auto code = extract_code(control_byte); + auto code = detail::extract_code(control_byte); - if (code == control_code_e::no_packet) - return {}; - - std::ostringstream stream; - - if (code == control_code_e::connack || code == control_code_e::auth) { - stream << code_to_str(code); - return stream.str(); + if ( + code == control_code_e::pingreq || + code == control_code_e::pingresp + ) { + return concat_strings(detail::code_to_str(code)); } auto begin = ++packet.cbegin(); @@ -92,55 +313,50 @@ inline std::string to_readable_packet(std::string packet) { begin, packet.cend(), decoders::basic::varint_ ); - if (code == control_code_e::connect) { - auto connect = decoders::decode_connect(*varlen, begin); - auto& [cli_id, uname, pwd, keep_alive, clean_start, props, will] = *connect; - stream << code_to_str(code); - stream << " props: " << to_readable_props(props); - return stream.str(); + switch (code) { + case control_code_e::connect: + return detail::to_string(*varlen, begin); + case control_code_e::connack: + return detail::to_string(*varlen, begin); + case control_code_e::disconnect: + return detail::to_string(*varlen, begin); + case control_code_e::publish: + return detail::to_string( + control_byte, *varlen, begin + ); + case control_code_e::puback: + return detail::to_string(*varlen, begin); + case control_code_e::pubrec: + return detail::to_string(*varlen, begin); + case control_code_e::pubrel: + return detail::to_string(*varlen, begin); + case control_code_e::pubcomp: + return detail::to_string(*varlen, begin); + case control_code_e::auth: + return detail::to_string(*varlen, begin); + case control_code_e::subscribe: + return detail::to_string(*varlen, begin); + case control_code_e::suback: + return detail::to_string(*varlen, begin); + case control_code_e::unsubscribe: + return detail::to_string(*varlen, begin); + case control_code_e::unsuback: + return detail::to_string(*varlen, begin); + default: + assert(false); } - if (code == control_code_e::disconnect) { - auto disconnect = decoders::decode_disconnect(*varlen, begin); - auto& [rc, props] = *disconnect; - stream << code_to_str(code); - stream << " rc: " << int(rc); - stream << " reason string: " << props[prop::reason_string].value_or(""); - return stream.str(); - } - - if (code == control_code_e::publish) { - auto publish = decoders::decode_publish( - control_byte, *varlen, begin - ); - auto& [topic, packet_id, flags, props, payload] = *publish; - stream << code_to_str(code); - stream << (packet_id ? " " + std::to_string(*packet_id) : ""); - stream << " flags: " << std::bitset<8>(flags); - stream << " topic: " << topic; - stream << " payload: " << payload; - stream << " props: " << to_readable_props(props); - return stream.str(); - } - - const auto packet_id = decoders::decode_packet_id(begin).value(); - stream << code_to_str(code) << " " << packet_id; - return stream.str(); + return {}; } template -std::vector to_packets(const ConstBufferSequence& buffers) { +std::vector to_readable_packets(const ConstBufferSequence& buffers) { std::vector content; - for (const auto& buff : buffers) { - auto control_byte = *(const uint8_t*) buff.data(); - auto code = extract_code(control_byte); - - if (code == control_code_e::pingreq) - continue; - - content.push_back({ (const char*)buff.data(), buff.size() }); - } + for (const auto& buff : buffers) + content.push_back( + to_readable_packet(std::string { (const char*) buff.data(), buff.size() }) + ); return content; } diff --git a/test/include/test_common/test_broker.hpp b/test/include/test_common/test_broker.hpp index fde47f5..77c874c 100644 --- a/test/include/test_common/test_broker.hpp +++ b/test/include/test_common/test_broker.hpp @@ -6,6 +6,8 @@ #include #include +#include + #include #include #include @@ -117,7 +119,7 @@ public: } bool received_all_expected() { - return !_broker_side.pop_reply_action().has_value(); + return !_broker_side.has_remaining_messages(); } template @@ -150,16 +152,27 @@ public: for (size_t i = 0; i < num_packets; ++i, ++it) { BOOST_CHECK_EQUAL(it->size(), expected[i].size()); size_t len = std::min(it->size(), expected[i].size()); - if (memcmp(it->data(), expected[i].data(), len)) { - std::ostringstream stream; - stream << "Packet mismatch! Expected: " - << to_readable_packet(expected[i]) - << " Received: " - << to_readable_packet(std::string((const char*)it->data(), it->size())); - BOOST_CHECK_MESSAGE(false, stream.str()); - } + if (memcmp(it->data(), expected[i].data(), len)) + BOOST_CHECK_MESSAGE( + false, + concat_strings( + "Packet mismatch!\nExpected: ", + to_readable_packet(expected[i]), + "\nReceived: ", + to_readable_packet(std::string((const char*)it->data(), it->size())) + ) + ); + else + log(to_readable_packet(expected[i])); } - } + } else + BOOST_CHECK_MESSAGE( + false, + concat_strings( + "Broker side did not expect: ", + boost::algorithm::join(to_readable_packets(buffers), ",") + ) + ); auto complete_op = reply_action ? reply_action->write_completion(ex) : diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index a6dc7d3..dce5c11 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -273,6 +273,9 @@ using namespace std::chrono; constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable); BOOST_FIXTURE_TEST_CASE(rerunning_the_client, shared_test_data) { + // packets + auto disconnect = encoders::encode_disconnect(uint8_t(0x00), {}); + test::msg_exchange broker_side; broker_side .expect(connect) @@ -286,7 +289,8 @@ BOOST_FIXTURE_TEST_CASE(rerunning_the_client, shared_test_data) { .reply_with(connack, after(2ms)) .expect(publish_qos1) .complete_with(success, after(1ms)) - .reply_with(puback, after(2ms)); + .reply_with(puback, after(2ms)) + .expect(disconnect); asio::io_context ioc; auto executor = ioc.get_executor(); diff --git a/test/integration/read_message.cpp b/test/integration/read_message.cpp index 03139a8..b87d636 100644 --- a/test/integration/read_message.cpp +++ b/test/integration/read_message.cpp @@ -30,9 +30,8 @@ void test_receive_malformed_packet( reason_codes::malformed_packet.value(), dc_props ); - test::msg_exchange broker_side; error_code success {}; - + test::msg_exchange broker_side; broker_side .expect(connect) .complete_with(success, after(0ms)) diff --git a/test/src/run_tests.cpp b/test/src/run_tests.cpp index 89e56b6..6d249c1 100644 --- a/test/src/run_tests.cpp +++ b/test/src/run_tests.cpp @@ -5,7 +5,7 @@ boost::unit_test::test_suite* init_tests( int /*argc*/, char* /*argv*/[] ) { - async_mqtt5::test::logging_enabled() = true; + async_mqtt5::test::logging_enabled() = false; return nullptr; }