forked from boostorg/mqtt5
Disconnect_op tests, malformed & resending tests in publish and subscribe op
Summary: related to T12015 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27323
This commit is contained in:
@ -58,12 +58,9 @@ inline std::string_view code_to_str(control_code_e code) {
|
||||
case control_code_e::pingresp: return "PINGRESP";
|
||||
default: return "NO PACKET";
|
||||
}
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
inline std::string to_readable_packet(
|
||||
std::string packet, error_code ec = {}, bool incoming = false
|
||||
) {
|
||||
inline std::string to_readable_packet(std::string packet) {
|
||||
auto control_byte = uint8_t(*packet.data());
|
||||
auto code = extract_code(control_byte);
|
||||
|
||||
@ -72,14 +69,11 @@ inline std::string to_readable_packet(
|
||||
|
||||
std::ostringstream stream;
|
||||
|
||||
if (incoming)
|
||||
stream << "-> ";
|
||||
|
||||
if (
|
||||
code == control_code_e::connect || code == control_code_e::connack ||
|
||||
code == control_code_e::disconnect
|
||||
) {
|
||||
stream << code_to_str(code) << (ec ? " ec: " + ec.message() : "");
|
||||
stream << code_to_str(code);
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
@ -95,6 +89,7 @@ inline std::string to_readable_packet(
|
||||
auto& [topic, packet_id, flags, props, payload] = *publish;
|
||||
stream << code_to_str(code);
|
||||
stream << (packet_id ? " " + std::to_string(*packet_id) : "");
|
||||
stream << "flags: " << flags;
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
|
@ -150,7 +150,14 @@ public:
|
||||
for (size_t i = 0; i < num_packets; ++i, ++it) {
|
||||
BOOST_CHECK_EQUAL(it->size(), expected[i].size());
|
||||
size_t len = std::min(it->size(), expected[i].size());
|
||||
BOOST_CHECK(!memcmp(it->data(), expected[i].data(), len));
|
||||
if (memcmp(it->data(), expected[i].data(), len)) {
|
||||
std::ostringstream stream;
|
||||
stream << "Packet mismatch! Expected: "
|
||||
<< to_readable_packet(expected[i])
|
||||
<< " Received: "
|
||||
<< to_readable_packet(std::string((const char*)it->data(), len));
|
||||
log(stream.str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,9 +18,9 @@ template <
|
||||
typename StreamType,
|
||||
typename TlsContext = std::monostate
|
||||
>
|
||||
class test_service : public detail::client_service<StreamType, TlsContext> {
|
||||
class test_service : public async_mqtt5::detail::client_service<StreamType, TlsContext> {
|
||||
using error_code = boost::system::error_code;
|
||||
using base = detail::client_service<StreamType, TlsContext>;
|
||||
using base = async_mqtt5::detail::client_service<StreamType, TlsContext>;
|
||||
|
||||
asio::any_io_executor _ex;
|
||||
connack_props _test_props;
|
||||
|
127
test/unit/test/disconnect_op.cpp
Normal file
127
test/unit/test/disconnect_op.cpp
Normal file
@ -0,0 +1,127 @@
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <boost/asio/bind_cancellation_slot.hpp>
|
||||
#include <boost/asio/cancellation_signal.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
|
||||
#include <async_mqtt5/mqtt_client.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/disconnect_op.hpp>
|
||||
|
||||
#include <async_mqtt5/detail/internal_types.hpp>
|
||||
|
||||
#include "test_common/message_exchange.hpp"
|
||||
#include "test_common/test_service.hpp"
|
||||
#include "test_common/test_stream.hpp"
|
||||
|
||||
using namespace async_mqtt5;
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(disconnect_op/*, *boost::unit_test::disabled()*/)
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_malformed_packet) {
|
||||
std::string malformed_str = std::string{ 0x01 };
|
||||
|
||||
disconnect_props invalid_user_props;
|
||||
invalid_user_props[prop::user_property].push_back(malformed_str);
|
||||
|
||||
disconnect_props invalid_reason_string;
|
||||
invalid_reason_string[prop::reason_string] = malformed_str;
|
||||
|
||||
std::vector<disconnect_props> testing_props = {
|
||||
invalid_user_props, invalid_reason_string
|
||||
};
|
||||
|
||||
int expected_handlers_called = static_cast<int>(testing_props.size());
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = test::test_service<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
|
||||
for (const auto& props : testing_props) {
|
||||
auto handler = [&handlers_called](error_code ec) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::malformed_packet);
|
||||
};
|
||||
|
||||
detail::disconnect_ctx ctx;
|
||||
ctx.props = props;
|
||||
|
||||
detail::disconnect_op<
|
||||
client_service_type, detail::disconnect_ctx
|
||||
> { svc_ptr, std::move(ctx), std::move(handler) }
|
||||
.perform();
|
||||
}
|
||||
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_omitting_props) {
|
||||
using test::after;
|
||||
using std::chrono_literals::operator ""ms;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
connack_props co_props;
|
||||
co_props[prop::maximum_packet_size] = 20;
|
||||
|
||||
// packets
|
||||
auto connect = encoders::encode_connect(
|
||||
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
|
||||
);
|
||||
auto connack = encoders::encode_connack(
|
||||
false, reason_codes::success.value(), co_props
|
||||
);
|
||||
|
||||
disconnect_props props;
|
||||
props[prop::user_property].push_back(std::string(50, 'a'));
|
||||
auto disconnect = encoders::encode_disconnect(
|
||||
reason_codes::normal_disconnection.value(), props
|
||||
);
|
||||
auto disconnect_no_props = encoders::encode_disconnect(
|
||||
reason_codes::normal_disconnection.value(), disconnect_props{}
|
||||
);
|
||||
|
||||
test::msg_exchange broker_side;
|
||||
error_code success {};
|
||||
|
||||
broker_side
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(disconnect_no_props)
|
||||
.complete_with(success, after(0ms));
|
||||
|
||||
asio::io_context ioc;
|
||||
auto executor = ioc.get_executor();
|
||||
auto& broker = asio::make_service<test::test_broker>(
|
||||
ioc, executor, std::move(broker_side)
|
||||
);
|
||||
|
||||
using client_type = mqtt_client<test::test_stream>;
|
||||
client_type c(executor, "");
|
||||
c.brokers("127.0.0.1")
|
||||
.run();
|
||||
|
||||
asio::steady_timer timer(c.get_executor());
|
||||
timer.expires_after(std::chrono::milliseconds(200));
|
||||
timer.async_wait([&](auto) {
|
||||
c.async_disconnect(
|
||||
disconnect_rc_e::normal_disconnection, props,
|
||||
[&](error_code ec) {
|
||||
handlers_called++;
|
||||
BOOST_CHECK(!ec);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
ioc.run_for(std::chrono::seconds(5));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
BOOST_CHECK(broker.received_all_expected());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
@ -5,12 +5,14 @@
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
|
||||
#include <async_mqtt5/error.hpp>
|
||||
#include <async_mqtt5/mqtt_client.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/client_service.hpp>
|
||||
#include <async_mqtt5/impl/publish_send_op.hpp>
|
||||
|
||||
#include "test_common/message_exchange.hpp"
|
||||
#include "test_common/test_service.hpp"
|
||||
#include "test_common/test_stream.hpp"
|
||||
|
||||
using namespace async_mqtt5;
|
||||
|
||||
@ -52,7 +54,7 @@ BOOST_AUTO_TEST_CASE(test_pid_overrun) {
|
||||
"test", "payload", retain_e::no, {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
@ -80,13 +82,17 @@ BOOST_AUTO_TEST_CASE(test_invalid_topic_names) {
|
||||
.perform(topic, "payload", retain_e::no, {});
|
||||
}
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_malformed_packet) {
|
||||
uint16_t topic_alias_max = 10;
|
||||
std::string malformed_str = std::string { 0x01 };
|
||||
|
||||
connack_props cprops;
|
||||
cprops[prop::topic_alias_maximum] = topic_alias_max;
|
||||
|
||||
publish_props malfored_response_topic_props;
|
||||
malfored_response_topic_props[prop::response_topic] = "response#topic";
|
||||
|
||||
@ -99,13 +105,17 @@ BOOST_AUTO_TEST_CASE(test_malformed_packet) {
|
||||
publish_props malformed_content_type_props;
|
||||
malformed_content_type_props[prop::content_type] = malformed_str;
|
||||
|
||||
publish_props zero_topic_alias_props;
|
||||
zero_topic_alias_props[prop::topic_alias] = uint16_t(0);
|
||||
|
||||
publish_props out_of_range_subid_props;
|
||||
out_of_range_subid_props[prop::subscription_identifier] = 300'000'000;
|
||||
|
||||
|
||||
std::vector<publish_props> testing_props = {
|
||||
malfored_response_topic_props, utf8_payload_props,
|
||||
invalid_user_props, malformed_content_type_props,
|
||||
out_of_range_subid_props
|
||||
zero_topic_alias_props, out_of_range_subid_props
|
||||
};
|
||||
|
||||
int expected_handlers_called = static_cast<int>(testing_props.size());
|
||||
@ -113,7 +123,8 @@ BOOST_AUTO_TEST_CASE(test_malformed_packet) {
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = test::test_service<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor(), std::move(cprops));
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::topic_alias_maximum) == topic_alias_max);
|
||||
|
||||
for (const auto& props: testing_props) {
|
||||
auto handler = [&handlers_called](error_code ec) {
|
||||
@ -127,11 +138,13 @@ BOOST_AUTO_TEST_CASE(test_malformed_packet) {
|
||||
.perform("topic", malformed_str, retain_e::no, props);
|
||||
}
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_packet_too_large) {
|
||||
int max_packet_sz = 10;
|
||||
|
||||
connack_props props;
|
||||
props[prop::maximum_packet_size] = 10;
|
||||
|
||||
@ -143,6 +156,7 @@ BOOST_AUTO_TEST_CASE(test_packet_too_large) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(
|
||||
ioc.get_executor(), std::move(props)
|
||||
);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::maximum_packet_size) == max_packet_sz);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) {
|
||||
++handlers_called;
|
||||
@ -157,13 +171,15 @@ BOOST_AUTO_TEST_CASE(test_packet_too_large) {
|
||||
"test", "payload", retain_e::no, {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_qos_not_supported) {
|
||||
uint8_t max_qos = 0;
|
||||
|
||||
connack_props props;
|
||||
props[prop::maximum_qos] = uint8_t(0);
|
||||
props[prop::maximum_qos] = max_qos;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
@ -173,6 +189,7 @@ BOOST_AUTO_TEST_CASE(test_qos_not_supported) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(
|
||||
ioc.get_executor(), std::move(props)
|
||||
);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::maximum_qos) == max_qos);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) {
|
||||
++handlers_called;
|
||||
@ -187,13 +204,15 @@ BOOST_AUTO_TEST_CASE(test_qos_not_supported) {
|
||||
"test", "payload", retain_e::no, {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_retain_not_available) {
|
||||
uint8_t retain = 0;
|
||||
|
||||
connack_props props;
|
||||
props[prop::retain_available] = uint8_t(0);
|
||||
props[prop::retain_available] = retain;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
@ -203,6 +222,7 @@ BOOST_AUTO_TEST_CASE(test_retain_not_available) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(
|
||||
ioc.get_executor(), std::move(props)
|
||||
);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::retain_available) == retain);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) {
|
||||
++handlers_called;
|
||||
@ -217,13 +237,15 @@ BOOST_AUTO_TEST_CASE(test_retain_not_available) {
|
||||
"test", "payload", retain_e::yes, {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_topic_alias_maximum) {
|
||||
uint16_t max_topic_alias = 10;
|
||||
|
||||
connack_props ta_allowed_props;
|
||||
ta_allowed_props[prop::topic_alias_maximum] = uint16_t(10);
|
||||
ta_allowed_props[prop::topic_alias_maximum] = max_topic_alias;
|
||||
|
||||
std::vector<connack_props> test_props = {
|
||||
ta_allowed_props, connack_props {} /* not allowed */
|
||||
@ -256,7 +278,7 @@ BOOST_AUTO_TEST_CASE(test_topic_alias_maximum) {
|
||||
);
|
||||
}
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
@ -292,8 +314,184 @@ BOOST_AUTO_TEST_CASE(test_publish_cancellation) {
|
||||
"test", "payload", retain_e::no, {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_malformed_puback) {
|
||||
using test::after;
|
||||
using std::chrono_literals::operator ""ms;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
// packets
|
||||
auto connect = encoders::encode_connect(
|
||||
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
|
||||
);
|
||||
auto connack = encoders::encode_connack(false, reason_codes::success.value(), {});
|
||||
|
||||
auto publish = encoders::encode_publish(
|
||||
1, "t", "p", qos_e::at_least_once, retain_e::no, dup_e::no, {}
|
||||
);
|
||||
auto malformed_puback = encoders::encode_puback(1, uint8_t(0x04), {});
|
||||
|
||||
auto publish_dup = encoders::encode_publish(
|
||||
1, "t", "p", qos_e::at_least_once, retain_e::no, dup_e::yes, {}
|
||||
);
|
||||
auto puback = encoders::encode_puback(1, reason_codes::success.value(), {});
|
||||
|
||||
disconnect_props dc_props;
|
||||
dc_props[prop::reason_string] = "Malformed PUBACK: invalid Reason Code";
|
||||
auto disconnect = encoders::encode_disconnect(
|
||||
reason_codes::malformed_packet.value(), dc_props
|
||||
);
|
||||
|
||||
test::msg_exchange broker_side;
|
||||
error_code success {};
|
||||
|
||||
broker_side
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(publish)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(malformed_puback, after(0ms))
|
||||
.expect(disconnect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(std::string {}, after(0ms))
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(publish_dup)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(puback, after(0ms));
|
||||
|
||||
asio::io_context ioc;
|
||||
auto executor = ioc.get_executor();
|
||||
auto& broker = asio::make_service<test::test_broker>(
|
||||
ioc, executor, std::move(broker_side)
|
||||
);
|
||||
|
||||
using client_type = mqtt_client<test::test_stream>;
|
||||
client_type c(executor, "");
|
||||
c.brokers("127.0.0.1")
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_least_once>(
|
||||
"t", "p", retain_e::no, publish_props {},
|
||||
[&](error_code ec, reason_code rc, auto) {
|
||||
++handlers_called;
|
||||
|
||||
BOOST_CHECK(!ec);
|
||||
BOOST_CHECK_EQUAL(rc, reason_codes::success);
|
||||
|
||||
c.cancel();
|
||||
}
|
||||
);
|
||||
|
||||
ioc.run_for(std::chrono::seconds(10));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
BOOST_CHECK(broker.received_all_expected());
|
||||
}
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_malformed_pubrec_pubcomp) {
|
||||
using test::after;
|
||||
using std::chrono_literals::operator ""ms;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
// packets
|
||||
auto connect = encoders::encode_connect(
|
||||
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
|
||||
);
|
||||
auto connack = encoders::encode_connack(false, reason_codes::success.value(), {});
|
||||
|
||||
auto publish = encoders::encode_publish(
|
||||
1, "t", "p", qos_e::exactly_once, retain_e::no, dup_e::no, {}
|
||||
);
|
||||
auto malformed_pubrec = encoders::encode_pubrec(1, uint8_t(0x04), {});
|
||||
|
||||
auto publish_dup = encoders::encode_publish(
|
||||
1, "t", "p", qos_e::exactly_once, retain_e::no, dup_e::yes, {}
|
||||
);
|
||||
auto pubrec = encoders::encode_pubrec(1, reason_codes::success.value(), {});
|
||||
|
||||
auto pubrel = encoders::encode_pubrel(1, reason_codes::success.value(), {});
|
||||
auto malformed_pubcomp = encoders::encode_pubcomp(1, uint8_t(0x04), {});
|
||||
auto pubcomp = encoders::encode_pubcomp(1, reason_codes::success.value(), {});
|
||||
|
||||
disconnect_props dc_props;
|
||||
dc_props[prop::reason_string] = "Malformed PUBREC: invalid Reason Code";
|
||||
auto disconnect_on_pubrec = encoders::encode_disconnect(
|
||||
reason_codes::malformed_packet.value(), dc_props
|
||||
);
|
||||
|
||||
dc_props[prop::reason_string] = "Malformed PUBCOMP: invalid Reason Code";
|
||||
auto disconnect_on_pubcomp = encoders::encode_disconnect(
|
||||
reason_codes::malformed_packet.value(), dc_props
|
||||
);
|
||||
|
||||
test::msg_exchange broker_side;
|
||||
error_code success {};
|
||||
|
||||
broker_side
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(publish)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(malformed_pubrec, after(0ms))
|
||||
.expect(disconnect_on_pubrec)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(std::string {}, after(0ms))
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(publish_dup)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(pubrec, after(0ms))
|
||||
.expect(pubrel)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(malformed_pubcomp, after(0ms))
|
||||
.expect(disconnect_on_pubcomp)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(std::string {}, after(0ms))
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(pubrel)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(pubcomp, after(0ms));
|
||||
|
||||
asio::io_context ioc;
|
||||
auto executor = ioc.get_executor();
|
||||
auto& broker = asio::make_service<test::test_broker>(
|
||||
ioc, executor, std::move(broker_side)
|
||||
);
|
||||
|
||||
using client_type = mqtt_client<test::test_stream>;
|
||||
client_type c(executor, "");
|
||||
c.brokers("127.0.0.1")
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::exactly_once>(
|
||||
"t", "p", retain_e::no, publish_props {},
|
||||
[&](error_code ec, reason_code rc, auto) {
|
||||
++handlers_called;
|
||||
|
||||
BOOST_CHECK(!ec);
|
||||
BOOST_CHECK_EQUAL(rc, reason_codes::success);
|
||||
|
||||
c.cancel();
|
||||
}
|
||||
);
|
||||
|
||||
ioc.run_for(std::chrono::seconds(15));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
BOOST_CHECK(broker.received_all_expected());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
@ -2,16 +2,59 @@
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
|
||||
#include <async_mqtt5/error.hpp>
|
||||
#include <async_mqtt5/mqtt_client.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/subscribe_op.hpp>
|
||||
|
||||
#include "test_common/message_exchange.hpp"
|
||||
#include "test_common/test_service.hpp"
|
||||
#include "test_common/test_stream.hpp"
|
||||
|
||||
using namespace async_mqtt5;
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(subscribe_op/*, *boost::unit_test::disabled()*/)
|
||||
|
||||
template <
|
||||
typename StreamType,
|
||||
typename TlsContext = std::monostate
|
||||
>
|
||||
class overrun_client : public detail::client_service<StreamType, TlsContext> {
|
||||
public:
|
||||
overrun_client(const asio::any_io_executor& ex, const std::string& cnf) :
|
||||
detail::client_service<StreamType, TlsContext>(ex, cnf)
|
||||
{}
|
||||
|
||||
uint16_t allocate_pid() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_pid_overrun) {
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = overrun_client<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor(), "");
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::pid_overrun);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
detail::subscribe_op<
|
||||
client_service_type, decltype(handler)
|
||||
> { svc_ptr, std::move(handler) }
|
||||
.perform(
|
||||
{ { "topic", { qos_e::exactly_once } } }, subscribe_props {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_invalid_topic_filters) {
|
||||
std::vector<std::string> invalid_topics = {
|
||||
"", "+topic", "#topic", "some/#/topic", "topic+",
|
||||
@ -25,9 +68,11 @@ BOOST_AUTO_TEST_CASE(test_invalid_topic_filters) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
|
||||
for (const auto& topic: invalid_topics) {
|
||||
auto handler = [&handlers_called](error_code ec, auto, auto) {
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::invalid_topic);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
detail::subscribe_op<
|
||||
@ -36,7 +81,42 @@ BOOST_AUTO_TEST_CASE(test_invalid_topic_filters) {
|
||||
.perform({{ topic, { qos_e::exactly_once } }}, subscribe_props {});
|
||||
}
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_malformed_packet) {
|
||||
std::vector<std::string> test_properties = {
|
||||
std::string(75000, 'a'), std::string(10, char(0x01))
|
||||
};
|
||||
|
||||
const int expected_handlers_called = static_cast<int>(test_properties.size());
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = test::test_service<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
|
||||
for (const auto& test_prop: test_properties) {
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::malformed_packet);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
subscribe_props props;
|
||||
props[prop::user_property].push_back(test_prop);
|
||||
|
||||
detail::subscribe_op<
|
||||
client_service_type, decltype(handler)
|
||||
> { svc_ptr, std::move(handler) }
|
||||
.perform({ { "topic", { qos_e::exactly_once } } }, props
|
||||
);
|
||||
}
|
||||
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
@ -58,9 +138,11 @@ BOOST_AUTO_TEST_CASE(test_wildcard_subscriptions_not_supported) {
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::wildcard_subscription_available) == 0);
|
||||
|
||||
for (const auto& topic: wildcard_topics) {
|
||||
auto handler = [&handlers_called](error_code ec, auto, auto) {
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::wildcard_subscription_not_available);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
detail::subscribe_op<
|
||||
@ -71,7 +153,7 @@ BOOST_AUTO_TEST_CASE(test_wildcard_subscriptions_not_supported) {
|
||||
);
|
||||
}
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
@ -89,9 +171,11 @@ BOOST_AUTO_TEST_CASE(test_shared_subscriptions_not_supported) {
|
||||
);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::shared_subscription_available) == 0);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, auto, auto) {
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::shared_subscription_not_available);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
detail::subscribe_op<
|
||||
@ -101,13 +185,15 @@ BOOST_AUTO_TEST_CASE(test_shared_subscriptions_not_supported) {
|
||||
{{ "$share/group/topic", { qos_e::exactly_once } }}, subscribe_props {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_large_subscription_id) {
|
||||
uint8_t sub_id_available = 1;
|
||||
|
||||
connack_props props;
|
||||
props[prop::subscription_identifier_available] = uint8_t(1);
|
||||
props[prop::subscription_identifier_available] = sub_id_available;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
@ -117,11 +203,13 @@ BOOST_AUTO_TEST_CASE(test_large_subscription_id) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(
|
||||
ioc.get_executor(), std::move(props)
|
||||
);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == 1);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == sub_id_available);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, auto, auto) {
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::subscription_identifier_not_available);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
subscribe_props sub_props_big_id {};
|
||||
@ -134,13 +222,15 @@ BOOST_AUTO_TEST_CASE(test_large_subscription_id) {
|
||||
{{ "topic", { qos_e::exactly_once } }}, sub_props_big_id
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_subscription_ids_not_supported) {
|
||||
uint8_t sub_id_available = 0;
|
||||
|
||||
connack_props props;
|
||||
props[prop::subscription_identifier_available] = uint8_t(0);
|
||||
props[prop::subscription_identifier_available] = sub_id_available;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
@ -150,11 +240,13 @@ BOOST_AUTO_TEST_CASE(test_subscription_ids_not_supported) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(
|
||||
ioc.get_executor(), std::move(props)
|
||||
);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == 0);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::subscription_identifier_available) == sub_id_available);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, auto, auto) {
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::subscription_identifier_not_available);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
subscribe_props sub_props {};
|
||||
@ -167,8 +259,113 @@ BOOST_AUTO_TEST_CASE(test_subscription_ids_not_supported) {
|
||||
{{ "topic", { qos_e::exactly_once } }}, sub_props
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_packet_too_large) {
|
||||
int max_packet_sz = 10;
|
||||
|
||||
connack_props props;
|
||||
props[prop::maximum_packet_size] = max_packet_sz;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = test::test_service<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(
|
||||
ioc.get_executor(), std::move(props)
|
||||
);
|
||||
BOOST_ASSERT(svc_ptr->connack_property(prop::maximum_packet_size) == max_packet_sz);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::packet_too_large);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
|
||||
};
|
||||
|
||||
detail::subscribe_op<
|
||||
client_service_type, decltype(handler)
|
||||
> { svc_ptr, std::move(handler) }
|
||||
.perform(
|
||||
{ { "topic", { qos_e::exactly_once } } }, subscribe_props {}
|
||||
);
|
||||
|
||||
ioc.run_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_resending) {
|
||||
using test::after;
|
||||
using std::chrono_literals::operator ""ms;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
// data
|
||||
std::vector<subscribe_topic> topics = {
|
||||
subscribe_topic { "topic", subscribe_options {} }
|
||||
};
|
||||
subscribe_props subscribe_props;
|
||||
|
||||
std::vector<uint8_t> rcs = { reason_codes::granted_qos_0.value() };
|
||||
suback_props suback_props;
|
||||
|
||||
// packets
|
||||
auto connect = encoders::encode_connect(
|
||||
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
|
||||
);
|
||||
auto connack = encoders::encode_connack(false, reason_codes::success.value(), {});
|
||||
|
||||
auto subscribe = encoders::encode_subscribe(1, topics, subscribe_props);
|
||||
auto suback = encoders::encode_suback(1, rcs, suback_props);
|
||||
|
||||
test::msg_exchange broker_side;
|
||||
error_code success {};
|
||||
error_code fail = asio::error::not_connected;
|
||||
|
||||
broker_side
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(subscribe)
|
||||
.complete_with(fail, after(0ms))
|
||||
.expect(connect)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(connack, after(0ms))
|
||||
.expect(subscribe)
|
||||
.complete_with(success, after(0ms))
|
||||
.reply_with(suback, after(0ms));
|
||||
|
||||
asio::io_context ioc;
|
||||
auto executor = ioc.get_executor();
|
||||
auto& broker = asio::make_service<test::test_broker>(
|
||||
ioc, executor, std::move(broker_side)
|
||||
);
|
||||
|
||||
using client_type = mqtt_client<test::test_stream>;
|
||||
client_type c(executor, "");
|
||||
c.brokers("127.0.0.1")
|
||||
.run();
|
||||
|
||||
c.async_subscribe(
|
||||
topics, subscribe_props,
|
||||
[&](error_code ec, std::vector<reason_code> rcs, auto) {
|
||||
handlers_called++;
|
||||
|
||||
BOOST_CHECK(!ec);
|
||||
BOOST_ASSERT(rcs.size() == 1);
|
||||
BOOST_CHECK_EQUAL(rcs[0], reason_codes::granted_qos_0);
|
||||
|
||||
c.cancel();
|
||||
}
|
||||
);
|
||||
|
||||
ioc.run_for(std::chrono::seconds(10));
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
BOOST_CHECK(broker.received_all_expected());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
Reference in New Issue
Block a user