Complete serialization, publish_rec_op & unsubscribe tests

Summary: related to T12015

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Differential Revision: https://repo.mireo.local/D27356
This commit is contained in:
Korina Šimičević
2024-01-15 08:48:34 +01:00
parent 3629c19ce9
commit e3eb408c98
9 changed files with 943 additions and 238 deletions

View File

@ -156,57 +156,6 @@ struct subscribe_topic {
subscribe_options sub_opts;
};
/*
reason codes:
0x00 success
0x01 success_qos_1
0x02 success_qos_2
0x04 disconnect_with_will_message
0x10 no_matching_subscribers
0x11 no_subscription_existed
0x18 continue_authentication
0x19 re_authenticate
0x80 failure
0x81 malformed_packet
0x82 protocol_error
0x83 implementation_specific_error
0x84 unsupported_protocol_version
0x85 client_identifier_not_valid
0x86 bad_user_name_or_password
0x87 not_authorized
0x88 server_unavailable
0x89 server_busy
0x8a banned
0x8b server_shutting_down
0x8c bad_authentication_method
0x8d keep_alive_timeout
0x8e session_taken_over
0x8f topic_filter_invalid
0x90 topic_name_invalid
0x91 packet_identifier_in_use
0x92 packet_identifier_not_found
0x93 receive_maximum_exceeded
0x94 topic_alias_invalid
0x95 packet_too_large
0x95 packet_too_large
0x96 message_rate_too_high
0x97 quota_exceeded
0x98 administrative_action
0x99 payload_format_invalid
0x9a retain_not_supported
0x9b qos_not_supported
0x9c use_another_server
0x9d server_moved
0x9e shared_subscriptions_not_supported
0x9f connection_rate_exceeded
0xa0 maximum_connect_time
0xa1 subscription_identifiers_not_supported
0xa2 wildcard_subscriptions_not_supported
*/
/// \cond
class connect_props : public prop::properties<

View File

@ -16,11 +16,14 @@ add_executable(
test/client_broker.cpp
test/compilation_checks.cpp
test/coroutine.cpp
test/disconnect_op.cpp
test/publish_rec_op.cpp
test/publish_send_op.cpp
test/serialization.cpp
test/session.cpp
test/string_validation.cpp
test/subscribe_op.cpp
test/unsubscribe_op.cpp
)
target_include_directories(mqtt-test PRIVATE include)

View File

@ -61,6 +61,22 @@ public:
};
template <
typename StreamType,
typename TlsContext = std::monostate
>
class overrun_client : public async_mqtt5::detail::client_service<StreamType, TlsContext> {
public:
overrun_client(const asio::any_io_executor& ex, const std::string& cnf) :
async_mqtt5::detail::client_service<StreamType, TlsContext>(ex, cnf)
{}
uint16_t allocate_pid() {
return 0;
}
};
} // end namespace async_mqtt5::test
#endif // ASYNC_MQTT5_TEST_TEST_SERVICE_HPP

View File

