From 53c33f23bb442fdfa1a4fe6f3dcf2a42d28a68ea Mon Sep 17 00:00:00 2001 From: Bruno Iljazovic Date: Tue, 15 Jul 2025 14:09:48 +0200 Subject: [PATCH] Send default Maximum Packet Size in the CONNECT packet Summary: #33 Reviewers: ivica Reviewed By: ivica Subscribers: korina, miljen Differential Revision: https://repo.mireo.local/D36253 --- doc/qbk/02_getting_started.qbk | 2 ++ .../reference/properties/connect_props.qbk | 2 +- example/hello_world_over_tcp.cpp | 1 + include/boost/mqtt5/detail/control_packet.hpp | 2 ++ include/boost/mqtt5/impl/assemble_op.hpp | 23 +++++++++++-------- include/boost/mqtt5/impl/connect_op.hpp | 2 ++ include/boost/mqtt5/impl/read_message_op.hpp | 13 +++++++++-- test/include/test_common/packet_util.hpp | 6 +++++ test/integration/async_sender.cpp | 2 +- test/integration/cancellation.cpp | 2 +- test/integration/client_functions.cpp | 8 +++---- test/integration/disconnect.cpp | 2 +- test/integration/executors.cpp | 2 +- test/integration/ping.cpp | 2 +- test/integration/re_authentication.cpp | 6 ++--- test/integration/read_message.cpp | 17 ++++++++------ test/integration/receive_publish.cpp | 2 +- test/integration/send_publish.cpp | 2 +- test/integration/sub_unsub.cpp | 2 +- test/unit/connect_op.cpp | 4 ++-- test/unit/logger.cpp | 2 +- test/unit/reconnect_op.cpp | 2 +- 22 files changed, 68 insertions(+), 38 deletions(-) diff --git a/doc/qbk/02_getting_started.qbk b/doc/qbk/02_getting_started.qbk index a7425df..838e0a6 100644 --- a/doc/qbk/02_getting_started.qbk +++ b/doc/qbk/02_getting_started.qbk @@ -53,6 +53,8 @@ Listing Brokers from different clusters may lead to inconsistencies between MQTT * *Assign a custom user-implemented authenticator:* The custom authentication will be used for __ENHANCED_AUTH__ ([refmem mqtt_client authenticator]). * *Defining CONNECT Packet Properties:* Specify properties that will be included in the __CONNECT__ packet sent during connection initiation (see [refmem mqtt_client connect_property] and [refmem mqtt_client connect_properties]). +[note If the user does not set the `maximum_packet_size` CONNECT property, it defaults to `64KB`. This is the only CONNECT property with a default value. ] + Firstly, we will specify the Broker we want to connect to using the [refmem mqtt_client brokers] function. This can be __HIVEMQ__'s public Broker available at `broker.hivemq.com`:1883. Additionally, we can set the Client Identifier using the [refmem mqtt_client credentials] function. This is not mandatory, as some Brokers allow anonymous connections. diff --git a/doc/qbk/reference/properties/connect_props.qbk b/doc/qbk/reference/properties/connect_props.qbk index 76df641..b70b6eb 100644 --- a/doc/qbk/reference/properties/connect_props.qbk +++ b/doc/qbk/reference/properties/connect_props.qbk @@ -16,7 +16,7 @@ Below is a list of possible __CONNECT__ Properties, along with descriptions of t [[Identifier] [Value type] [Description]] [[session_expiry_interval] [`uint32_t`] [Represents the Session Expiry Internal in seconds.]] [[receive_maximum] [`uint16_t`] [The maximum number of QoS 1 and QoS 2 publications that the Client is willing to process concurrently.]] - [[maximum_packet_size] [`uint32_t`] [The maximum __PACKET_SIZE__ in bytes as defined by the specification that the Client is willing to process.]] + [[maximum_packet_size] [`uint32_t`] [The maximum __PACKET_SIZE__ in bytes as defined by the specification that the Client is willing to process. [* Default: `65536` (`64KB`)]]] [[topic_alias_maximum] [`uint16_t`] [The highest value that the Client will accept as a Topic Alias sent by the Server.]] [[request_response_information] [`uint8_t`] [The value of 0 signals that the Server MUST NOT return Response Information in __CONNACK__. If the value if 1, it MAY return it.]] [[request_problem_information] [`uint8_t`] [The value of 0 signals that the Server MAY return a Reason String or User Properties on a __CONNACK__ or __DISCONNECT__ packet, diff --git a/example/hello_world_over_tcp.cpp b/example/hello_world_over_tcp.cpp index 074f7b4..45ceb9d 100644 --- a/example/hello_world_over_tcp.cpp +++ b/example/hello_world_over_tcp.cpp @@ -48,6 +48,7 @@ int main(int argc, char** argv) { //[configure_tcp_client client.brokers(cfg.brokers, cfg.port) // Set the Broker to connect to. .credentials(cfg.client_id) // Set the Client Identifier. (optional) + .connect_property(boost::mqtt5::prop::session_expiry_interval, 60) // optional .async_run(boost::asio::detached); // Start the Client. //] diff --git a/include/boost/mqtt5/detail/control_packet.hpp b/include/boost/mqtt5/detail/control_packet.hpp index 77299f1..f8a81d7 100644 --- a/include/boost/mqtt5/detail/control_packet.hpp +++ b/include/boost/mqtt5/detail/control_packet.hpp @@ -25,6 +25,8 @@ namespace boost::mqtt5::detail { /* max varint number (268'435'455) + fixed header size (1 + 4) */ static constexpr int32_t default_max_send_size = 268'435'460; +static constexpr int32_t default_max_recv_size = 65'536; + enum class control_code_e : std::uint8_t { no_packet = 0b00000000, // 0 diff --git a/include/boost/mqtt5/impl/assemble_op.hpp b/include/boost/mqtt5/impl/assemble_op.hpp index 5c690af..e8e8bfb 100644 --- a/include/boost/mqtt5/impl/assemble_op.hpp +++ b/include/boost/mqtt5/impl/assemble_op.hpp @@ -63,8 +63,6 @@ class assemble_op { struct on_read {}; - static constexpr uint32_t max_recv_size = 65'536; - client_service& _svc; handler_type _handler; @@ -102,9 +100,7 @@ public: _read_buff.erase( _read_buff.cbegin(), _data_span.first() ); - _read_buff.resize( - _svc.connect_property(prop::maximum_packet_size).value_or(max_recv_size) - ); + _read_buff.resize(max_recv_size()); _data_span = { _read_buff.cbegin(), _read_buff.cbegin() + _data_span.size() @@ -168,10 +164,11 @@ public: return complete(client::error::malformed_packet, 0, {}, {}); } - auto recv_size = _svc.connect_property(prop::maximum_packet_size) - .value_or(max_recv_size); - if (static_cast(*varlen) > recv_size - std::distance(_data_span.first(), first)) - return complete(client::error::malformed_packet, 0, {}, {}); + if ( + static_cast(*varlen) + > max_recv_size() - std::distance(_data_span.first(), first) + ) + return complete(client::error::packet_too_large, 0, {}, {}); if (std::distance(first, _data_span.last()) < *varlen) return perform(asio::transfer_at_least(1)); @@ -184,6 +181,14 @@ public: } private: + uint32_t max_recv_size() const { + return std::min( + _svc.connect_property(prop::maximum_packet_size) + .value_or(default_max_recv_size), + static_cast(default_max_send_size) + ); + } + duration compute_read_timeout() const { auto negotiated_ka = _svc.negotiated_keep_alive(); return negotiated_ka ? diff --git a/include/boost/mqtt5/impl/connect_op.hpp b/include/boost/mqtt5/impl/connect_op.hpp index 2bcfe6f..dd51fcf 100644 --- a/include/boost/mqtt5/impl/connect_op.hpp +++ b/include/boost/mqtt5/impl/connect_op.hpp @@ -215,6 +215,8 @@ public: } void send_connect() { + if (!_ctx.co_props[prop::maximum_packet_size].has_value()) + _ctx.co_props[prop::maximum_packet_size] = default_max_recv_size; auto packet = control_packet::of( no_pid, get_allocator(), encoders::encode_connect, diff --git a/include/boost/mqtt5/impl/read_message_op.hpp b/include/boost/mqtt5/impl/read_message_op.hpp index b43cd60..198274d 100644 --- a/include/boost/mqtt5/impl/read_message_op.hpp +++ b/include/boost/mqtt5/impl/read_message_op.hpp @@ -75,8 +75,14 @@ public: ) { if (ec == client::error::malformed_packet) return on_malformed_packet( + disconnect_rc_e::malformed_packet, "Malformed Packet received from the Server" ); + else if (ec == client::error::packet_too_large) + return on_malformed_packet( + disconnect_rc_e::packet_too_large, + "The packet size is greater than Maximum Packet Size" + ); if (ec == asio::error::no_recovery) _svc_ptr->cancel(); @@ -108,6 +114,7 @@ private: ); if (!msg.has_value()) return on_malformed_packet( + disconnect_rc_e::malformed_packet, "Malformed PUBLISH received: cannot decode" ); @@ -120,6 +127,7 @@ private: ); if (!rv.has_value()) return on_malformed_packet( + disconnect_rc_e::malformed_packet, "Malformed DISCONNECT received: cannot decode" ); @@ -140,6 +148,7 @@ private: ); if (!rv.has_value()) return on_malformed_packet( + disconnect_rc_e::malformed_packet, "Malformed AUTH received: cannot decode" ); @@ -153,13 +162,13 @@ private: perform(); } - void on_malformed_packet(const std::string& reason) { + void on_malformed_packet(disconnect_rc_e rc, const std::string& reason) { auto props = disconnect_props {}; props[prop::reason_string] = reason; auto svc_ptr = _svc_ptr; // copy before this is moved async_disconnect( - disconnect_rc_e::malformed_packet, props, svc_ptr, + rc, props, svc_ptr, asio::prepend(std::move(*this), on_disconnect {}) ); } diff --git a/test/include/test_common/packet_util.hpp b/test/include/test_common/packet_util.hpp index 8309ce1..a43158a 100644 --- a/test/include/test_common/packet_util.hpp +++ b/test/include/test_common/packet_util.hpp @@ -393,6 +393,12 @@ inline disconnect_props dprops_with_reason_string(const std::string& reason_stri return dprops; } +static constexpr auto dflt_cprops = std::invoke([] { + connect_props ret; + ret[prop::maximum_packet_size] = ::boost::mqtt5::detail::default_max_recv_size; + return ret; +}); + } // end namespace boost::mqtt5::test #endif // BOOST_MQTT5_TEST_PACKET_UTIL_HPP diff --git a/test/integration/async_sender.cpp b/test/integration/async_sender.cpp index f3759c8..b283a97 100644 --- a/test/integration/async_sender.cpp +++ b/test/integration/async_sender.cpp @@ -35,7 +35,7 @@ struct shared_test_data { const std::string payload = "payload"; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( false, reason_codes::success.value(), {} diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index e0da565..6df43ff 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -255,7 +255,7 @@ struct shared_test_data { error_code fail = asio::error::not_connected; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( false, reason_codes::success.value(), {} diff --git a/test/integration/client_functions.cpp b/test/integration/client_functions.cpp index 8273605..5182328 100644 --- a/test/integration/client_functions.cpp +++ b/test/integration/client_functions.cpp @@ -89,7 +89,7 @@ BOOST_FIXTURE_TEST_CASE(assign_credentials, shared_test_data) { std::string password = "password"; auto connect = encoders::encode_connect( - client_id, username, password, 60, false, {}, std::nullopt + client_id, username, password, 60, false, test::dflt_cprops, std::nullopt ); test::msg_exchange broker_side; @@ -111,7 +111,7 @@ BOOST_FIXTURE_TEST_CASE(assign_will, shared_test_data) { std::optional will_opt { std::move(w) }; auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, will_opt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, will_opt ); test::msg_exchange broker_side; @@ -143,7 +143,7 @@ BOOST_FIXTURE_TEST_CASE(assign_keep_alive, shared_test_data) { uint16_t keep_alive = 120; auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, keep_alive, false, {}, std::nullopt + "", std::nullopt, std::nullopt, keep_alive, false, test::dflt_cprops, std::nullopt ); test::msg_exchange broker_side; @@ -259,7 +259,7 @@ struct shared_connack_prop_test_data { // packets const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( false, reason_codes::success.value(), cprops diff --git a/test/integration/disconnect.cpp b/test/integration/disconnect.cpp index f5d0f92..0fd0710 100644 --- a/test/integration/disconnect.cpp +++ b/test/integration/disconnect.cpp @@ -30,7 +30,7 @@ struct shared_test_data { error_code fail = asio::error::not_connected; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( true, reason_codes::success.value(), {} diff --git a/test/integration/executors.cpp b/test/integration/executors.cpp index eee2211..d8a307d 100644 --- a/test/integration/executors.cpp +++ b/test/integration/executors.cpp @@ -44,7 +44,7 @@ void run_test( // packets auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); auto connack = encoders::encode_connack(false, reason_codes::success.value(), {}); auto publish_0 = encoders::encode_publish( diff --git a/test/integration/ping.cpp b/test/integration/ping.cpp index 90ea5c6..d488804 100644 --- a/test/integration/ping.cpp +++ b/test/integration/ping.cpp @@ -43,7 +43,7 @@ using namespace std::chrono_literals; std::string connect_with_keep_alive(uint16_t keep_alive) { return encoders::encode_connect( - "", std::nullopt, std::nullopt, keep_alive, false, {}, std::nullopt + "", std::nullopt, std::nullopt, keep_alive, false, test::dflt_cprops, std::nullopt ); } diff --git a/test/integration/re_authentication.cpp b/test/integration/re_authentication.cpp index a318edf..01ecc8e 100644 --- a/test/integration/re_authentication.cpp +++ b/test/integration/re_authentication.cpp @@ -51,7 +51,7 @@ struct shared_test_data { ); connect_props init_connect_props() { - connect_props cprops; + auto cprops = test::dflt_cprops; cprops[prop::authentication_method] = "method"; cprops[prop::authentication_data] = ""; return cprops; @@ -226,7 +226,7 @@ BOOST_FIXTURE_TEST_CASE(async_auth_fail, shared_test_data) { BOOST_FIXTURE_TEST_CASE(unexpected_auth, shared_test_data) { auto connect_no_auth = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); auto disconnect = encoders::encode_disconnect( reason_codes::protocol_error.value(), @@ -249,7 +249,7 @@ BOOST_FIXTURE_TEST_CASE(unexpected_auth, shared_test_data) { BOOST_FIXTURE_TEST_CASE(re_auth_without_authenticator, shared_test_data) { auto connect_no_auth = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); test::msg_exchange broker_side; diff --git a/test/integration/read_message.cpp b/test/integration/read_message.cpp index ddd13ab..ae92fde 100644 --- a/test/integration/read_message.cpp +++ b/test/integration/read_message.cpp @@ -30,11 +30,11 @@ using test::after; using namespace std::chrono_literals; void test_receive_malformed_packet( - std::string malformed_packet, std::string reason_string + std::string malformed_packet, reason_code rc, std::string reason_string ) { // packets auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); connack_props co_props; co_props[prop::maximum_packet_size] = 2000; @@ -42,9 +42,7 @@ void test_receive_malformed_packet( disconnect_props dc_props; dc_props[prop::reason_string] = reason_string; - auto disconnect = encoders::encode_disconnect( - reason_codes::malformed_packet.value(), dc_props - ); + auto disconnect = encoders::encode_disconnect(rc.value(), dc_props); error_code success {}; test::msg_exchange broker_side; @@ -82,6 +80,7 @@ void test_receive_malformed_packet( BOOST_AUTO_TEST_CASE(forbidden_packet_type) { test_receive_malformed_packet( std::string({ 0x00 }), + reason_codes::malformed_packet, "Malformed Packet received from the Server" ); } @@ -89,6 +88,7 @@ BOOST_AUTO_TEST_CASE(forbidden_packet_type) { BOOST_AUTO_TEST_CASE(malformed_varint) { test_receive_malformed_packet( std::string({ 0x10, -1 /* 0xFF */, -1, -1, -1 }), + reason_codes::malformed_packet, "Malformed Packet received from the Server" ); } @@ -96,6 +96,7 @@ BOOST_AUTO_TEST_CASE(malformed_varint) { BOOST_AUTO_TEST_CASE(malformed_fixed_header) { test_receive_malformed_packet( std::string({ 0x60, 1, 0 }), + reason_codes::malformed_packet, "Malformed Packet received from the Server" ); } @@ -103,13 +104,15 @@ BOOST_AUTO_TEST_CASE(malformed_fixed_header) { BOOST_AUTO_TEST_CASE(packet_larger_than_allowed) { test_receive_malformed_packet( std::string({ 0x10, -1, -1, -1, 0 }), - "Malformed Packet received from the Server" + reason_codes::packet_too_large, + "The packet size is greater than Maximum Packet Size" ); } BOOST_AUTO_TEST_CASE(receive_malformed_publish) { test_receive_malformed_packet( std::string({ 0x30, 1, -1 }), + reason_codes::malformed_packet, "Malformed PUBLISH received: cannot decode" ); } @@ -122,7 +125,7 @@ struct shared_test_data { const std::string payload = "payload"; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack(false, uint8_t(0x00), {}); diff --git a/test/integration/receive_publish.cpp b/test/integration/receive_publish.cpp index e0e1ad5..2aa2683 100644 --- a/test/integration/receive_publish.cpp +++ b/test/integration/receive_publish.cpp @@ -33,7 +33,7 @@ struct shared_test_data { error_code fail = asio::error::not_connected; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( true, reason_codes::success.value(), {} diff --git a/test/integration/send_publish.cpp b/test/integration/send_publish.cpp index 1d63ab5..9b77537 100644 --- a/test/integration/send_publish.cpp +++ b/test/integration/send_publish.cpp @@ -33,7 +33,7 @@ struct shared_test_data { error_code fail = asio::error::not_connected; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( false, reason_codes::success.value(), {} diff --git a/test/integration/sub_unsub.cpp b/test/integration/sub_unsub.cpp index 071414e..7392307 100644 --- a/test/integration/sub_unsub.cpp +++ b/test/integration/sub_unsub.cpp @@ -43,7 +43,7 @@ struct shared_test_data { error_code fail = asio::error::not_connected; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( false, reason_codes::success.value(), {} diff --git a/test/unit/connect_op.cpp b/test/unit/connect_op.cpp index d6b08f8..0134337 100644 --- a/test/unit/connect_op.cpp +++ b/test/unit/connect_op.cpp @@ -35,7 +35,7 @@ struct shared_test_data { error_code fail = asio::error::not_connected; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( true, reason_codes::success.value(), {} @@ -246,7 +246,7 @@ struct shared_test_auth_data { const std::string auth_response = auth_challenge; connect_props init_connect_props() { - connect_props cprops; + auto cprops = test::dflt_cprops; cprops[prop::authentication_method] = "method"; cprops[prop::authentication_data] = ""; return cprops; diff --git a/test/unit/logger.cpp b/test/unit/logger.cpp index 3376529..023abd9 100644 --- a/test/unit/logger.cpp +++ b/test/unit/logger.cpp @@ -439,7 +439,7 @@ BOOST_AUTO_TEST_CASE(client_disconnect) { // packets auto connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); auto connack = encoders::encode_connack(false, uint8_t(0x00), {}); diff --git a/test/unit/reconnect_op.cpp b/test/unit/reconnect_op.cpp index 724b53f..3a9ea4b 100644 --- a/test/unit/reconnect_op.cpp +++ b/test/unit/reconnect_op.cpp @@ -98,7 +98,7 @@ void run_connect_to_localhost_test(int succeed_after) { error_code success {}; const std::string connect = encoders::encode_connect( - "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + "", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt ); const std::string connack = encoders::encode_connack( true, reason_codes::success.value(), {}