Send default Maximum Packet Size in the CONNECT packet

Summary: #33

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina, miljen

Differential Revision: https://repo.mireo.local/D36253
This commit is contained in:
Bruno Iljazovic
2025-07-15 14:09:48 +02:00
parent 272217cbb3
commit 53c33f23bb
22 changed files with 68 additions and 38 deletions

View File

@ -53,6 +53,8 @@ Listing Brokers from different clusters may lead to inconsistencies between MQTT
* *Assign a custom user-implemented authenticator:* The custom authentication will be used for __ENHANCED_AUTH__ ([refmem mqtt_client authenticator]).
* *Defining CONNECT Packet Properties:* Specify properties that will be included in the __CONNECT__ packet sent during connection initiation (see [refmem mqtt_client connect_property] and [refmem mqtt_client connect_properties]).
[note If the user does not set the `maximum_packet_size` CONNECT property, it defaults to `64KB`. This is the only CONNECT property with a default value. ]
Firstly, we will specify the Broker we want to connect to using the [refmem mqtt_client brokers] function.
This can be __HIVEMQ__'s public Broker available at `broker.hivemq.com`:1883.
Additionally, we can set the Client Identifier using the [refmem mqtt_client credentials] function. This is not mandatory, as some Brokers allow anonymous connections.

View File

