From 7c0b9042d119d6a78b1ae5f674ac5dc5e3790404 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Fri, 22 Dec 2023 13:48:26 +0100 Subject: [PATCH] Allow user to modify CONNECT properties Summary: related to T13332 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27108 --- example/tcp.cpp | 4 +++ include/async_mqtt5/detail/control_packet.hpp | 3 ++- include/async_mqtt5/impl/assemble_op.hpp | 11 ++++---- include/async_mqtt5/impl/client_service.hpp | 27 +++++++++++++++++++ include/async_mqtt5/impl/disconnect_op.hpp | 2 +- include/async_mqtt5/impl/publish_send_op.hpp | 2 +- include/async_mqtt5/impl/subscribe_op.hpp | 2 +- include/async_mqtt5/impl/unsubscribe_op.hpp | 4 +-- include/async_mqtt5/mqtt_client.hpp | 5 ++++ 9 files changed, 49 insertions(+), 11 deletions(-) diff --git a/example/tcp.cpp b/example/tcp.cpp index 8dff1d4..c501323 100644 --- a/example/tcp.cpp +++ b/example/tcp.cpp @@ -17,9 +17,13 @@ void publish_qos0_tcp() { using client_type = mqtt_client; client_type c(ioc, ""); + connect_props props; + props[prop::maximum_packet_size] = 1024; + c.credentials("test-qos0-tcp", "", "") .brokers("emqtt.mireo.local", 1883) .will({ "test/mqtt-test", "Client disconnected!",qos_e::at_least_once }) + .connect_properties(std::move(props)) .run(); c.async_publish( diff --git a/include/async_mqtt5/detail/control_packet.hpp b/include/async_mqtt5/detail/control_packet.hpp index ff61bf8..8b5c2e5 100644 --- a/include/async_mqtt5/detail/control_packet.hpp +++ b/include/async_mqtt5/detail/control_packet.hpp @@ -9,7 +9,8 @@ namespace async_mqtt5 { -static constexpr int32_t default_max_packet_size = 65'536; +/* max varint number (268'435'455) + fixed header size (1 + 4) */ +static constexpr int32_t default_max_send_size = 268'435'460; enum class control_code_e : std::uint8_t { no_packet = 0b00000000, // 0 diff --git a/include/async_mqtt5/impl/assemble_op.hpp b/include/async_mqtt5/impl/assemble_op.hpp index 9092ea2..65640aa 100644 --- a/include/async_mqtt5/impl/assemble_op.hpp +++ b/include/async_mqtt5/impl/assemble_op.hpp @@ -52,7 +52,7 @@ class assemble_op { struct on_read {}; - static constexpr size_t max_packet_size = default_max_packet_size; + static constexpr size_t max_recv_size = 65'536; client_service& _svc; handler_type _handler; @@ -88,8 +88,9 @@ public: _read_buff.erase( _read_buff.cbegin(), _data_span.first() ); - // TODO: respect max packet size from CONNACK - _read_buff.resize(max_packet_size); + _read_buff.resize( + _svc.connect_prop(prop::maximum_packet_size).value_or(max_recv_size) + ); _data_span = { _read_buff.cbegin(), _read_buff.cbegin() + _data_span.size() @@ -155,8 +156,8 @@ public: return complete(client::error::malformed_packet, 0, {}, {}); } - // TODO: respect max packet size which could be dinamically set by the broker - if (*varlen > max_packet_size - std::distance(_data_span.first(), first)) + auto recv_size = _svc.connect_prop(prop::maximum_packet_size).value_or(max_recv_size); + if (*varlen > recv_size - std::distance(_data_span.first(), first)) return complete(client::error::malformed_packet, 0, {}, {}); if (std::distance(first, _data_span.last()) < *varlen) diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index 092290e..7b7f57d 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -70,6 +70,15 @@ public: return _mqtt_context.ca_props; } + template + const auto& connect_prop(Prop p) const { + return _mqtt_context.co_props[p]; + } + + void connect_props(connect_props props) { + _mqtt_context.co_props = std::move(props); + } + void credentials( std::string client_id, std::string username = "", std::string password = "" @@ -122,6 +131,15 @@ public: return _mqtt_context.ca_props; } + template + const auto& connect_prop(Prop p) const { + return _mqtt_context.co_props[p]; + } + + void connect_props(connect_props props) { + _mqtt_context.co_props = std::move(props); + } + void credentials( std::string client_id, std::string username = "", std::string password = "" @@ -248,6 +266,15 @@ public: ); } + template + const auto& connect_prop(Prop p) const { + return _stream_context.connect_prop(p); + } + + void connect_props(connect_props props) { + if (!is_open()) + _stream_context.connect_props(std::move(props)); + } template const auto& connack_prop(Prop p) const { diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index 67d6ecd..c90e0c8 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -73,7 +73,7 @@ public: auto max_packet_size = _svc_ptr->connack_prop( prop::maximum_packet_size - ).value_or(default_max_packet_size); + ).value_or(default_max_send_size); if (disconnect.size() > max_packet_size) // drop properties return send_disconnect(control_packet::of( diff --git a/include/async_mqtt5/impl/publish_send_op.hpp b/include/async_mqtt5/impl/publish_send_op.hpp index ef48146..557fcef 100644 --- a/include/async_mqtt5/impl/publish_send_op.hpp +++ b/include/async_mqtt5/impl/publish_send_op.hpp @@ -121,7 +121,7 @@ public: ); auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size) - .value_or(default_max_packet_size); + .value_or(default_max_send_size); if (publish.size() > max_packet_size) return complete_post(client::error::packet_too_large, packet_id); diff --git a/include/async_mqtt5/impl/subscribe_op.hpp b/include/async_mqtt5/impl/subscribe_op.hpp index cde4355..5ca6cf8 100644 --- a/include/async_mqtt5/impl/subscribe_op.hpp +++ b/include/async_mqtt5/impl/subscribe_op.hpp @@ -81,7 +81,7 @@ public: ); auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size) - .value_or(default_max_packet_size); + .value_or(default_max_send_size); if (subscribe.size() > max_packet_size) return complete_post( client::error::packet_too_large, packet_id, topics.size() diff --git a/include/async_mqtt5/impl/unsubscribe_op.hpp b/include/async_mqtt5/impl/unsubscribe_op.hpp index b5f72a1..ef9fe1d 100644 --- a/include/async_mqtt5/impl/unsubscribe_op.hpp +++ b/include/async_mqtt5/impl/unsubscribe_op.hpp @@ -77,10 +77,10 @@ public: auto max_packet_size = _svc_ptr->connack_prop( prop::maximum_packet_size - ).value_or(default_max_packet_size); + ).value_or(default_max_send_size); if (unsubscribe.size() > max_packet_size) return complete_post( - client::error::packet_too_large, packet_id, topics.size() + client::error::packet_too_large, packet_id, topics.size() ); send_unsubscribe(std::move(unsubscribe)); diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index cb5ba58..0120b67 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -274,6 +274,11 @@ public: return *this; } + mqtt_client& connect_properties(connect_props props) { + _svc_ptr->connect_props(std::move(props)); + return *this; + } + /** * \brief Initiates [mqttlink 3901257 Re-authentication] * using the authenticator given in the \ref authenticator method.