@ -155,82 +155,6 @@ BOOST_AUTO_TEST_CASE(two_publishes_qos_1_with_fail_on_write) {
BOOST_CHECK(broker.received_all_expected());
}
BOOST_AUTO_TEST_CASE(receive_publish_qos_2) {
using test::after;
using std::chrono_literals::operator ""ms;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
std::string topic = "topic";
std::string payload = "payload";
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
);
auto connack = encoders::encode_connack(
false, reason_codes::success.value(), {}
);
auto publish = encoders::encode_publish(
1, topic, payload, qos_e::exactly_once, retain_e::no, dup_e::no, {}
);
auto pubrec = encoders::encode_pubrec(
1, reason_codes::success.value(), {}
);
auto pubrel = encoders::encode_pubrel(
1, reason_codes::success.value(), {}
);
auto pubcomp = encoders::encode_pubcomp(
1, reason_codes::success.value(), {}
);
test::msg_exchange broker_side;
error_code success {};
broker_side
.expect(connect)
.complete_with(success, after(10ms))
.reply_with(connack, after(15ms))
.send(publish, after(300ms))
.expect(pubrec)
.complete_with(success, after(10ms))
.reply_with(pubrel, after(15ms))
.expect(pubcomp)
.complete_with(success, after(5ms));
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side)
);
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
c.async_receive(
[&](
error_code ec,
std::string rec_topic, std::string rec_payload,
publish_props
) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_EQUAL(topic, rec_topic);
BOOST_CHECK_EQUAL(payload, rec_payload);
++handlers_called;
}
);
asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::seconds(1));
timer.async_wait([&](auto) { c.cancel(); });
ioc.run();
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
BOOST_CHECK(broker.received_all_expected());
}
BOOST_AUTO_TEST_CASE(send_publish_qos_2_with_fail_on_read) {
using test::after;

View File

@ -0,0 +1,330 @@
#include <boost/test/unit_test.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/impl/publish_rec_op.hpp>
#include "test_common/message_exchange.hpp"
#include "test_common/test_service.hpp"
#include "test_common/test_stream.hpp"
using namespace async_mqtt5;
BOOST_AUTO_TEST_SUITE(publish_rec_op/*, *boost::unit_test::disabled()*/)
template <qos_e qos>
void receive_publish() {
using test::after;
using std::chrono_literals::operator ""ms;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
std::string topic = "topic";
std::string payload = "payload";
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
);
auto connack = encoders::encode_connack(
false, reason_codes::success.value(), {}
);
auto publish = encoders::encode_publish(
1, topic, payload, qos, retain_e::no, dup_e::no, {}
);
auto puback = encoders::encode_puback(1, reason_codes::success.value(), {});
auto pubrec = encoders::encode_pubrec(1, reason_codes::success.value(), {});
auto pubrel = encoders::encode_pubrel(1, reason_codes::success.value(), {});
auto pubcomp = encoders::encode_pubcomp(1, reason_codes::success.value(), {});
test::msg_exchange broker_side;
error_code success {};
broker_side
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.send(publish, after(10ms));
if constexpr (qos == qos_e::at_least_once) {
broker_side
.expect(puback)
.complete_with(success, after(1ms));
} else if constexpr (qos == qos_e::exactly_once) {
broker_side
.expect(pubrec)
.complete_with(success, after(1ms))
.reply_with(pubrel, after(2ms))
.expect(pubcomp)
.complete_with(success, after(1ms));
}
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side)
);
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
c.async_receive(
[&](
error_code ec,
std::string rec_topic, std::string rec_payload,
publish_props
) {
++handlers_called;
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_EQUAL(topic, rec_topic);
BOOST_CHECK_EQUAL(payload, rec_payload);
c.cancel();
}
);
ioc.run_for(std::chrono::seconds(10));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
BOOST_CHECK(broker.received_all_expected());
}
BOOST_AUTO_TEST_CASE(receive_publish_qos0) {
receive_publish<qos_e::at_most_once>();
}
BOOST_AUTO_TEST_CASE(receive_publish_qos1) {
receive_publish<qos_e::at_least_once>();
}
BOOST_AUTO_TEST_CASE(receive_publish_qos2) {
receive_publish<qos_e::exactly_once>();
}
BOOST_AUTO_TEST_CASE(test_reconnect) {
using test::after;
using std::chrono_literals::operator ""ms;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
std::string topic = "topic";
std::string payload = "payload";
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
);
auto connack = encoders::encode_connack(
false, reason_codes::success.value(), {}
);
auto publish = encoders::encode_publish(
1, topic, payload, qos_e::exactly_once, retain_e::yes, dup_e::no, {}
);
auto pubrec = encoders::encode_pubrec(1, reason_codes::success.value(), {});
auto pubrel = encoders::encode_pubrel(1, reason_codes::success.value(), {});
auto pubcomp = encoders::encode_pubcomp(1, reason_codes::success.value(), {});
test::msg_exchange broker_side;
error_code success {};
error_code fail = asio::error::not_connected;
broker_side
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.send(publish, after(10ms))
.expect(pubrec)
.complete_with(success, after(1ms))
.reply_with(pubrel, after(2ms))
.expect(pubcomp)
.complete_with(fail, after(1ms))
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.send(pubrel, after(10ms))
.expect(pubcomp)
.complete_with(success, after(1ms));
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side)
);
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
c.async_receive(
[&](
error_code ec,
std::string rec_topic, std::string rec_payload,
publish_props
) {
++handlers_called;
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_EQUAL(topic, rec_topic);
BOOST_CHECK_EQUAL(payload, rec_payload);
c.cancel();
}
);
ioc.run_for(std::chrono::seconds(10));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
BOOST_CHECK(broker.received_all_expected());
}
BOOST_AUTO_TEST_CASE(test_malformed_publish) {
using test::after;
using std::chrono_literals::operator ""ms;
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
);
auto connack = encoders::encode_connack(
false, reason_codes::success.value(), {}
);
auto publish = encoders::encode_publish(
1, "topic", "payload", static_cast<qos_e>(0b11), retain_e::yes, dup_e::no, {}
);
disconnect_props dprops;
dprops[prop::reason_string] = "Malformed PUBLISH received: QoS bits set to 0b11";
auto disconnect = encoders::encode_disconnect(
reason_codes::malformed_packet.value(), dprops
);
test::msg_exchange broker_side;
error_code success {};
broker_side
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.send(publish, after(10ms))
.expect(disconnect)
.complete_with(success, after(1ms));
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side)
);
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::seconds(4));
timer.async_wait([&](auto) { c.cancel(); });
ioc.run();
BOOST_CHECK(broker.received_all_expected());
}
// does not receive the malformed pubrel
BOOST_AUTO_TEST_CASE(test_malformed_pubrel, *boost::unit_test::disabled()) {
using test::after;
using std::chrono_literals::operator ""ms;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
std::string topic = "topic";
std::string payload = "payload";
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
);
auto connack = encoders::encode_connack(
false, reason_codes::success.value(), {}
);
auto publish = encoders::encode_publish(
1, topic, payload, qos_e::exactly_once, retain_e::yes, dup_e::no, {}
);
auto pubrec = encoders::encode_pubrec(1, reason_codes::success.value(), {});
auto pubrel = encoders::encode_pubrel(1, reason_codes::success.value(), {});
auto malformed_pubrel = encoders::encode_pubrel(1, 0x04, {});
auto pubcomp = encoders::encode_pubcomp(1, reason_codes::success.value(), {});
disconnect_props dprops;
dprops[prop::reason_string] = "Malformed PUBREL received: invalid Reason Code";
auto disconnect = encoders::encode_disconnect(
reason_codes::malformed_packet.value(), dprops
);
test::msg_exchange broker_side;
error_code success {};
broker_side
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.send(publish, after(10ms))
.expect(pubrec)
.complete_with(success, after(1ms))
.reply_with(malformed_pubrel, after(2ms))
.expect(disconnect)
.complete_with(success, after(1ms))
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.send(pubrel, after(10ms))
.expect(pubcomp)
.complete_with(success, after(1ms));
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side)
);
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
c.async_receive(
[&](
error_code ec,
std::string rec_topic, std::string rec_payload,
publish_props
) {
++handlers_called;
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_EQUAL(topic, rec_topic);
BOOST_CHECK_EQUAL(payload, rec_payload);
c.cancel();
}
);
ioc.run_for(std::chrono::seconds(10));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
BOOST_CHECK(broker.received_all_expected());
}
BOOST_AUTO_TEST_SUITE_END();

View File

