forked from boostorg/mqtt5
Validate control packet size
Summary: resolves T13332 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Maniphest Tasks: T13332 Differential Revision: https://repo.mireo.local/D27000
This commit is contained in:
@@ -25,17 +25,21 @@ may complete with, along with the reasons for their occurrence.
|
||||
to establish a connection with the Server. The cause of this error may be attributed to the connection
|
||||
related parameters used during the initialization of the [reflink2 mqtt_client `mqtt_client`].
|
||||
]]
|
||||
[[`async_mqtt5::client::error::malformed_packet`][
|
||||
The Client has attempted to send a packet that does not conform to the specification.
|
||||
This issue can arise from improperly formed UTF-8 encoded strings.
|
||||
Additionally, this error can be caused by providing out-of-range values.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::packet_too_large`][
|
||||
The Client has attempted to send a packet larger than the Maximum Packet Size the Server
|
||||
is willing to process.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::session_expired`][
|
||||
The Client has established a successful connection with a Server, but either the session does not exist or has expired.
|
||||
In cases where the Client had previously set up subscriptions to Topics, these subscriptions are also expired.
|
||||
Therefore, the Client should re-subscribe.
|
||||
This error code is exclusive to completion handlers associated with [refmem mqtt_client async_receive] calls.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::malformed_packet`][
|
||||
The Client has attempted to send a packet that does not conform to the specification.
|
||||
This issue can arise from improperly formed UTF-8 encoded strings.
|
||||
Additionally, this error can be caused by providing out-of-range values.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::pid_overrun`] [
|
||||
This error code signifies that the Client was unable to allocate a Packet Identifier for
|
||||
the current operation due to the exhaustion of the available identifiers.
|
||||
|
@@ -9,6 +9,7 @@
|
||||
|
||||
namespace async_mqtt5 {
|
||||
|
||||
static constexpr int32_t default_max_packet_size = 65'536;
|
||||
|
||||
enum class control_code_e : std::uint8_t {
|
||||
no_packet = 0b00000000, // 0
|
||||
@@ -79,6 +80,10 @@ public:
|
||||
};
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
return _packet->size();
|
||||
}
|
||||
|
||||
control_code_e control_code() const {
|
||||
return control_code_e(uint8_t(*(_packet->data())) & 0b11110000);
|
||||
}
|
||||
|
@@ -71,7 +71,6 @@ struct mqtt_ctx {
|
||||
credentials creds;
|
||||
std::optional<will> will_msg;
|
||||
connect_props co_props;
|
||||
std::shared_mutex ca_mtx;
|
||||
connack_props ca_props;
|
||||
session_state state;
|
||||
any_authenticator authenticator;
|
||||
|
@@ -7,6 +7,11 @@
|
||||
|
||||
namespace async_mqtt5::detail {
|
||||
|
||||
static constexpr uint32_t min_subscription_identifier = 1;
|
||||
static constexpr uint32_t max_subscription_identifier = 268'435'455;
|
||||
|
||||
static constexpr std::string_view shared_sub_id = "$share/";
|
||||
|
||||
inline bool is_utf8_no_wildcard(validation_result result) {
|
||||
return result == validation_result::valid;
|
||||
}
|
||||
@@ -72,8 +77,6 @@ inline validation_result validate_topic_filter(std::string_view str) {
|
||||
inline validation_result validate_shared_topic_filter(
|
||||
std::string_view str, bool wildcard_allowed = true
|
||||
) {
|
||||
constexpr std::string_view shared_sub_id = "$share/";
|
||||
|
||||
if (!is_valid_topic_size(str.size()))
|
||||
return validation_result::invalid;
|
||||
|
||||
|
@@ -56,6 +56,9 @@ enum class error : int {
|
||||
/** The packet is malformed. */
|
||||
malformed_packet = 100,
|
||||
|
||||
/** The packet has exceeded the Maximum Packet Size the Server is willing to accept. */
|
||||
packet_too_large,
|
||||
|
||||
/** The Client's session does not exist or it has expired. */
|
||||
session_expired,
|
||||
|
||||
@@ -91,6 +94,9 @@ inline std::string client_error_to_string(error err) {
|
||||
switch (err) {
|
||||
case error::malformed_packet:
|
||||
return "The packet is malformed.";
|
||||
case error::packet_too_large:
|
||||
return "The packet has exceeded the Maximum Packet Size "
|
||||
"the Server is willing to accept.";
|
||||
case error::session_expired:
|
||||
return "The Client's session does not exist or it has expired.";
|
||||
case error::pid_overrun:
|
||||
|
@@ -52,7 +52,7 @@ class assemble_op {
|
||||
|
||||
struct on_read {};
|
||||
|
||||
static constexpr size_t max_packet_size = 65536;
|
||||
static constexpr size_t max_packet_size = default_max_packet_size;
|
||||
|
||||
client_service& _svc;
|
||||
handler_type _handler;
|
||||
|
@@ -62,18 +62,12 @@ public:
|
||||
}
|
||||
|
||||
template <typename Prop>
|
||||
decltype(auto) connack_prop(Prop p) {
|
||||
std::shared_lock reader_lock(_mqtt_context.ca_mtx);
|
||||
return std::as_const(_mqtt_context.ca_props[p]);
|
||||
const auto& connack_prop(Prop p) const {
|
||||
return _mqtt_context.ca_props[p];
|
||||
}
|
||||
|
||||
template <typename Prop0, typename ...Props>
|
||||
decltype(auto) connack_props(Prop0 p0, Props ...props) {
|
||||
std::shared_lock reader_lock(_mqtt_context.ca_mtx);
|
||||
return std::make_tuple(
|
||||
std::as_const(_mqtt_context.ca_props[p0]),
|
||||
std::as_const(_mqtt_context.ca_props[props])...
|
||||
);
|
||||
const auto& connack_props() const {
|
||||
return _mqtt_context.ca_props;
|
||||
}
|
||||
|
||||
void credentials(
|
||||
@@ -120,18 +114,12 @@ public:
|
||||
}
|
||||
|
||||
template <typename Prop>
|
||||
decltype(auto) connack_prop(Prop p) {
|
||||
std::shared_lock reader_lock(_mqtt_context.ca_mtx);
|
||||
return std::as_const(_mqtt_context.ca_props[p]);
|
||||
const auto& connack_prop(Prop p) const {
|
||||
return _mqtt_context.ca_props[p];
|
||||
}
|
||||
|
||||
template <typename Prop0, typename ...Props>
|
||||
decltype(auto) connack_props(Prop0 p0, Props ...props) {
|
||||
std::shared_lock reader_lock(_mqtt_context.ca_mtx);
|
||||
return std::make_tuple(
|
||||
std::as_const(_mqtt_context.ca_props[p0]),
|
||||
std::as_const(_mqtt_context.ca_props[props])...
|
||||
);
|
||||
const auto& connack_props() const {
|
||||
return _mqtt_context.ca_props;
|
||||
}
|
||||
|
||||
void credentials(
|
||||
@@ -260,14 +248,14 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
template <typename Prop>
|
||||
decltype(auto) connack_prop(Prop p) {
|
||||
const auto& connack_prop(Prop p) const {
|
||||
return _stream_context.connack_prop(p);
|
||||
}
|
||||
|
||||
template <typename Prop0, typename ...Props>
|
||||
decltype(auto) connack_props(Prop0 p0, Props ...props) {
|
||||
return _stream_context.connack_props(p0, props...);
|
||||
const auto& connack_props() const {
|
||||
return _stream_context.connack_props();
|
||||
}
|
||||
|
||||
void run() {
|
||||
|
@@ -285,11 +285,7 @@ public:
|
||||
return complete(client::error::malformed_packet);
|
||||
const auto& [session_present, reason_code, ca_props] = *rv;
|
||||
|
||||
{
|
||||
std::unique_lock writer_lock(_ctx.ca_mtx);
|
||||
_ctx.ca_props = ca_props;
|
||||
}
|
||||
|
||||
_ctx.ca_props = ca_props;
|
||||
_ctx.state.session_present(session_present);
|
||||
|
||||
// Unexpected result handling:
|
||||
@@ -355,7 +351,7 @@ public:
|
||||
|
||||
const auto& wire_data = packet.wire_data();
|
||||
|
||||
async_mqtt5::detail::async_write(
|
||||
detail::async_write(
|
||||
_stream, asio::buffer(wire_data),
|
||||
asio::consign(
|
||||
asio::prepend(std::move(*this), on_send_auth{}),
|
||||
|
@@ -2,7 +2,6 @@
|
||||
#define ASYNC_MQTT5_DISCONNECT_OP_HPP
|
||||
|
||||
#include <boost/asio/consign.hpp>
|
||||
#include <boost/asio/dispatch.hpp>
|
||||
#include <boost/asio/prepend.hpp>
|
||||
|
||||
#include <async_mqtt5/types.hpp>
|
||||
@@ -10,6 +9,7 @@
|
||||
#include <async_mqtt5/detail/cancellable_handler.hpp>
|
||||
#include <async_mqtt5/detail/control_packet.hpp>
|
||||
#include <async_mqtt5/detail/internal_types.hpp>
|
||||
#include <async_mqtt5/detail/topic_validation.hpp>
|
||||
#include <async_mqtt5/detail/utf8_mqtt.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/codecs/message_encoders.hpp>
|
||||
@@ -71,12 +71,22 @@ public:
|
||||
static_cast<uint8_t>(_context.reason_code), _context.props
|
||||
);
|
||||
|
||||
asio::dispatch(asio::prepend(std::move(*this), std::move(disconnect)));
|
||||
auto max_packet_size = _svc_ptr->connack_prop(
|
||||
prop::maximum_packet_size
|
||||
).value_or(default_max_packet_size);
|
||||
if (disconnect.size() > max_packet_size)
|
||||
// drop properties
|
||||
return send_disconnect(control_packet<allocator_type>::of(
|
||||
no_pid, get_allocator(),
|
||||
encoders::encode_disconnect,
|
||||
static_cast<uint8_t>(_context.reason_code), disconnect_props {}
|
||||
));
|
||||
|
||||
send_disconnect(std::move(disconnect));
|
||||
}
|
||||
|
||||
void operator()(control_packet<allocator_type> disconnect) {
|
||||
void send_disconnect(control_packet<allocator_type> disconnect) {
|
||||
const auto& wire_data = disconnect.wire_data();
|
||||
|
||||
_svc_ptr->async_send(
|
||||
wire_data,
|
||||
no_serial, send_flag::terminal,
|
||||
|
@@ -99,30 +99,18 @@ public:
|
||||
void perform(
|
||||
std::string topic, std::string payload,
|
||||
retain_e retain, const publish_props& props
|
||||
) {
|
||||
auto ec = validate_publish(topic, payload, retain, props);
|
||||
if (ec)
|
||||
return complete_post(ec);
|
||||
|
||||
asio::dispatch(
|
||||
asio::prepend(
|
||||
std::move(*this), std::move(topic),
|
||||
std::move(payload), retain, props
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
void operator()(
|
||||
std::string topic, std::string payload,
|
||||
retain_e retain, const publish_props& props
|
||||
) {
|
||||
uint16_t packet_id = 0;
|
||||
if constexpr (qos_type != qos_e::at_most_once) {
|
||||
packet_id = _svc_ptr->allocate_pid();
|
||||
if (packet_id == 0)
|
||||
return complete_post(client::error::pid_overrun);
|
||||
return complete_post(client::error::pid_overrun, packet_id);
|
||||
}
|
||||
|
||||
auto ec = validate_publish(topic, payload, retain, props);
|
||||
if (ec)
|
||||
return complete_post(ec, packet_id);
|
||||
|
||||
_serial_num = _svc_ptr->next_serial_num();
|
||||
|
||||
auto publish = control_packet<allocator_type>::of(
|
||||
@@ -132,9 +120,15 @@ public:
|
||||
qos_type, retain, dup_e::no, props
|
||||
);
|
||||
|
||||
auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size)
|
||||
.value_or(default_max_packet_size);
|
||||
if (publish.size() > max_packet_size)
|
||||
return complete_post(client::error::packet_too_large, packet_id);
|
||||
|
||||
send_publish(std::move(publish));
|
||||
}
|
||||
|
||||
|
||||
void send_publish(control_packet<allocator_type> publish) {
|
||||
if (_handler.empty()) { // already cancelled
|
||||
if constexpr (qos_type != qos_e::at_most_once)
|
||||
@@ -192,6 +186,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <
|
||||
qos_e q = qos_type,
|
||||
std::enable_if_t<q == qos_e::at_least_once, bool> = true
|
||||
@@ -336,18 +331,51 @@ public:
|
||||
return send_pubrel(std::move(pubrel), true);
|
||||
}
|
||||
|
||||
return complete(ec, *rc, pubrel.packet_id(), pubcomp_props{});
|
||||
return complete(ec, *rc, pubrel.packet_id(), pubcomp_props {});
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
error_code validate_props(
|
||||
const publish_props& props,
|
||||
prop::value_type_t<prop::topic_alias_maximum> topic_alias_max_opt
|
||||
) {
|
||||
|
||||
error_code validate_publish(
|
||||
const std::string& topic, const std::string& payload,
|
||||
retain_e retain, const publish_props& props
|
||||
) const {
|
||||
constexpr uint8_t default_retain_available = 1;
|
||||
constexpr uint8_t default_maximum_qos = 2;
|
||||
constexpr uint8_t default_payload_format_ind = 0;
|
||||
|
||||
if (validate_topic_name(topic) != validation_result::valid)
|
||||
return client::error::invalid_topic;
|
||||
|
||||
auto max_qos = _svc_ptr->connack_prop(prop::maximum_qos)
|
||||
.value_or(default_maximum_qos);
|
||||
auto retain_available = _svc_ptr->connack_prop(prop::retain_available)
|
||||
.value_or(default_retain_available);
|
||||
|
||||
if (uint8_t(qos_type) > max_qos)
|
||||
return client::error::qos_not_supported;
|
||||
|
||||
if (retain_available == 0 && retain == retain_e::yes)
|
||||
return client::error::retain_not_available;
|
||||
|
||||
auto payload_format_ind = props[prop::payload_format_indicator]
|
||||
.value_or(default_payload_format_ind);
|
||||
if (
|
||||
payload_format_ind == 1 &&
|
||||
validate_mqtt_utf8(payload) != validation_result::valid
|
||||
)
|
||||
return client::error::malformed_packet;
|
||||
|
||||
return validate_props(props);
|
||||
}
|
||||
|
||||
error_code validate_props(const publish_props& props) const {
|
||||
constexpr uint16_t default_topic_alias_max = 0;
|
||||
|
||||
auto topic_alias = props[prop::topic_alias];
|
||||
if (topic_alias) {
|
||||
auto topic_alias_max = topic_alias_max_opt.value_or(0);
|
||||
auto topic_alias_max = _svc_ptr->connack_prop(prop::topic_alias_maximum)
|
||||
.value_or(default_topic_alias_max);
|
||||
|
||||
if (topic_alias_max == 0 || *topic_alias > topic_alias_max)
|
||||
return client::error::topic_alias_maximum_reached;
|
||||
@@ -370,7 +398,8 @@ private:
|
||||
auto subscription_identifier = props[prop::subscription_identifier];
|
||||
if (
|
||||
subscription_identifier &&
|
||||
(*subscription_identifier < 1 || *subscription_identifier > 268'435'455)
|
||||
(*subscription_identifier < min_subscription_identifier ||
|
||||
*subscription_identifier > max_subscription_identifier)
|
||||
)
|
||||
return client::error::malformed_packet;
|
||||
|
||||
@@ -384,37 +413,6 @@ private:
|
||||
return error_code {};
|
||||
}
|
||||
|
||||
error_code validate_publish(
|
||||
const std::string& topic, const std::string& payload,
|
||||
retain_e retain, const publish_props& props
|
||||
) {
|
||||
if (validate_topic_name(topic) != validation_result::valid)
|
||||
return client::error::invalid_topic;
|
||||
|
||||
const auto& [max_qos_opt, retain_available_opt, topic_alias_max_opt] =
|
||||
_svc_ptr->connack_props(
|
||||
prop::maximum_qos, prop::retain_available,
|
||||
prop::topic_alias_maximum
|
||||
);
|
||||
auto max_qos = max_qos_opt.value_or(2);
|
||||
auto retain_available = retain_available_opt.value_or(1);
|
||||
|
||||
if (uint8_t(qos_type) > max_qos)
|
||||
return client::error::qos_not_supported;
|
||||
|
||||
if (retain_available == 0 && retain == retain_e::yes)
|
||||
return client::error::retain_not_available;
|
||||
|
||||
auto payload_format = props[prop::payload_format_indicator].value_or(0);
|
||||
if (
|
||||
payload_format == 1 &&
|
||||
validate_mqtt_utf8(payload) != validation_result::valid
|
||||
)
|
||||
return client::error::malformed_packet;
|
||||
|
||||
return validate_props(props, topic_alias_max_opt);
|
||||
}
|
||||
|
||||
void on_malformed_packet(const std::string& reason) {
|
||||
auto props = disconnect_props {};
|
||||
props[prop::reason_string] = reason;
|
||||
@@ -436,7 +434,7 @@ private:
|
||||
qos_e q = qos_type,
|
||||
std::enable_if_t<q == qos_e::at_most_once, bool> = true
|
||||
>
|
||||
void complete_post(error_code ec) {
|
||||
void complete_post(error_code ec, uint16_t) {
|
||||
_handler.complete_post(ec);
|
||||
}
|
||||
|
||||
@@ -464,7 +462,9 @@ private:
|
||||
bool
|
||||
> = true
|
||||
>
|
||||
void complete_post(error_code ec) {
|
||||
void complete_post(error_code ec, uint16_t packet_id) {
|
||||
if (packet_id != 0)
|
||||
_svc_ptr->free_pid(packet_id, false);
|
||||
_handler.complete_post(ec, reason_codes::empty, Props {});
|
||||
}
|
||||
};
|
||||
|
@@ -26,6 +26,7 @@ namespace asio = boost::asio;
|
||||
template <typename ClientService, typename Handler>
|
||||
class subscribe_op {
|
||||
using client_service = ClientService;
|
||||
|
||||
struct on_subscribe {};
|
||||
struct on_suback {};
|
||||
|
||||
@@ -62,23 +63,16 @@ public:
|
||||
void perform(
|
||||
const std::vector<subscribe_topic>& topics,
|
||||
const subscribe_props& props
|
||||
) {
|
||||
auto ec = validate_subscribe(topics, props);
|
||||
if (ec)
|
||||
return complete_post(ec, topics.size());
|
||||
|
||||
asio::dispatch(
|
||||
asio::prepend(std::move(*this), topics, props)
|
||||
);
|
||||
}
|
||||
|
||||
void operator()(
|
||||
const std::vector<subscribe_topic>& topics,
|
||||
const subscribe_props& props
|
||||
) {
|
||||
uint16_t packet_id = _svc_ptr->allocate_pid();
|
||||
if (packet_id == 0)
|
||||
return complete_post(client::error::pid_overrun, topics.size());
|
||||
return complete_post(
|
||||
client::error::pid_overrun, packet_id, topics.size()
|
||||
);
|
||||
|
||||
auto ec = validate_subscribe(topics, props);
|
||||
if (ec)
|
||||
return complete_post(ec, packet_id, topics.size());
|
||||
|
||||
auto subscribe = control_packet<allocator_type>::of(
|
||||
with_pid, get_allocator(),
|
||||
@@ -86,6 +80,13 @@ public:
|
||||
topics, props
|
||||
);
|
||||
|
||||
auto max_packet_size = _svc_ptr->connack_prop(prop::maximum_packet_size)
|
||||
.value_or(default_max_packet_size);
|
||||
if (subscribe.size() > max_packet_size)
|
||||
return complete_post(
|
||||
client::error::packet_too_large, packet_id, topics.size()
|
||||
);
|
||||
|
||||
send_subscribe(std::move(subscribe));
|
||||
}
|
||||
|
||||
@@ -149,38 +150,30 @@ public:
|
||||
|
||||
private:
|
||||
|
||||
static bool is_option_available(std::optional<uint8_t> sub_opt) {
|
||||
return !sub_opt.has_value() || *sub_opt == 1;
|
||||
error_code validate_subscribe(
|
||||
const std::vector<subscribe_topic>& topics, const subscribe_props& props
|
||||
) const {
|
||||
error_code ec;
|
||||
for (const auto& topic: topics) {
|
||||
ec = validate_topic(topic);
|
||||
if (ec)
|
||||
return ec;
|
||||
}
|
||||
|
||||
ec = validate_props(props);
|
||||
return ec;
|
||||
}
|
||||
|
||||
static error_code validate_props(
|
||||
const subscribe_props& props, bool sub_id_available
|
||||
) {
|
||||
auto user_properties = props[prop::user_property];
|
||||
for (const auto& user_prop: user_properties)
|
||||
if (validate_mqtt_utf8(user_prop) != validation_result::valid)
|
||||
return client::error::malformed_packet;
|
||||
error_code validate_topic(const subscribe_topic& topic) const {
|
||||
auto wildcard_available = _svc_ptr->connack_prop(
|
||||
prop::wildcard_subscription_available
|
||||
).value_or(1);
|
||||
auto shared_available = _svc_ptr->connack_prop(
|
||||
prop::shared_subscription_available
|
||||
).value_or(1);
|
||||
|
||||
auto sub_id = props[prop::subscription_identifier];
|
||||
if (!sub_id.has_value())
|
||||
return error_code {};
|
||||
|
||||
if (!sub_id_available)
|
||||
return client::error::subscription_identifier_not_available;
|
||||
|
||||
constexpr uint32_t min_sub_id = 1;
|
||||
constexpr uint32_t max_sub_id = 268'435'455;
|
||||
return min_sub_id <= *sub_id && *sub_id <= max_sub_id ?
|
||||
error_code {} :
|
||||
client::error::subscription_identifier_not_available;
|
||||
}
|
||||
|
||||
static error_code validate_topic(
|
||||
const subscribe_topic& topic, bool wildcard_available, bool shared_available
|
||||
) {
|
||||
std::string_view topic_filter = topic.topic_filter;
|
||||
|
||||
constexpr std::string_view shared_sub_id = "$share/";
|
||||
validation_result result = validation_result::valid;
|
||||
if (
|
||||
topic_filter.compare(0, shared_sub_id.size(), shared_sub_id) == 0
|
||||
@@ -201,31 +194,27 @@ private:
|
||||
return error_code {};
|
||||
}
|
||||
|
||||
error_code validate_subscribe(
|
||||
const std::vector<subscribe_topic>& topics,
|
||||
const subscribe_props& props
|
||||
) {
|
||||
auto [wildcard_available, shared_available, sub_id_available] =
|
||||
std::apply(
|
||||
[](auto ...opt) {
|
||||
return std::make_tuple(is_option_available(opt)...);
|
||||
},
|
||||
_svc_ptr->connack_props(
|
||||
prop::wildcard_subscription_available,
|
||||
prop::shared_subscription_available,
|
||||
prop::subscription_identifier_available
|
||||
)
|
||||
);
|
||||
error_code validate_props(const subscribe_props& props) const {
|
||||
auto user_properties = props[prop::user_property];
|
||||
for (const auto& user_prop: user_properties)
|
||||
if (validate_mqtt_utf8(user_prop) != validation_result::valid)
|
||||
return client::error::malformed_packet;
|
||||
|
||||
error_code ec;
|
||||
for (const auto& topic: topics) {
|
||||
ec = validate_topic(topic, wildcard_available, shared_available);
|
||||
if (ec)
|
||||
return ec;
|
||||
}
|
||||
auto sub_id = props[prop::subscription_identifier];
|
||||
if (!sub_id.has_value())
|
||||
return error_code {};
|
||||
|
||||
ec = validate_props(props, sub_id_available);
|
||||
return ec;
|
||||
auto sub_id_available = _svc_ptr->connack_prop(
|
||||
prop::subscription_identifier_available
|
||||
).value_or(1);
|
||||
|
||||
if (!sub_id_available)
|
||||
return client::error::subscription_identifier_not_available;
|
||||
|
||||
return (min_subscription_identifier <= *sub_id &&
|
||||
*sub_id <= max_subscription_identifier) ?
|
||||
error_code {} :
|
||||
client::error::subscription_identifier_not_available;
|
||||
}
|
||||
|
||||
static std::vector<reason_code> to_reason_codes(
|
||||
@@ -250,7 +239,9 @@ private:
|
||||
}
|
||||
|
||||
|
||||
void complete_post(error_code ec, size_t num_topics) {
|
||||
void complete_post(error_code ec, uint16_t packet_id, size_t num_topics) {
|
||||
if (packet_id != 0)
|
||||
_svc_ptr->free_pid(packet_id);
|
||||
_handler.complete_post(
|
||||
ec, std::vector<reason_code> { num_topics, reason_codes::empty },
|
||||
suback_props {}
|
||||
|
@@ -58,23 +58,16 @@ public:
|
||||
void perform(
|
||||
const std::vector<std::string>& topics,
|
||||
const unsubscribe_props& props
|
||||
) {
|
||||
auto ec = validate_unsubscribe(topics, props);
|
||||
if (ec)
|
||||
return complete_post(ec, topics.size());
|
||||
|
||||
asio::dispatch(
|
||||
asio::prepend(std::move(*this), topics, props)
|
||||
);
|
||||
}
|
||||
|
||||
void operator()(
|
||||
const std::vector<std::string>& topics,
|
||||
const unsubscribe_props& props
|
||||
) {
|
||||
uint16_t packet_id = _svc_ptr->allocate_pid();
|
||||
if (packet_id == 0)
|
||||
return complete_post(client::error::pid_overrun, topics.size());
|
||||
return complete_post(
|
||||
client::error::pid_overrun, packet_id, topics.size()
|
||||
);
|
||||
|
||||
auto ec = validate_unsubscribe(topics, props);
|
||||
if (ec)
|
||||
return complete_post(ec, packet_id, topics.size());
|
||||
|
||||
auto unsubscribe = control_packet<allocator_type>::of(
|
||||
with_pid, get_allocator(),
|
||||
@@ -82,6 +75,14 @@ public:
|
||||
topics, props
|
||||
);
|
||||
|
||||
auto max_packet_size = _svc_ptr->connack_prop(
|
||||
prop::maximum_packet_size
|
||||
).value_or(default_max_packet_size);
|
||||
if (unsubscribe.size() > max_packet_size)
|
||||
return complete_post(
|
||||
client::error::packet_too_large, packet_id, topics.size()
|
||||
);
|
||||
|
||||
send_unsubscribe(std::move(unsubscribe));
|
||||
}
|
||||
|
||||
@@ -94,7 +95,7 @@ public:
|
||||
wire_data,
|
||||
no_serial, send_flag::none,
|
||||
asio::prepend(
|
||||
std::move(*this), on_unsubscribe{}, std::move(unsubscribe)
|
||||
std::move(*this), on_unsubscribe {}, std::move(unsubscribe)
|
||||
)
|
||||
);
|
||||
}
|
||||
@@ -183,7 +184,9 @@ private:
|
||||
);
|
||||
}
|
||||
|
||||
void complete_post(error_code ec, size_t num_topics) {
|
||||
void complete_post(error_code ec, uint16_t packet_id, size_t num_topics) {
|
||||
if (packet_id != 0)
|
||||
_svc_ptr->free_pid(packet_id);
|
||||
_handler.complete_post(
|
||||
ec, std::vector<reason_code> { num_topics, reason_codes::empty },
|
||||
unsuback_props {}
|
||||
|
@@ -24,6 +24,12 @@ namespace asio = boost::asio;
|
||||
* the stream of bytes between the Client and the Broker. The transport must be
|
||||
* ordered and lossless.
|
||||
* \tparam \__TlsContext\__ Type of the context object used in TLS/SSL connections.
|
||||
*
|
||||
* \par Thread safety
|
||||
* ['Distinct objects]: safe. \n
|
||||
* ['Shared objects]: unsafe. \n
|
||||
* This class is [*not thread-safe].
|
||||
* The application must also ensure that all asynchronous operations are performed within the same implicit or explicit strand.
|
||||
*/
|
||||
template <
|
||||
typename StreamType,
|
||||
@@ -174,30 +180,6 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Retrieves the value of a specific property from the last \__CONNACK\__ packet received.
|
||||
*
|
||||
* \details The return type varies according to the property requested.
|
||||
* For all properties, the return type will be `std::optional` of their respective value type.
|
||||
* For `async_mqtt5::prop::user_property`, the return type is `std::vector<std::string>`.
|
||||
*
|
||||
* \param prop The \__CONNACK\__ property value to retrieve.
|
||||
*
|
||||
* \par Example
|
||||
* \code
|
||||
* std::optional<std::string> auth_method = client.connection_property(async_mqtt5::prop::authentication_method); // ok
|
||||
* std::optional<std::string> c_type = client.connection_property(async_mqtt5::prop::content_type); // does not compile!
|
||||
* \endcode
|
||||
*
|
||||
* \see See \__CONNACK_PROPS\__ for all eligible properties.
|
||||
*/
|
||||
template <prop::property_type p>
|
||||
decltype(auto) connection_property(
|
||||
std::integral_constant<prop::property_type, p> prop
|
||||
) {
|
||||
return _svc_ptr->connack_prop(prop);
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Assign a \ref will Message.
|
||||
*
|
||||
@@ -302,6 +284,30 @@ public:
|
||||
detail::re_auth_op { _svc_ptr }.perform();
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Retrieves the value of a specific property from the last \__CONNACK\__ packet received.
|
||||
*
|
||||
* \details The return type varies according to the property requested.
|
||||
* For all properties, the return type will be `std::optional` of their respective value type.
|
||||
* For `async_mqtt5::prop::user_property`, the return type is `std::vector<std::string>`.
|
||||
*
|
||||
* \param prop The \__CONNACK\__ property value to retrieve.
|
||||
*
|
||||
* \par Example
|
||||
* \code
|
||||
* std::optional<std::string> auth_method = client.connection_property(async_mqtt5::prop::authentication_method); // ok
|
||||
* std::optional<std::string> c_type = client.connection_property(async_mqtt5::prop::content_type); // does not compile!
|
||||
* \endcode
|
||||
*
|
||||
* \see See \__CONNACK_PROPS\__ for all eligible properties.
|
||||
*/
|
||||
template <prop::property_type p>
|
||||
const auto& connection_property(
|
||||
std::integral_constant<prop::property_type, p> prop
|
||||
) const {
|
||||
return _svc_ptr->connack_prop(prop);
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Send a \__PUBLISH\__ packet to Broker to transport an
|
||||
* Application Message.
|
||||
@@ -362,6 +368,7 @@ public:
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - \link async_mqtt5::client::error::malformed_packet \endlink
|
||||
* - \link async_mqtt5::client::error::packet_too_large \endlink
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::qos_not_supported \endlink
|
||||
* - \link async_mqtt5::client::error::retain_not_available \endlink
|
||||
@@ -438,6 +445,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::malformed_packet \endlink
|
||||
* - \link async_mqtt5::client::error::packet_too_large \endlink
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
* - \link async_mqtt5::client::error::wildcard_subscription_not_available \endlink
|
||||
@@ -509,6 +517,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::malformed_packet \endlink
|
||||
* - \link async_mqtt5::client::error::packet_too_large \endlink
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
* - \link async_mqtt5::client::error::wildcard_subscription_not_available \endlink
|
||||
@@ -568,6 +577,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::malformed_packet \endlink
|
||||
* - \link async_mqtt5::client::error::packet_too_large \endlink
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
*
|
||||
@@ -635,6 +645,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::malformed_packet \endlink
|
||||
* - \link async_mqtt5::client::error::packet_too_large \endlink
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
*
|
||||
|
@@ -63,6 +63,9 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
const auto& connack_props() {
|
||||
return _test_props;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
@@ -92,7 +92,10 @@ BOOST_AUTO_TEST_CASE(client_functions) {
|
||||
|
||||
mqtt_client<tcp_layer> tcp_client(ioc, "");
|
||||
tcp_client.authenticator(good_authenticator());
|
||||
auto data = tcp_client.connection_property(prop::authentication_data);
|
||||
|
||||
std::optional<std::string> data = tcp_client.connection_property(
|
||||
prop::authentication_data
|
||||
);
|
||||
|
||||
asio::ssl::context ctx(asio::ssl::context::tls_client);
|
||||
mqtt_client<
|
||||
|
@@ -131,6 +131,36 @@ BOOST_AUTO_TEST_CASE(test_malformed_packet) {
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_packet_too_large) {
|
||||
connack_props props;
|
||||
props[prop::maximum_packet_size] = 10;
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = test::test_service<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(
|
||||
ioc.get_executor(), std::move(props)
|
||||
);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK(ec == client::error::packet_too_large);
|
||||
BOOST_CHECK_EQUAL(rc, reason_codes::empty);
|
||||
};
|
||||
|
||||
detail::publish_send_op<
|
||||
client_service_type, decltype(handler), qos_e::at_least_once
|
||||
> { svc_ptr, std::move(handler) }
|
||||
.perform(
|
||||
"test", "payload", retain_e::no, {}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_qos_not_supported) {
|
||||
connack_props props;
|
||||
props[prop::maximum_qos] = uint8_t(0);
|
||||
|
Reference in New Issue
Block a user