@ -16,7 +16,7 @@ Below is a list of possible __CONNECT__ Properties, along with descriptions of t
[[Identifier] [Value type] [Description]]
[[session_expiry_interval] [`uint32_t`] [Represents the Session Expiry Internal in seconds.]]
[[receive_maximum] [`uint16_t`] [The maximum number of QoS 1 and QoS 2 publications that the Client is willing to process concurrently.]]
[[maximum_packet_size] [`uint32_t`] [The maximum __PACKET_SIZE__ in bytes as defined by the specification that the Client is willing to process.]]
[[maximum_packet_size] [`uint32_t`] [The maximum __PACKET_SIZE__ in bytes as defined by the specification that the Client is willing to process. [* Default: `65536` (`64KB`)]]]
[[topic_alias_maximum] [`uint16_t`] [The highest value that the Client will accept as a Topic Alias sent by the Server.]]
[[request_response_information] [`uint8_t`] [The value of 0 signals that the Server MUST NOT return Response Information in __CONNACK__. If the value if 1, it MAY return it.]]
[[request_problem_information] [`uint8_t`] [The value of 0 signals that the Server MAY return a Reason String or User Properties on a __CONNACK__ or __DISCONNECT__ packet,

View File

@ -48,6 +48,7 @@ int main(int argc, char** argv) {
//[configure_tcp_client
client.brokers(cfg.brokers, cfg.port) // Set the Broker to connect to.
.credentials(cfg.client_id) // Set the Client Identifier. (optional)
.connect_property(boost::mqtt5::prop::session_expiry_interval, 60) // optional
.async_run(boost::asio::detached); // Start the Client.
//]

View File

@ -25,6 +25,8 @@ namespace boost::mqtt5::detail {
/* max varint number (268'435'455) + fixed header size (1 + 4) */
static constexpr int32_t default_max_send_size = 268'435'460;
static constexpr int32_t default_max_recv_size = 65'536;
enum class control_code_e : std::uint8_t {
no_packet = 0b00000000, // 0

View File

@ -63,8 +63,6 @@ class assemble_op {
struct on_read {};
static constexpr uint32_t max_recv_size = 65'536;
client_service& _svc;
handler_type _handler;
@ -102,9 +100,7 @@ public:
_read_buff.erase(
_read_buff.cbegin(), _data_span.first()
);
_read_buff.resize(
_svc.connect_property(prop::maximum_packet_size).value_or(max_recv_size)
);
_read_buff.resize(max_recv_size());
_data_span = {
_read_buff.cbegin(),
_read_buff.cbegin() + _data_span.size()
@ -168,10 +164,11 @@ public:
return complete(client::error::malformed_packet, 0, {}, {});
}
auto recv_size = _svc.connect_property(prop::maximum_packet_size)
.value_or(max_recv_size);
if (static_cast<uint32_t>(*varlen) > recv_size - std::distance(_data_span.first(), first))
return complete(client::error::malformed_packet, 0, {}, {});
if (
static_cast<uint32_t>(*varlen)
> max_recv_size() - std::distance(_data_span.first(), first)
)
return complete(client::error::packet_too_large, 0, {}, {});
if (std::distance(first, _data_span.last()) < *varlen)
return perform(asio::transfer_at_least(1));
@ -184,6 +181,14 @@ public:
}
private:
uint32_t max_recv_size() const {
return std::min(
_svc.connect_property(prop::maximum_packet_size)
.value_or(default_max_recv_size),
static_cast<uint32_t>(default_max_send_size)
);
}
duration compute_read_timeout() const {
auto negotiated_ka = _svc.negotiated_keep_alive();
return negotiated_ka ?

View File

@ -215,6 +215,8 @@ public:
}
void send_connect() {
if (!_ctx.co_props[prop::maximum_packet_size].has_value())
_ctx.co_props[prop::maximum_packet_size] = default_max_recv_size;
auto packet = control_packet<allocator_type>::of(
no_pid, get_allocator(),
encoders::encode_connect,

View File

@ -75,8 +75,14 @@ public:
) {
if (ec == client::error::malformed_packet)
return on_malformed_packet(
disconnect_rc_e::malformed_packet,
"Malformed Packet received from the Server"
);
else if (ec == client::error::packet_too_large)
return on_malformed_packet(
disconnect_rc_e::packet_too_large,
"The packet size is greater than Maximum Packet Size"
);
if (ec == asio::error::no_recovery)
_svc_ptr->cancel();
@ -108,6 +114,7 @@ private:
);
if (!msg.has_value())
return on_malformed_packet(
disconnect_rc_e::malformed_packet,
"Malformed PUBLISH received: cannot decode"
);
@ -120,6 +127,7 @@ private:
);
if (!rv.has_value())
return on_malformed_packet(
disconnect_rc_e::malformed_packet,
"Malformed DISCONNECT received: cannot decode"
);
@ -140,6 +148,7 @@ private:
);
if (!rv.has_value())
return on_malformed_packet(
disconnect_rc_e::malformed_packet,
"Malformed AUTH received: cannot decode"
);
@ -153,13 +162,13 @@ private:
perform();
}
void on_malformed_packet(const std::string& reason) {
void on_malformed_packet(disconnect_rc_e rc, const std::string& reason) {
auto props = disconnect_props {};
props[prop::reason_string] = reason;
auto svc_ptr = _svc_ptr; // copy before this is moved
async_disconnect(
disconnect_rc_e::malformed_packet, props, svc_ptr,
rc, props, svc_ptr,
asio::prepend(std::move(*this), on_disconnect {})
);
}

View File

@ -393,6 +393,12 @@ inline disconnect_props dprops_with_reason_string(const std::string& reason_stri
return dprops;
}
static constexpr auto dflt_cprops = std::invoke([] {
connect_props ret;
ret[prop::maximum_packet_size] = ::boost::mqtt5::detail::default_max_recv_size;
return ret;
});
} // end namespace boost::mqtt5::test
#endif // BOOST_MQTT5_TEST_PACKET_UTIL_HPP

View File

@ -35,7 +35,7 @@ struct shared_test_data {
const std::string payload = "payload";
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
false, reason_codes::success.value(), {}

View File

@ -255,7 +255,7 @@ struct shared_test_data {
error_code fail = asio::error::not_connected;
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
false, reason_codes::success.value(), {}

View File

@ -89,7 +89,7 @@ BOOST_FIXTURE_TEST_CASE(assign_credentials, shared_test_data) {
std::string password = "password";
auto connect = encoders::encode_connect(
client_id, username, password, 60, false, {}, std::nullopt
client_id, username, password, 60, false, test::dflt_cprops, std::nullopt
);
test::msg_exchange broker_side;
@ -111,7 +111,7 @@ BOOST_FIXTURE_TEST_CASE(assign_will, shared_test_data) {
std::optional<will> will_opt { std::move(w) };
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, will_opt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, will_opt
);
test::msg_exchange broker_side;
@ -143,7 +143,7 @@ BOOST_FIXTURE_TEST_CASE(assign_keep_alive, shared_test_data) {
uint16_t keep_alive = 120;
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, keep_alive, false, {}, std::nullopt
"", std::nullopt, std::nullopt, keep_alive, false, test::dflt_cprops, std::nullopt
);
test::msg_exchange broker_side;
@ -259,7 +259,7 @@ struct shared_connack_prop_test_data {
// packets
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
false, reason_codes::success.value(), cprops

View File

@ -30,7 +30,7 @@ struct shared_test_data {
error_code fail = asio::error::not_connected;
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
true, reason_codes::success.value(), {}

View File

@ -44,7 +44,7 @@ void run_test(
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
auto connack = encoders::encode_connack(false, reason_codes::success.value(), {});
auto publish_0 = encoders::encode_publish(

View File

@ -43,7 +43,7 @@ using namespace std::chrono_literals;
std::string connect_with_keep_alive(uint16_t keep_alive) {
return encoders::encode_connect(
"", std::nullopt, std::nullopt, keep_alive, false, {}, std::nullopt
"", std::nullopt, std::nullopt, keep_alive, false, test::dflt_cprops, std::nullopt
);
}

View File

@ -51,7 +51,7 @@ struct shared_test_data {
);
connect_props init_connect_props() {
connect_props cprops;
auto cprops = test::dflt_cprops;
cprops[prop::authentication_method] = "method";
cprops[prop::authentication_data] = "";
return cprops;
@ -226,7 +226,7 @@ BOOST_FIXTURE_TEST_CASE(async_auth_fail, shared_test_data) {
BOOST_FIXTURE_TEST_CASE(unexpected_auth, shared_test_data) {
auto connect_no_auth = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
auto disconnect = encoders::encode_disconnect(
reason_codes::protocol_error.value(),
@ -249,7 +249,7 @@ BOOST_FIXTURE_TEST_CASE(unexpected_auth, shared_test_data) {
BOOST_FIXTURE_TEST_CASE(re_auth_without_authenticator, shared_test_data) {
auto connect_no_auth = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
test::msg_exchange broker_side;

View File

@ -30,11 +30,11 @@ using test::after;
using namespace std::chrono_literals;
void test_receive_malformed_packet(
std::string malformed_packet, std::string reason_string
std::string malformed_packet, reason_code rc, std::string reason_string
) {
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
connack_props co_props;
co_props[prop::maximum_packet_size] = 2000;
@ -42,9 +42,7 @@ void test_receive_malformed_packet(
disconnect_props dc_props;
dc_props[prop::reason_string] = reason_string;
auto disconnect = encoders::encode_disconnect(
reason_codes::malformed_packet.value(), dc_props
);
auto disconnect = encoders::encode_disconnect(rc.value(), dc_props);
error_code success {};
test::msg_exchange broker_side;
@ -82,6 +80,7 @@ void test_receive_malformed_packet(
BOOST_AUTO_TEST_CASE(forbidden_packet_type) {
test_receive_malformed_packet(
std::string({ 0x00 }),
reason_codes::malformed_packet,
"Malformed Packet received from the Server"
);
}
@ -89,6 +88,7 @@ BOOST_AUTO_TEST_CASE(forbidden_packet_type) {
BOOST_AUTO_TEST_CASE(malformed_varint) {
test_receive_malformed_packet(
std::string({ 0x10, -1 /* 0xFF */, -1, -1, -1 }),
reason_codes::malformed_packet,
"Malformed Packet received from the Server"
);
}
@ -96,6 +96,7 @@ BOOST_AUTO_TEST_CASE(malformed_varint) {
BOOST_AUTO_TEST_CASE(malformed_fixed_header) {
test_receive_malformed_packet(
std::string({ 0x60, 1, 0 }),
reason_codes::malformed_packet,
"Malformed Packet received from the Server"
);
}
@ -103,13 +104,15 @@ BOOST_AUTO_TEST_CASE(malformed_fixed_header) {
BOOST_AUTO_TEST_CASE(packet_larger_than_allowed) {
test_receive_malformed_packet(
std::string({ 0x10, -1, -1, -1, 0 }),
"Malformed Packet received from the Server"
reason_codes::packet_too_large,
"The packet size is greater than Maximum Packet Size"
);
}
BOOST_AUTO_TEST_CASE(receive_malformed_publish) {
test_receive_malformed_packet(
std::string({ 0x30, 1, -1 }),
reason_codes::malformed_packet,
"Malformed PUBLISH received: cannot decode"
);
}
@ -122,7 +125,7 @@ struct shared_test_data {
const std::string payload = "payload";
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(false, uint8_t(0x00), {});

View File

@ -33,7 +33,7 @@ struct shared_test_data {
error_code fail = asio::error::not_connected;
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
true, reason_codes::success.value(), {}

View File

@ -33,7 +33,7 @@ struct shared_test_data {
error_code fail = asio::error::not_connected;
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
false, reason_codes::success.value(), {}

View File

@ -43,7 +43,7 @@ struct shared_test_data {
error_code fail = asio::error::not_connected;
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
false, reason_codes::success.value(), {}

View File

@ -35,7 +35,7 @@ struct shared_test_data {
error_code fail = asio::error::not_connected;
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
true, reason_codes::success.value(), {}
@ -246,7 +246,7 @@ struct shared_test_auth_data {
const std::string auth_response = auth_challenge;
connect_props init_connect_props() {
connect_props cprops;
auto cprops = test::dflt_cprops;
cprops[prop::authentication_method] = "method";
cprops[prop::authentication_data] = "";
return cprops;

View File

@ -439,7 +439,7 @@ BOOST_AUTO_TEST_CASE(client_disconnect) {
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
auto connack = encoders::encode_connack(false, uint8_t(0x00), {});

View File

@ -98,7 +98,7 @@ void run_connect_to_localhost_test(int succeed_after) {
error_code success {};
const std::string connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
);
const std::string connack = encoders::encode_connack(
true, reason_codes::success.value(), {}