@ -18,27 +18,13 @@ using namespace async_mqtt5;
BOOST_AUTO_TEST_SUITE(publish_send_op/*, *boost::unit_test::disabled()*/)
template <
typename StreamType,
typename TlsContext = std::monostate
>
class overrun_client : public detail::client_service<StreamType, TlsContext> {
public:
overrun_client(const asio::any_io_executor& ex, const std::string& cnf) :
detail::client_service<StreamType, TlsContext>(ex, cnf)
{}
uint16_t allocate_pid() {
return 0;
}
};
BOOST_AUTO_TEST_CASE(test_pid_overrun) {
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
asio::io_context ioc;
using client_service_type = overrun_client<asio::ip::tcp::socket>;
using client_service_type = test::overrun_client<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor(), "");
auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) {
@ -146,7 +132,7 @@ BOOST_AUTO_TEST_CASE(test_packet_too_large) {
int max_packet_sz = 10;
connack_props props;
props[prop::maximum_packet_size] = 10;
props[prop::maximum_packet_size] = max_packet_sz;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
@ -359,7 +345,6 @@ BOOST_AUTO_TEST_CASE(test_malformed_puback) {
.reply_with(malformed_puback, after(0ms))
.expect(disconnect)
.complete_with(success, after(0ms))
.reply_with(std::string {}, after(0ms))
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
@ -446,7 +431,6 @@ BOOST_AUTO_TEST_CASE(test_malformed_pubrec_pubcomp) {
.reply_with(malformed_pubrec, after(0ms))
.expect(disconnect_on_pubrec)
.complete_with(success, after(0ms))
.reply_with(std::string {}, after(0ms))
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
@ -458,7 +442,6 @@ BOOST_AUTO_TEST_CASE(test_malformed_pubrec_pubcomp) {
.reply_with(malformed_pubcomp, after(0ms))
.expect(disconnect_on_pubcomp)
.complete_with(success, after(0ms))
.reply_with(std::string {}, after(0ms))
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))

View File

@ -12,23 +12,53 @@ BOOST_AUTO_TEST_SUITE(serialization/*, *boost::unit_test::disabled()*/)
BOOST_AUTO_TEST_CASE(test_connect) {
// testing variables
// connect
std::string_view client_id = "async_mqtt_client_id";
std::string_view uname = "username";
std::optional<std::string_view> password = std::nullopt;
uint16_t keep_alive = 60;
bool clean_start = true;
// connect properties
int32_t session_expiry_interval = 29;
int16_t receive_max = 100;
int32_t maximum_packet_size = 20000;
uint16_t topic_alias_max = 1200;
uint8_t request_response_information = 1;
uint8_t request_problem_information = 0;
std::string_view user_property = "user prop";
std::string_view auth_method = "method";
std::string_view auth_data = "data";
// will
std::string will_topic = "will_topic";
std::string will_message = "will_message";
// will properties
int32_t will_delay_interval = 200;
uint8_t will_payload_format_indicator = 0;
int32_t will_message_expiry_interval = 2000;
std::string_view will_content_type = "will content type";
std::string_view will_response_topic = "response_topic";
std::string_view will_correlation_data = "correlation data";
std::string_view will_user_property = "will prop";
connect_props cprops;
cprops[prop::session_expiry_interval] = 29;
cprops[prop::user_property].emplace_back("connect user prop");
cprops[prop::session_expiry_interval] = session_expiry_interval;
cprops[prop::receive_maximum] = receive_max;
cprops[prop::maximum_packet_size] = maximum_packet_size;
cprops[prop::topic_alias_maximum] = topic_alias_max;
cprops[prop::request_response_information] = request_response_information;
cprops[prop::request_problem_information] = request_problem_information;
cprops[prop::user_property].emplace_back(user_property);
cprops[prop::authentication_method] = auth_method;
cprops[prop::authentication_data] = auth_data;
will w{ will_topic, will_message };
w[prop::content_type] = "will content type";
w[prop::response_topic] = "will response topic";
w[prop::user_property].emplace_back("first user prop");
w[prop::user_property].emplace_back("second user prop");
will w { will_topic, will_message };
w[prop::will_delay_interval] = will_delay_interval;
w[prop::payload_format_indicator] = will_payload_format_indicator;
w[prop::message_expiry_interval] = will_message_expiry_interval;
w[prop::content_type] = will_content_type;
w[prop::response_topic] = will_response_topic;
w[prop::correlation_data] = will_correlation_data;
w[prop::user_property].emplace_back(will_user_property);
std::optional<will> will_opt { std::move(w) };
auto msg = encoders::encode_connect(
@ -43,28 +73,84 @@ BOOST_AUTO_TEST_CASE(test_connect) {
auto rv = decoders::decode_connect(remain_length, it);
BOOST_CHECK_MESSAGE(rv, "Parsing CONNECT failed.");
const auto& [client_id_, uname_, password_, keep_alive_, clean_start_, _, w_] = *rv;
const auto& [client_id_, uname_, password_, keep_alive_, clean_start_, cprops_, w_] = *rv;
BOOST_CHECK_EQUAL(client_id_, client_id);
BOOST_CHECK(uname_);
BOOST_CHECK_EQUAL(*uname_, uname);
BOOST_CHECK(password_ == password);
BOOST_CHECK_EQUAL(keep_alive_, keep_alive);
BOOST_CHECK_EQUAL(clean_start_, clean_start);
cprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*cprops_[prop::session_expiry_interval], session_expiry_interval);
BOOST_CHECK_EQUAL(*cprops_[prop::receive_maximum], receive_max);
BOOST_CHECK_EQUAL(*cprops_[prop::maximum_packet_size], maximum_packet_size);
BOOST_CHECK_EQUAL(*cprops_[prop::topic_alias_maximum], topic_alias_max);
BOOST_CHECK_EQUAL(*cprops_[prop::request_response_information], request_response_information);
BOOST_CHECK_EQUAL(*cprops_[prop::request_problem_information], request_problem_information);
BOOST_CHECK_EQUAL(cprops_[prop::user_property][0], user_property);
BOOST_CHECK_EQUAL(*cprops_[prop::authentication_method], auth_method);
BOOST_CHECK_EQUAL(*cprops_[prop::authentication_data], auth_data);
BOOST_CHECK(w_);
BOOST_CHECK_EQUAL((*w_).topic(), will_topic);
BOOST_CHECK_EQUAL((*w_).message(), will_message);
(*w_).visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*(*w_)[prop::will_delay_interval], will_delay_interval);
BOOST_CHECK_EQUAL(*(*w_)[prop::payload_format_indicator], will_payload_format_indicator);
BOOST_CHECK_EQUAL(*(*w_)[prop::message_expiry_interval], will_message_expiry_interval);
BOOST_CHECK_EQUAL(*(*w_)[prop::content_type], will_content_type);
BOOST_CHECK_EQUAL(*(*w_)[prop::response_topic], will_response_topic);
BOOST_CHECK_EQUAL(*(*w_)[prop::correlation_data], will_correlation_data);
BOOST_CHECK_EQUAL((*w_)[prop::user_property][0], will_user_property);
}
BOOST_AUTO_TEST_CASE(test_connack) {
// testing variables
uint8_t session_present = 1;
uint8_t reason_code = 0x89;
uint8_t wildcard_sub = 1;
uint8_t reason_code = reason_codes::server_busy.value();
connack_props cap;
cap[prop::wildcard_subscription_available] = wildcard_sub;
auto msg = encoders::encode_connack(session_present, reason_code, cap);
int32_t session_expiry_interval = 20;
int16_t receive_maximum = 2000;
uint8_t max_qos = 2;
uint8_t retain_available = 0;
int32_t maximum_packet_sz = 1024;
std::string assigned_client_id = "client_id";
uint16_t topic_alias_max = 128;
std::string reason_string = "some reason string";
std::string user_property = "property";
uint8_t wildcard_sub = 1;
uint8_t sub_id = 1;
uint8_t shared_sub = 0;
int16_t server_keep_alive = 25;
std::string response_information = "info";
std::string server_reference = "srv";
std::string authentication_method = "method";
std::string authentication_data = "data";
connack_props cprops;
cprops[prop::session_expiry_interval] = session_expiry_interval;
cprops[prop::receive_maximum] = receive_maximum;
cprops[prop::maximum_qos] = max_qos;
cprops[prop::retain_available] = retain_available;
cprops[prop::maximum_packet_size] = maximum_packet_sz;
cprops[prop::assigned_client_identifier] = assigned_client_id;
cprops[prop::topic_alias_maximum] = topic_alias_max;
cprops[prop::reason_string] = reason_string;
cprops[prop::user_property].push_back(user_property);
cprops[prop::wildcard_subscription_available] = wildcard_sub;
cprops[prop::subscription_identifier_available] = sub_id;
cprops[prop::shared_subscription_available] = shared_sub;
cprops[prop::server_keep_alive] = server_keep_alive;
cprops[prop::response_information] = response_information;
cprops[prop::server_reference] = server_reference;
cprops[prop::authentication_method] = authentication_method;
cprops[prop::authentication_data] = authentication_data;
auto msg = encoders::encode_connack(session_present, reason_code, cprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -74,10 +160,28 @@ BOOST_AUTO_TEST_CASE(test_connack) {
auto rv = decoders::decode_connack(remain_length, it);
BOOST_CHECK_MESSAGE(rv, "Parsing CONNACK failed.");
const auto& [session_present_, reason_code_, caprops] = *rv;
const auto& [session_present_, reason_code_, cprops_] = *rv;
BOOST_CHECK_EQUAL(session_present_, session_present);
BOOST_CHECK_EQUAL(reason_code_, reason_code);
BOOST_CHECK_EQUAL(*caprops[prop::wildcard_subscription_available], wildcard_sub);
cprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*cprops_[prop::session_expiry_interval], session_expiry_interval);
BOOST_CHECK_EQUAL(*cprops_[prop::receive_maximum], receive_maximum);
BOOST_CHECK_EQUAL(*cprops_[prop::maximum_qos], max_qos);
BOOST_CHECK_EQUAL(*cprops_[prop::retain_available], retain_available);
BOOST_CHECK_EQUAL(*cprops_[prop::maximum_packet_size], maximum_packet_sz);
BOOST_CHECK_EQUAL(*cprops_[prop::assigned_client_identifier], assigned_client_id);
BOOST_CHECK_EQUAL(*cprops_[prop::topic_alias_maximum], topic_alias_max);
BOOST_CHECK_EQUAL(*cprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(cprops_[prop::user_property][0], user_property);
BOOST_CHECK_EQUAL(*cprops_[prop::wildcard_subscription_available], wildcard_sub);
BOOST_CHECK_EQUAL(*cprops_[prop::subscription_identifier_available], sub_id);
BOOST_CHECK_EQUAL(*cprops_[prop::shared_subscription_available], shared_sub);
BOOST_CHECK_EQUAL(*cprops_[prop::server_keep_alive], server_keep_alive);
BOOST_CHECK_EQUAL(*cprops_[prop::response_information], response_information);
BOOST_CHECK_EQUAL(*cprops_[prop::server_reference], server_reference);
BOOST_CHECK_EQUAL(*cprops_[prop::authentication_method], authentication_method);
BOOST_CHECK_EQUAL(*cprops_[prop::authentication_data], authentication_data);
}
BOOST_AUTO_TEST_CASE(test_publish) {
@ -85,21 +189,32 @@ BOOST_AUTO_TEST_CASE(test_publish) {
uint16_t packet_id = 31283;
std::string_view topic = "publish_topic";
std::string_view payload = "This is some payload I am publishing!";
uint8_t message_expiry = 70;
std::string content_type = "application/octet-stream";
uint8_t payload_format_indicator = 1;
int32_t message_expiry_interval = 70;
int16_t topic_alias = 16;
std::string response_topic = "topic/response";
std::string correlation_data = "correlation data";
std::string publish_prop_1 = "first publish prop";
std::string publish_prop_2 = "second publish prop";
uint32_t subscription_identifier = 123456;
std::string content_type = "application/octet-stream";
publish_props pp;
pp[prop::message_expiry_interval] = message_expiry;
pp[prop::content_type] = content_type;
pp[prop::user_property].emplace_back(publish_prop_1);
pp[prop::user_property].emplace_back(publish_prop_2);
publish_props pprops;
pprops[prop::payload_format_indicator] = payload_format_indicator;
pprops[prop::message_expiry_interval] = message_expiry_interval;
pprops[prop::topic_alias] = topic_alias;
pprops[prop::response_topic] = response_topic;
pprops[prop::correlation_data] = correlation_data;
pprops[prop::user_property].emplace_back(publish_prop_1);
pprops[prop::user_property].emplace_back(publish_prop_2);
pprops[prop::subscription_identifier] = subscription_identifier;
pprops[prop::content_type] = content_type;
auto msg = encoders::encode_publish(
packet_id, topic, payload,
qos_e::at_least_once, retain_e::yes, dup_e::no,
pp
pprops
);
byte_citer it = msg.cbegin(), last = msg.cend();
@ -110,15 +225,22 @@ BOOST_AUTO_TEST_CASE(test_publish) {
auto rv = decoders::decode_publish(control_byte, remain_length, it);
BOOST_CHECK_MESSAGE(rv, "Parsing PUBLISH failed.");
const auto& [topic_, packet_id_, flags, pprops, payload_] = *rv;
const auto& [topic_, packet_id_, flags, pprops_, payload_] = *rv;
BOOST_CHECK(packet_id);
BOOST_CHECK_EQUAL(*packet_id_, packet_id);
BOOST_CHECK_EQUAL(topic_, topic);
BOOST_CHECK_EQUAL(payload_, payload);
BOOST_CHECK_EQUAL(*pprops[prop::message_expiry_interval], message_expiry);
BOOST_CHECK_EQUAL(*pprops[prop::content_type], content_type);
BOOST_CHECK_EQUAL(pprops[prop::user_property][0], publish_prop_1);
BOOST_CHECK_EQUAL(pprops[prop::user_property][1], publish_prop_2);
pprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*pprops_[prop::payload_format_indicator], payload_format_indicator);
BOOST_CHECK_EQUAL(*pprops_[prop::message_expiry_interval], message_expiry_interval);
BOOST_CHECK_EQUAL(*pprops_[prop::topic_alias], topic_alias);
BOOST_CHECK_EQUAL(*pprops_[prop::response_topic], response_topic);
BOOST_CHECK_EQUAL(*pprops_[prop::correlation_data], correlation_data);
BOOST_CHECK_EQUAL(pprops_[prop::user_property][0], publish_prop_1);
BOOST_CHECK_EQUAL(pprops_[prop::user_property][1], publish_prop_2);
BOOST_CHECK_EQUAL(*pprops_[prop::subscription_identifier], subscription_identifier);
BOOST_CHECK_EQUAL(*pprops_[prop::content_type], content_type);
}
BOOST_AUTO_TEST_CASE(test_large_publish) {
@ -130,7 +252,7 @@ BOOST_AUTO_TEST_CASE(test_large_publish) {
auto msg = encoders::encode_publish(
packet_id, topic, large_payload,
qos_e::at_least_once, retain_e::yes, dup_e::no,
publish_props{}
publish_props {}
);
byte_citer it = msg.cbegin(), last = msg.cend();
@ -152,14 +274,15 @@ BOOST_AUTO_TEST_CASE(test_puback) {
// testing variables
uint16_t packet_id = 9199;
uint8_t reason_code = 0x93;
std::string reason_string = "PUBACK reason string";
std::string user_prop = "PUBACK user prop";
puback_props pp;
pp[prop::reason_string] = reason_string;
pp[prop::user_property].emplace_back(user_prop);
puback_props pprops;
pprops[prop::reason_string] = reason_string;
pprops[prop::user_property].emplace_back(user_prop);
auto msg = encoders::encode_puback(packet_id, reason_code, pp);
auto msg = encoders::encode_puback(packet_id, reason_code, pprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -173,25 +296,140 @@ BOOST_AUTO_TEST_CASE(test_puback) {
auto rv = decoders::decode_puback(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(rv, "Parsing PUBACK failed.");
const auto& [reason_code_, pprops] = *rv;
const auto& [reason_code_, pprops_] = *rv;
pprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(reason_code_, reason_code);
BOOST_CHECK_EQUAL(*pprops[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(pprops[prop::user_property][0], user_prop);
BOOST_CHECK_EQUAL(*pprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(pprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_pubrec) {
// testing variables
uint16_t packet_id = 8534;
uint8_t reason_code = 0x92;
std::string reason_string = "PUBREC reason string";
std::string user_prop = "PUBREC user prop";
pubrec_props pprops;
pprops[prop::reason_string] = reason_string;
pprops[prop::user_property].emplace_back(user_prop);
auto msg = encoders::encode_pubrec(packet_id, reason_code, pprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
BOOST_CHECK_MESSAGE(header, "Parsing PUBREC fixed header failed.");
auto packet_id_ = decoders::decode_packet_id(it);
BOOST_CHECK(packet_id);
BOOST_CHECK_EQUAL(*packet_id_, packet_id);
const auto& [control_byte, remain_length] = *header;
auto rv = decoders::decode_pubrec(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(rv, "Parsing PUBREC failed.");
const auto& [reason_code_, pprops_] = *rv;
pprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(reason_code_, reason_code);
BOOST_CHECK_EQUAL(*pprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(pprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_pubrel) {
// testing variables
uint16_t packet_id = 21455;
uint8_t reason_code = 0x00;
std::string reason_string = "PUBREL reason string";
std::string user_prop = "PUBREL user prop";
pubrel_props pprops;
pprops[prop::reason_string] = reason_string;
pprops[prop::user_property].emplace_back(user_prop);
auto msg = encoders::encode_pubrel(packet_id, reason_code, pprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
BOOST_CHECK_MESSAGE(header, "Parsing PUBREL fixed header failed.");
auto packet_id_ = decoders::decode_packet_id(it);
BOOST_CHECK(packet_id);
BOOST_CHECK_EQUAL(*packet_id_, packet_id);
const auto& [control_byte, remain_length] = *header;
auto rv = decoders::decode_pubrel(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(rv, "Parsing PUBREL failed.");
const auto& [reason_code_, pprops_] = *rv;
pprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(reason_code_, reason_code);
BOOST_CHECK_EQUAL(*pprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(pprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_pubcomp) {
// testing variables
uint16_t packet_id = 21455;
uint8_t reason_code = 0x00;
std::string reason_string = "PUBCOMP reason string";
std::string user_prop = "PUBCOMP user prop";
pubcomp_props pprops;
pprops[prop::reason_string] = reason_string;
pprops[prop::user_property].emplace_back(user_prop);
auto msg = encoders::encode_pubcomp(packet_id, reason_code, pprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
BOOST_CHECK_MESSAGE(header, "Parsing PUBCOMP fixed header failed.");
auto packet_id_ = decoders::decode_packet_id(it);
BOOST_CHECK(packet_id);
BOOST_CHECK_EQUAL(*packet_id_, packet_id);
const auto& [control_byte, remain_length] = *header;
auto rv = decoders::decode_pubcomp(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(rv, "Parsing PUBCOMP failed.");
const auto& [reason_code_, pprops_] = *rv;
pprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(reason_code_, reason_code);
BOOST_CHECK_EQUAL(*pprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(pprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_subscribe) {
//testing variables
uint32_t sub_id = 1'234'567;
std::string user_prop = "SUBSCRIBE user prop";
subscribe_props sp;
sp[prop::subscription_identifier] = sub_id;
// delete using lines when we shorten the enum names
using no_local_e = subscribe_options::no_local_e;
using retain_as_published_e = subscribe_options::retain_as_published_e;
using retain_handling_e = subscribe_options::retain_handling_e;
qos_e qos = qos_e::at_least_once;
no_local_e no_local = no_local_e::yes;
retain_as_published_e retain_as_published = retain_as_published_e::retain;
retain_handling_e retain_handling = retain_handling_e::not_send;
subscribe_props sprops;
sprops[prop::subscription_identifier] = sub_id;
sprops[prop::user_property].push_back(user_prop);
std::vector<subscribe_topic> filters {
{ "subscribe topic", { qos_e::at_least_once } }
{
"subscribe topic",
{ qos, no_local, retain_as_published, retain_handling }
}
};
uint16_t packet_id = 65535;
auto msg = encoders::encode_subscribe(packet_id, filters, sp);
auto msg = encoders::encode_subscribe(packet_id, filters, sprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -204,19 +442,35 @@ BOOST_AUTO_TEST_CASE(test_subscribe) {
auto rv = decoders::decode_subscribe(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(rv, "Parsing SUBSCRIBE failed.");
const auto& [props_, filters_] = *rv;
BOOST_CHECK_EQUAL(std::get<0>(filters_[0]), filters[0].topic_filter);
BOOST_CHECK_EQUAL(*props_[prop::subscription_identifier], sub_id);
//TODO: sub options
const auto& [sprops_, filters_] = *rv;
const auto& filter_ = filters_[0];
BOOST_CHECK_EQUAL(std::get<0>(filter_), filters[0].topic_filter);
uint8_t options_ = std::get<1>(filter_);
uint8_t mask = (static_cast<uint8_t>(retain_handling) << 4) |
(static_cast<uint8_t>(retain_as_published) << 3) |
(static_cast<uint8_t>(no_local) << 2) |
static_cast<uint8_t>(qos);
BOOST_CHECK_EQUAL(options_, mask);
sprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*sprops_[prop::subscription_identifier], sub_id);
BOOST_CHECK_EQUAL(sprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_suback) {
//testing variables
suback_props sp;
std::vector<uint8_t> reason_codes { 48, 28 };
uint16_t packet_id = 142;
std::vector<uint8_t> reason_codes { 48, 28 };
auto msg = encoders::encode_suback(packet_id, reason_codes, sp);
std::string reason_string = "subscription accepted";
std::string user_prop = "SUBACK user prop";
suback_props sprops;
sprops[prop::reason_string] = reason_string;
sprops[prop::user_property].push_back(user_prop);
auto msg = encoders::encode_suback(packet_id, reason_codes, sprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -229,17 +483,25 @@ BOOST_AUTO_TEST_CASE(test_suback) {
auto rv = decoders::decode_suback(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(rv, "Parsing SUBACK failed.");
const auto& [sp_, reason_codes_] = *rv;
const auto& [sprops_, reason_codes_] = *rv;
BOOST_CHECK(reason_codes_ == reason_codes);
sprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*sprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(sprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_unsubscribe) {
// testing variables
unsubscribe_props sp;
std::vector<std::string> topics { "first topic", "second/topic" };
uint16_t packet_id = 14423;
std::vector<std::string> topics { "first topic", "second/topic" };
auto msg = encoders::encode_unsubscribe(packet_id, topics, sp);
std::string user_prop = "UNSUBSCRIBE user prop";
unsubscribe_props uprops;
uprops[prop::user_property].push_back(user_prop);
auto msg = encoders::encode_unsubscribe(packet_id, topics, uprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -252,19 +514,26 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) {
auto rv = decoders::decode_unsubscribe(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(rv, "Parsing UNSUBSCRIBE failed.");
const auto& [props_, topics_] = *rv;
const auto& [uprops_, topics_] = *rv;
BOOST_CHECK(topics_ == topics);
uprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(uprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_unsuback) {
// testing variables
std::string reason_string = "some unsuback reason string";
unsuback_props sp;
sp[prop::reason_string] = reason_string;
std::vector<uint8_t> reason_codes { 48, 28 };
uint16_t packet_id = 42;
std::vector<uint8_t> reason_codes{ 48, 28 };
auto msg = encoders::encode_unsuback(packet_id, reason_codes, sp);
std::string reason_string = "some unsuback reason string";
std::string user_prop = "UNSUBACK user prop";
unsuback_props uprops;
uprops[prop::reason_string] = reason_string;
uprops[prop::user_property].push_back(user_prop);
auto msg = encoders::encode_unsuback(packet_id, reason_codes, uprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -277,19 +546,30 @@ BOOST_AUTO_TEST_CASE(test_unsuback) {
auto rv = decoders::decode_unsuback(remain_length - sizeof(uint16_t), it);
BOOST_CHECK_MESSAGE(header, "Parsing UNSUBACK failed.");
const auto& [props_, reason_codes_] = *rv;
const auto& [uprops_, reason_codes_] = *rv;
BOOST_CHECK(reason_codes_ == reason_codes);
BOOST_CHECK_EQUAL(*props_[prop::reason_string], reason_string);
uprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*uprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(uprops_[prop::user_property][0], user_prop);
}
BOOST_AUTO_TEST_CASE(test_disconnect) {
// testing variables
uint8_t reason_code = 0;
std::string user_property = "DISCONNECT user property";
disconnect_props sp;
sp[prop::user_property].emplace_back(user_property);
uint8_t reason_code = 0x04;
auto msg = encoders::encode_disconnect(reason_code, sp);
int32_t session_expiry_interval = 50;
std::string reason_string = "a reason";
std::string user_property = "DISCONNECT user property";
std::string server_reference = "server";
disconnect_props dprops;
dprops[prop::session_expiry_interval] = session_expiry_interval;
dprops[prop::reason_string] = reason_string;
dprops[prop::user_property].emplace_back(user_property);
dprops[prop::server_reference] = server_reference;
auto msg = encoders::encode_disconnect(reason_code, dprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -299,21 +579,33 @@ BOOST_AUTO_TEST_CASE(test_disconnect) {
auto rv = decoders::decode_disconnect(remain_length, it);
BOOST_CHECK_MESSAGE(header, "Parsing DISCONNECT failed.");
const auto& [reason_code_, props_] = *rv;
const auto& [reason_code_, dprops_] = *rv;
BOOST_CHECK_EQUAL(reason_code_, reason_code);
BOOST_CHECK_EQUAL(props_[prop::user_property][0], user_property);
dprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*dprops_[prop::session_expiry_interval], session_expiry_interval);
BOOST_CHECK_EQUAL(*dprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(dprops_[prop::user_property][0], user_property);
BOOST_CHECK_EQUAL(*dprops_[prop::server_reference], server_reference);
}
BOOST_AUTO_TEST_CASE(test_auth) {
// testing variables
uint8_t reason_code = 0x18;
std::string reason_string = "AUTH reason";
std::string user_property = "AUTH user propety";
auth_props sp;
sp[prop::reason_string] = reason_string;
sp[prop::user_property].emplace_back(user_property);
auto msg = encoders::encode_auth(reason_code, sp);
std::string authentication_method = "method";
std::string authentication_data = "data";
std::string reason_string = "reason";
std::string user_property = "AUTH user propety";
auth_props aprops;
aprops[prop::authentication_method] = authentication_method;
aprops[prop::authentication_data] = authentication_data;
aprops[prop::reason_string] = reason_string;
aprops[prop::user_property].emplace_back(user_property);
auto msg = encoders::encode_auth(reason_code, aprops);
byte_citer it = msg.cbegin(), last = msg.cend();
auto header = decoders::decode_fixed_header(it, last);
@ -323,10 +615,28 @@ BOOST_AUTO_TEST_CASE(test_auth) {
auto rv = decoders::decode_auth(remain_length, it);
BOOST_CHECK_MESSAGE(rv, "Parsing AUTH failed.");
const auto& [reason_code_, props_] = *rv;
const auto& [reason_code_, aprops_] = *rv;
BOOST_CHECK_EQUAL(reason_code_, reason_code);
BOOST_CHECK_EQUAL(*props_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(props_[prop::user_property][0], user_property);
aprops_.visit([](const auto& prop, const auto&) { BOOST_CHECK(prop); return true; });
BOOST_CHECK_EQUAL(*aprops_[prop::authentication_method], authentication_method);
BOOST_CHECK_EQUAL(*aprops_[prop::authentication_data], authentication_data);
BOOST_CHECK_EQUAL(*aprops_[prop::reason_string], reason_string);
BOOST_CHECK_EQUAL(aprops_[prop::user_property][0], user_property);
}
BOOST_AUTO_TEST_CASE(test_pingreq) {
auto msg = encoders::encode_pingreq();
auto encoded_pingreq = std::string({ -64 /* 192 */, 0 });
BOOST_CHECK_EQUAL(msg, encoded_pingreq);
}
BOOST_AUTO_TEST_CASE(test_pingresp) {
auto msg = encoders::encode_pingresp();
auto encoded_pingresp = std::string({ -48 /* 208 */, 0 });
BOOST_CHECK_EQUAL(msg, encoded_pingresp);
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -14,27 +14,12 @@ using namespace async_mqtt5;
BOOST_AUTO_TEST_SUITE(subscribe_op/*, *boost::unit_test::disabled()*/)
template <
typename StreamType,
typename TlsContext = std::monostate
>
class overrun_client : public detail::client_service<StreamType, TlsContext> {
public:
overrun_client(const asio::any_io_executor& ex, const std::string& cnf) :
detail::client_service<StreamType, TlsContext>(ex, cnf)
{}
uint16_t allocate_pid() {
return 0;
}
};
BOOST_AUTO_TEST_CASE(test_pid_overrun) {
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
asio::io_context ioc;
using client_service_type = overrun_client<asio::ip::tcp::socket>;
using client_service_type = test::overrun_client<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor(), "");
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
@ -51,7 +36,7 @@ BOOST_AUTO_TEST_CASE(test_pid_overrun) {
{ { "topic", { qos_e::exactly_once } } }, subscribe_props {}
);
ioc.run();
ioc.run_for(std::chrono::milliseconds(500));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
}

View File

@ -0,0 +1,205 @@
#include <boost/test/unit_test.hpp>
#include <boost/asio/io_context.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/impl/unsubscribe_op.hpp>
#include "test_common/message_exchange.hpp"
#include "test_common/test_service.hpp"
#include "test_common/test_stream.hpp"
using namespace async_mqtt5;
BOOST_AUTO_TEST_SUITE(unsubscribe_op/*, *boost::unit_test::disabled()*/)
BOOST_AUTO_TEST_CASE(test_pid_overrun) {
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
asio::io_context ioc;
using client_service_type = test::overrun_client<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor(), "");
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
++handlers_called;
BOOST_CHECK(ec == client::error::pid_overrun);
BOOST_ASSERT(rcs.size() == 1);
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
};
detail::unsubscribe_op<
client_service_type, decltype(handler)
> { svc_ptr, std::move(handler) }
.perform({ "topic" }, unsubscribe_props{});
ioc.run_for(std::chrono::milliseconds(500));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(test_invalid_topic_filters) {
std::vector<std::string> invalid_topics = {
"", "+topic", "#topic", "some/#/topic", "topic+"
};
const int expected_handlers_called = static_cast<int>(invalid_topics.size());
int handlers_called = 0;
asio::io_context ioc;
using client_service_type = test::test_service<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
for (const auto& topic: invalid_topics) {
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
++handlers_called;
BOOST_CHECK(ec == client::error::invalid_topic);
BOOST_ASSERT(rcs.size() == 1);
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
};
detail::unsubscribe_op<
client_service_type, decltype(handler)
> { svc_ptr, std::move(handler) }
.perform({ topic }, unsubscribe_props{});
}
ioc.run_for(std::chrono::milliseconds(500));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(test_malformed_packet) {
std::vector<std::string> test_properties = {
std::string(75000, 'a'), std::string(10, char(0x01))
};
const int expected_handlers_called = static_cast<int>(test_properties.size());
int handlers_called = 0;
asio::io_context ioc;
using client_service_type = test::test_service<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
for (const auto& test_prop: test_properties) {
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
++handlers_called;
BOOST_CHECK(ec == client::error::malformed_packet);
BOOST_ASSERT(rcs.size() == 1);
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
};
unsubscribe_props props;
props[prop::user_property].push_back(test_prop);
detail::unsubscribe_op<
client_service_type, decltype(handler)
> { svc_ptr, std::move(handler) }
.perform({ "topic" }, props);
}
ioc.run_for(std::chrono::milliseconds(500));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(test_packet_too_large) {
int max_packet_sz = 10;
connack_props props;
props[prop::maximum_packet_size] = max_packet_sz;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
asio::io_context ioc;
using client_service_type = test::test_service<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(
ioc.get_executor(), std::move(props)
);
BOOST_ASSERT(svc_ptr->connack_property(prop::maximum_packet_size) == max_packet_sz);
auto handler = [&handlers_called](error_code ec, std::vector<reason_code> rcs, auto) {
++handlers_called;
BOOST_CHECK(ec == client::error::packet_too_large);
BOOST_ASSERT(rcs.size() == 1);
BOOST_CHECK_EQUAL(rcs[0], reason_codes::empty);
};
detail::unsubscribe_op<
client_service_type, decltype(handler)
> { svc_ptr, std::move(handler) }
.perform({ "topic" }, unsubscribe_props {});
ioc.run_for(std::chrono::milliseconds(500));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
}
BOOST_AUTO_TEST_CASE(test_resending) {
using test::after;
using std::chrono_literals::operator ""ms;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
// data
std::vector<std::string> topics = { "topic " };
unsubscribe_props unsubscribe_props;
std::vector<uint8_t> rcs = { reason_codes::success.value() };
unsuback_props unsuback_props;
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
);
auto connack = encoders::encode_connack(false, reason_codes::success.value(), {});
auto unsubscribe = encoders::encode_unsubscribe(1, topics, unsubscribe_props);
auto unsuback = encoders::encode_unsuback(1, rcs, unsuback_props);
test::msg_exchange broker_side;
error_code success {};
error_code fail = asio::error::not_connected;
broker_side
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.expect(unsubscribe)
.complete_with(fail, after(0ms))
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.expect(unsubscribe)
.complete_with(success, after(0ms))
.reply_with(unsuback, after(0ms));
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side)
);
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
c.async_unsubscribe(
topics, unsubscribe_props,
[&](error_code ec, std::vector<reason_code> rcs, auto) {
handlers_called++;
BOOST_CHECK(!ec);
BOOST_ASSERT(rcs.size() == 1);
BOOST_CHECK_EQUAL(rcs[0], reason_codes::success);
c.cancel();
}
);
ioc.run_for(std::chrono::seconds(10));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
BOOST_CHECK(broker.received_all_expected());
}
BOOST_AUTO_TEST_SUITE_END()