forked from boostorg/mqtt5
Allow user to modify CONNECT properties
Summary: related to T13332 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27108
This commit is contained in:
@@ -17,9 +17,13 @@ void publish_qos0_tcp() {
|
|||||||
using client_type = mqtt_client<stream_type>;
|
using client_type = mqtt_client<stream_type>;
|
||||||
client_type c(ioc, "");
|
client_type c(ioc, "");
|
||||||
|
|
||||||
|
connect_props props;
|
||||||
|
props[prop::maximum_packet_size] = 1024;
|
||||||
|
|
||||||
c.credentials("test-qos0-tcp", "", "")
|
c.credentials("test-qos0-tcp", "", "")
|
||||||
.brokers("emqtt.mireo.local", 1883)
|
.brokers("emqtt.mireo.local", 1883)
|
||||||
.will({ "test/mqtt-test", "Client disconnected!",qos_e::at_least_once })
|
.will({ "test/mqtt-test", "Client disconnected!",qos_e::at_least_once })
|
||||||
|
.connect_properties(std::move(props))
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
c.async_publish<qos_e::at_most_once>(
|
c.async_publish<qos_e::at_most_once>(
|
||||||
|
@@ -9,7 +9,8 @@
|
|||||||
|
|
||||||
namespace async_mqtt5 {
|
namespace async_mqtt5 {
|
||||||
|
|
||||||
static constexpr int32_t default_max_packet_size = 65'536;
|
/* max varint number (268'435'455) + fixed header size (1 + 4) */
|
||||||
|
static constexpr int32_t default_max_send_size = 268'435'460;
|
||||||
|
|
||||||
enum class control_code_e : std::uint8_t {
|
enum class control_code_e : std::uint8_t {
|
||||||
no_packet = 0b00000000, // 0
|
no_packet = 0b00000000, // 0
|
||||||
|
@@ -52,7 +52,7 @@ class assemble_op {
|
|||||||
|
|
||||||
struct on_read {};
|
struct on_read {};
|
||||||
|
|
||||||
static constexpr size_t max_packet_size = default_max_packet_size;
|
static constexpr size_t max_recv_size = 65'536;
|
||||||
|
|
||||||
client_service& _svc;
|
client_service& _svc;
|
||||||
handler_type _handler;
|
handler_type _handler;
|
||||||
@@ -88,8 +88,9 @@ public:
|
|||||||
_read_buff.erase(
|
_read_buff.erase(
|
||||||
_read_buff.cbegin(), _data_span.first()
|
_read_buff.cbegin(), _data_span.first()
|
||||||
);
|
);
|
||||||
// TODO: respect max packet size from CONNACK
|
_read_buff.resize(
|
||||||
_read_buff.resize(max_packet_size);
|
_svc.connect_prop(prop::maximum_packet_size).value_or(max_recv_size)
|
||||||
|
);
|
||||||
_data_span = {
|
_data_span = {
|
||||||
_read_buff.cbegin(),
|
_read_buff.cbegin(),
|
||||||
_read_buff.cbegin() + _data_span.size()
|
_read_buff.cbegin() + _data_span.size()
|
||||||
@@ -155,8 +156,8 @@ public:
|
|||||||
return complete(client::error::malformed_packet, 0, {}, {});
|
return complete(client::error::malformed_packet, 0, {}, {});
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: respect max packet size which could be dinamically set by the broker
|
auto recv_size = _svc.connect_prop(prop::maximum_packet_size).value_or(max_recv_size);
|
||||||
if (*varlen > max_packet_size - std::distance(_data_span.first(), first))
|
if (*varlen > recv_size - std::distance(_data_span.first(), first))
|
||||||
return complete(client::error::malformed_packet, 0, {}, {});
|
return complete(client::error::malformed_packet, 0, {}, {});
|
||||||
|
|
||||||
if (std::distance(first, _data_span.last()) < *varlen)
|
if (std::distance(first, _data_span.last()) < *varlen)
|
||||||
|
@@ -70,6 +70,15 @@ public:
|
|||||||
return _mqtt_context.ca_props;
|
return _mqtt_context.ca_props;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename Prop>
|
||||||
|
const auto& connect_prop(Prop p) const {
|
||||||
|
return _mqtt_context.co_props[p];
|
||||||
|
}
|
||||||
|
|
||||||
|
void connect_props(connect_props props) {
|
||||||
|
_mqtt_context.co_props = std::move(props);
|
||||||
|
}
|
||||||
|
|
||||||
void credentials(
|
void credentials(
|
||||||
std::string client_id,
|
std::string client_id,
|
||||||
std::string username = "", std::string password = ""
|
std::string username = "", std::string password = ""
|
||||||
@@ -122,6 +131,15 @@ public:
|
|||||||
return _mqtt_context.ca_props;
|
return _mqtt_context.ca_props;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename Prop>
|
||||||
|
const auto& connect_prop(Prop p) const {
|
||||||
|
return _mqtt_context.co_props[p];
|
||||||
|
}
|
||||||
|
|
||||||
|
void connect_props(connect_props props) {
|
||||||
|
_mqtt_context.co_props = std::move(props);
|
||||||
|
}
|
||||||
|
|
||||||
void credentials(
|
void credentials(
|
||||||
std::string client_id,
|
std::string client_id,
|
||||||
std::string username = "", std::string password = ""
|
std::string username = "", std::string password = ""
|
||||||
@@ -248,6 +266,15 @@ public:
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename Prop>
|
||||||
|
const auto& connect_prop(Prop p) const {
|
||||||
|
return _stream_context.connect_prop(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
void connect_props(connect_props props) {
|
||||||
|
if (!is_open())
|
||||||
|
_stream_context.connect_props(std::move(props));
|
||||||
|
}
|
||||||
|
|
||||||
template <typename Prop>
|
template <typename Prop>
|
||||||
const auto& connack_prop(Prop p) const {
|
const auto& connack_prop(Prop p) const {
|
||||||
|
@@ -73,7 +73,7 @@ public:
|
|||||||
|
|
||||||
auto max_packet_size = _svc_ptr->connack_prop(
|
auto max_packet_size = _svc_ptr->connack_prop(
|
||||||
prop::maximum_packet_size
|
prop::maximum_packet_size
|
||||||
).value_or(default_max_packet_size);
|
).value_or(default_max_send_size);
|
||||||
if (disconnect.size() > max_packet_size)
|
if (disconnect.size() > max_packet_size)
|
||||||
// drop properties
|
// drop properties
|
||||||
return send_disconnect(control_packet<allocator_type>::of(
|
return send_disconnect(control_packet<allocator_type>::of(
|
||||||
|
@@ -121,7 +121,7 @@ public:
|
|||||||
);
|
);
|
||||||
|
|
||||||
auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size)
|
auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size)
|
||||||
.value_or(default_max_packet_size);
|
.value_or(default_max_send_size);
|
||||||
if (publish.size() > max_packet_size)
|
if (publish.size() > max_packet_size)
|
||||||
return complete_post(client::error::packet_too_large, packet_id);
|
return complete_post(client::error::packet_too_large, packet_id);
|
||||||
|
|
||||||
|
@@ -81,7 +81,7 @@ public:
|
|||||||
);
|
);
|
||||||
|
|
||||||
auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size)
|
auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size)
|
||||||
.value_or(default_max_packet_size);
|
.value_or(default_max_send_size);
|
||||||
if (subscribe.size() > max_packet_size)
|
if (subscribe.size() > max_packet_size)
|
||||||
return complete_post(
|
return complete_post(
|
||||||
client::error::packet_too_large, packet_id, topics.size()
|
client::error::packet_too_large, packet_id, topics.size()
|
||||||
|
@@ -77,10 +77,10 @@ public:
|
|||||||
|
|
||||||
auto max_packet_size = _svc_ptr->connack_prop(
|
auto max_packet_size = _svc_ptr->connack_prop(
|
||||||
prop::maximum_packet_size
|
prop::maximum_packet_size
|
||||||
).value_or(default_max_packet_size);
|
).value_or(default_max_send_size);
|
||||||
if (unsubscribe.size() > max_packet_size)
|
if (unsubscribe.size() > max_packet_size)
|
||||||
return complete_post(
|
return complete_post(
|
||||||
client::error::packet_too_large, packet_id, topics.size()
|
client::error::packet_too_large, packet_id, topics.size()
|
||||||
);
|
);
|
||||||
|
|
||||||
send_unsubscribe(std::move(unsubscribe));
|
send_unsubscribe(std::move(unsubscribe));
|
||||||
|
@@ -274,6 +274,11 @@ public:
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mqtt_client& connect_properties(connect_props props) {
|
||||||
|
_svc_ptr->connect_props(std::move(props));
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Initiates [mqttlink 3901257 Re-authentication]
|
* \brief Initiates [mqttlink 3901257 Re-authentication]
|
||||||
* using the authenticator given in the \ref authenticator method.
|
* using the authenticator given in the \ref authenticator method.
|
||||||
|
Reference in New Issue
Block a user