Support multiple subscription identifiers in received messages.

Summary:
- refactor property encoding
- change user property type to be std::pair<std::string, std::string>

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina

Differential Revision: https://repo.mireo.local/D27867
This commit is contained in:
Bruno Iljazovic
2024-02-16 10:36:26 +01:00
parent b40ddb3ced
commit 1acdd99f28
35 changed files with 469 additions and 380 deletions

View File

@@ -94,6 +94,13 @@ inline validation_result validate_mqtt_utf8(std::string_view str) {
return validate_impl(str, is_valid_string_size, is_utf8);
}
inline bool is_valid_string_pair(
const std::pair<std::string, std::string>& str_pair
) {
return validate_mqtt_utf8(str_pair.first) == validation_result::valid &&
validate_mqtt_utf8(str_pair.second) == validation_result::valid;
}
} // namespace async_mqtt5::detail
#endif //ASYNC_MQTT5_UTF8_MQTT_HPP

View File

@@ -349,49 +349,38 @@ bool parse_to_prop(
It& iter, const It last,
const Ctx& ctx, RCtx& rctx, Prop& prop
) {
using prop_type = decltype(prop);
using prop_type = std::remove_reference_t<decltype(prop)>;
bool rv = false;
if constexpr (is_optional<prop_type>) {
using value_type =
typename std::remove_reference_t<prop_type>::value_type;
if constexpr (std::is_same_v<value_type, uint8_t>) {
uint8_t attr;
rv = x3::byte_.parse(iter, last, ctx, rctx, attr);
prop = attr;
}
if constexpr (std::is_same_v<value_type, uint16_t>) {
uint16_t attr;
rv = x3::big_word.parse(iter, last, ctx, rctx, attr);
prop = attr;
}
if constexpr (std::is_same_v<value_type, int32_t>) {
int32_t attr;
rv = basic::varint_.parse(iter, last, ctx, rctx, attr);
prop = attr;
}
if constexpr (std::is_same_v<value_type, uint32_t>) {
uint32_t attr;
rv = x3::big_dword.parse(iter, last, ctx, rctx, attr);
prop = attr;
}
if constexpr (std::is_same_v<value_type, std::string>) {
std::string attr;
rv = basic::utf8_.parse(iter, last, ctx, rctx, attr);
prop.emplace(std::move(attr));
}
if constexpr (std::is_same_v<prop_type, uint8_t>)
rv = x3::byte_.parse(iter, last, ctx, rctx, prop);
else if constexpr (std::is_same_v<prop_type, uint16_t>)
rv = x3::big_word.parse(iter, last, ctx, rctx, prop);
else if constexpr (std::is_same_v<prop_type, int32_t>)
rv = basic::varint_.parse(iter, last, ctx, rctx, prop);
else if constexpr (std::is_same_v<prop_type, uint32_t>)
rv = x3::big_dword.parse(iter, last, ctx, rctx, prop);
else if constexpr (std::is_same_v<prop_type, std::string>)
rv = basic::utf8_.parse(iter, last, ctx, rctx, prop);
else if constexpr (is_optional<prop_type>) {
typename prop_type::value_type val;
rv = parse_to_prop(iter, last, ctx, rctx, val);
if (rv) prop.emplace(std::move(val));
}
if constexpr (async_mqtt5::is_vector<prop_type>) {
std::string value;
// key
rv = basic::utf8_.parse(iter, last, ctx, rctx, value);
if (rv) prop.push_back(std::move(value));
// value
rv = basic::utf8_.parse(iter, last, ctx, rctx, value);
else if constexpr (is_pair<prop_type>) {
rv = parse_to_prop(iter, last, ctx, rctx, prop.first);
rv = parse_to_prop(iter, last, ctx, rctx, prop.second);
}
else if constexpr (is_vector<prop_type> || is_small_vector<prop_type>) {
typename std::remove_reference_t<prop_type>::value_type value;
rv = parse_to_prop(iter, last, ctx, rctx, value);
if (rv) prop.push_back(std::move(value));
}
return rv;
}

View File

@@ -320,84 +320,26 @@ std::string& operator<<(std::string& s, T&& t) {
} // end namespace basic
namespace detail {
namespace pp = async_mqtt5::prop;
template <pp::property_type p, std::size_t I, typename Tuple>
constexpr bool match_v = std::is_same_v<
std::integral_constant<pp::property_type, p>,
typename std::tuple_element_t<I, Tuple>::key
>;
template <
pp::property_type p, typename Tuple,
typename Idxs = std::make_index_sequence<std::tuple_size_v<Tuple>>
>
struct type_index;
template <
pp::property_type p, template <typename...> typename Tuple,
typename... Args, std::size_t... Is
>
struct type_index<p, Tuple<Args...>, std::index_sequence<Is...>> :
std::integral_constant<
std::size_t, ((Is * match_v<p, Is, Tuple<Args...>>)+... + 0)
>
{
static_assert(
1 == (match_v<p, Is, Tuple<Args...>> + ... + 0),
"T doesn't appear once in tuple"
);
};
} // end namespace detail
namespace prop {
namespace pp = async_mqtt5::prop;
template <pp::property_type p, typename T>
struct prop_encoder_type {
using key = std::integral_constant<pp::property_type, p>;
using value = T;
};
using encoder_types = std::tuple<
prop_encoder_type<pp::shared_subscription_available_t, basic::int_def<uint8_t>>,
prop_encoder_type<pp::payload_format_indicator_t, basic::int_def<uint8_t>>,
prop_encoder_type<pp::message_expiry_interval_t, basic::int_def<uint32_t>>,
prop_encoder_type<pp::content_type_t, basic::utf8_def>,
prop_encoder_type<pp::response_topic_t, basic::utf8_def>,
prop_encoder_type<pp::correlation_data_t, basic::utf8_def>,
prop_encoder_type<pp::subscription_identifier_t, basic::int_def<basic::varint_t>>,
prop_encoder_type<pp::session_expiry_interval_t, basic::int_def<uint32_t>>,
prop_encoder_type<pp::assigned_client_identifier_t, basic::utf8_def>,
prop_encoder_type<pp::server_keep_alive_t, basic::int_def<uint16_t>>,
prop_encoder_type<pp::authentication_method_t, basic::utf8_def>,
prop_encoder_type<pp::authentication_data_t, basic::utf8_def>,
prop_encoder_type<pp::request_problem_information_t, basic::int_def<uint8_t>>,
prop_encoder_type<pp::will_delay_interval_t, basic::int_def<uint32_t>>,
prop_encoder_type<pp::request_response_information_t, basic::int_def<uint8_t>>,
prop_encoder_type<pp::response_information_t, basic::utf8_def>,
prop_encoder_type<pp::server_reference_t, basic::utf8_def>,
prop_encoder_type<pp::reason_string_t, basic::utf8_def>,
prop_encoder_type<pp::receive_maximum_t, basic::int_def<uint16_t>>,
prop_encoder_type<pp::topic_alias_maximum_t, basic::int_def<uint16_t>>,
prop_encoder_type<pp::topic_alias_t, basic::int_def<uint16_t>>,
prop_encoder_type<pp::maximum_qos_t, basic::int_def<uint8_t>>,
prop_encoder_type<pp::retain_available_t, basic::int_def<uint8_t>>,
prop_encoder_type<pp::user_property_t, basic::utf8_def>,
prop_encoder_type<pp::maximum_packet_size_t, basic::int_def<uint32_t>>,
prop_encoder_type<pp::wildcard_subscription_available_t, basic::int_def<uint8_t>>,
prop_encoder_type<pp::subscription_identifier_available_t, basic::int_def<uint8_t>>
>;
template <pp::property_type p>
constexpr auto encoder_for_prop = typename std::tuple_element_t<
detail::type_index<p, encoder_types>::value, encoder_types
>::value {};
template <typename T>
auto encoder_for_prop_value(const T& val) {
if constexpr (std::is_same_v<T, uint8_t>)
return basic::int_def<uint8_t>{}(val);
else if constexpr (std::is_same_v<T, uint16_t>)
return basic::int_def<uint16_t>{}(val);
else if constexpr (std::is_same_v<T, int32_t>)
return basic::int_def<basic::varint_t>{}(val);
else if constexpr (std::is_same_v<T, uint32_t>)
return basic::int_def<uint32_t>{}(val);
else if constexpr (std::is_same_v<T, std::string>)
return basic::utf8_def{}(val);
else if constexpr (is_pair<T>)
return encoder_for_prop_value(val.first) &
encoder_for_prop_value(val.second);
}
template <typename T, pp::property_type p, typename Enable = void>
class prop_val;
@@ -409,10 +351,8 @@ class prop_val<
T, p,
std::enable_if_t<!is_vector<T> && is_optional<T>>
> : public basic::encoder {
// T is always std::optional
using opt_type = typename boost::remove_cv_ref_t<T>::value_type;
// allows T to be reference type to std::optional
static inline std::optional<opt_type> nulltype;
static inline boost::remove_cv_ref_t<T> nulltype;
T _val;
public:
prop_val(T val) : _val(val) {
@@ -422,16 +362,14 @@ public:
size_t byte_size() const {
if (!_val) return 0;
auto sval = encoder_for_prop<p>(_val);
return 1 + sval.byte_size();
return 1 + encoder_for_prop_value(*_val).byte_size();
}
std::string& encode(std::string& s) const {
if (!_val)
return s;
s.push_back(p);
auto sval = encoder_for_prop<p>(_val);
return sval.encode(s);
return encoder_for_prop_value(*_val).encode(s);
}
};
@@ -440,7 +378,7 @@ template <
>
class prop_val<
T, p,
std::enable_if_t<is_vector<T>>
std::enable_if_t<is_vector<T> || is_small_vector<T>>
> : public basic::encoder {
// allows T to be reference type to std::vector
static inline boost::remove_cv_ref_t<T> nulltype;
@@ -456,16 +394,8 @@ public:
if (_val.empty()) return 0;
size_t total_size = 0;
for (size_t i = 0; i < _val.size() && i + 1 < _val.size(); i += 2) {
auto skey = encoder_for_prop<p>(_val[i]);
size_t key_size = skey.byte_size();
auto sval = encoder_for_prop<p>(_val[i + 1]);
size_t val_size = sval.byte_size();
if (key_size && val_size)
total_size += 1 + key_size + val_size;
}
for (const auto& elem : _val)
total_size += 1 + encoder_for_prop_value(elem).byte_size();
return total_size;
}
@@ -474,14 +404,9 @@ public:
if (_val.empty())
return s;
for (size_t i = 0; i < _val.size() && i + 1 < _val.size(); i += 2) {
for (const auto& elem: _val) {
s.push_back(p);
auto skey = encoder_for_prop<p>(_val[i]);
skey.encode(s);
auto sval = encoder_for_prop<p>(_val[i + 1]);
sval.encode(s);
encoder_for_prop_value(elem).encode(s);
}
return s;

View File

@@ -4,6 +4,7 @@
#include <optional>
#include <vector>
#include <boost/container/small_vector.hpp>
#include <boost/range/iterator_range_core.hpp>
#include <boost/type_traits/remove_cv_ref.hpp>
@@ -30,6 +31,21 @@ constexpr bool is_vector = is_specialization<
boost::remove_cv_ref_t<T>, std::vector
>;
template <typename... Args>
constexpr std::true_type is_small_vector_impl(
boost::container::small_vector_base<Args...> const &
);
constexpr std::false_type is_small_vector_impl( ... );
template <typename T>
constexpr bool is_small_vector =
decltype(is_small_vector_impl(std::declval<T>()))::value;
template <typename T>
constexpr bool is_pair = is_specialization<
boost::remove_cv_ref_t<T>, std::pair
>;
template <typename T>
constexpr bool is_boost_iterator = is_specialization<
boost::remove_cv_ref_t<T>, boost::iterator_range

View File

@@ -131,8 +131,8 @@ private:
return client::error::malformed_packet;
auto user_properties = props[prop::user_property];
for (const auto& user_prop: user_properties)
if (validate_mqtt_utf8(user_prop) != validation_result::valid)
for (const auto& user_property: user_properties)
if (!is_valid_string_pair(user_property))
return client::error::malformed_packet;
return error_code {};
}

View File

@@ -141,8 +141,8 @@ class endpoints {
public:
template <typename Executor>
endpoints(Executor ex, asio::steady_timer& timer)
: _resolver(ex), _connect_timer(timer)
endpoints(Executor ex, asio::steady_timer& timer) :
_resolver(ex), _connect_timer(timer)
{}
void clone_servers(const endpoints& other) {

View File

@@ -379,16 +379,11 @@ private:
return client::error::malformed_packet;
auto user_properties = props[prop::user_property];
for (const auto& user_prop: user_properties)
if (validate_mqtt_utf8(user_prop) != validation_result::valid)
for (const auto& user_property: user_properties)
if (!is_valid_string_pair(user_property))
return client::error::malformed_packet;
auto subscription_identifier = props[prop::subscription_identifier];
if (
subscription_identifier &&
(*subscription_identifier < min_subscription_identifier ||
*subscription_identifier > max_subscription_identifier)
)
if (!props[prop::subscription_identifier].empty())
return client::error::malformed_packet;
auto content_type = props[prop::content_type];

View File

@@ -220,8 +220,8 @@ private:
error_code validate_props(const subscribe_props& props) const {
auto user_properties = props[prop::user_property];
for (const auto& user_prop: user_properties)
if (validate_mqtt_utf8(user_prop) != validation_result::valid)
for (const auto& user_property: user_properties)
if (!is_valid_string_pair(user_property))
return client::error::malformed_packet;
auto sub_id = props[prop::subscription_identifier];

View File

@@ -178,8 +178,8 @@ private:
return client::error::invalid_topic;
auto user_properties = props[prop::user_property];
for (const auto& user_prop: user_properties)
if (validate_mqtt_utf8(user_prop) != validation_result::valid)
for (const auto& user_property: user_properties)
if (!is_valid_string_pair(user_property))
return client::error::malformed_packet;
return error_code {};
}

View File

@@ -368,14 +368,15 @@ public:
*
* \details The return type varies according to the property requested.
* For all properties, the return type will be `std::optional` of their respective value type.
* For `async_mqtt5::prop::user_property`, the return type is `std::vector<std::string>`.
* For `async_mqtt5::prop::user_property`, the return type is
* `std::vector<std::pair<std::string, std::string>>`.
*
* \param prop The \__CONNACK_PROPS\__ property value to retrieve.
*
* \par Example
* \code
* std::optional<std::string> auth_method = client.connack_property(async_mqtt5::prop::authentication_method); // ok
* std::optional<std::string> c_type = client.connack_property(async_mqtt5::prop::content_type); // does not compile, not a CONNAK prop!
* std::optional<std::string> c_type = client.connack_property(async_mqtt5::prop::content_type); // does not compile, not a CONNACK prop!
* \endcode
*
* \see See \__CONNACK_PROPS\__ for all eligible properties.

View File

@@ -7,6 +7,8 @@
#include <type_traits>
#include <vector>
#include <boost/container/small_vector.hpp>
namespace async_mqtt5::prop {
enum property_type : uint8_t {
@@ -39,9 +41,49 @@ enum property_type : uint8_t {
shared_subscription_available_t = 0x2a
};
class alignas(8) subscription_identifiers :
public boost::container::small_vector<int32_t, 1>
{
using base_type = boost::container::small_vector<int32_t, 1>;
public:
using base_type::base_type;
subscription_identifiers(int32_t val) : base_type { val } {}
bool has_value() const noexcept {
return !empty();
}
int32_t& operator*() noexcept {
return front();
}
int32_t operator*() const noexcept {
return front();
}
void emplace(int32_t val = 0) {
*this = val;
}
int32_t value() const {
return front();
}
int32_t value_or(int32_t default_val) const noexcept {
return empty() ? default_val : front();
}
void reset() noexcept {
clear();
}
};
template <property_type p>
struct property_traits;
using user_property_value_t = std::vector<std::pair<std::string, std::string>>;
#define DEF_PROPERTY_TRAIT(Pname, Ptype) \
template <> \
struct property_traits<Pname##_t> { \
@@ -55,7 +97,7 @@ DEF_PROPERTY_TRAIT(message_expiry_interval, std::optional<uint32_t>);
DEF_PROPERTY_TRAIT(content_type, std::optional<std::string>);
DEF_PROPERTY_TRAIT(response_topic, std::optional<std::string>);
DEF_PROPERTY_TRAIT(correlation_data, std::optional<std::string>);
DEF_PROPERTY_TRAIT(subscription_identifier, std::optional<int32_t>);
DEF_PROPERTY_TRAIT(subscription_identifier, subscription_identifiers);
DEF_PROPERTY_TRAIT(session_expiry_interval, std::optional<uint32_t>);
DEF_PROPERTY_TRAIT(assigned_client_identifier, std::optional<std::string>);
DEF_PROPERTY_TRAIT(server_keep_alive, std::optional<uint16_t>);
@@ -72,7 +114,7 @@ DEF_PROPERTY_TRAIT(topic_alias_maximum, std::optional<uint16_t>);
DEF_PROPERTY_TRAIT(topic_alias, std::optional<uint16_t>);
DEF_PROPERTY_TRAIT(maximum_qos, std::optional<uint8_t>);
DEF_PROPERTY_TRAIT(retain_available, std::optional<uint8_t>);
DEF_PROPERTY_TRAIT(user_property, std::vector<std::string>);
DEF_PROPERTY_TRAIT(user_property, user_property_value_t);
DEF_PROPERTY_TRAIT(maximum_packet_size, std::optional<uint32_t>);
DEF_PROPERTY_TRAIT(wildcard_subscription_available, std::optional<uint8_t>);
DEF_PROPERTY_TRAIT(subscription_identifier_available, std::optional<uint8_t>);

View File

@@ -41,8 +41,8 @@ public:
/// \cond INTERNAL
constexpr reason_code() : _code(0xff) {}
constexpr reason_code(uint8_t code, reason_codes::category cat)
: _code(code), _category(cat)
constexpr reason_code(uint8_t code, reason_codes::category cat) :
_code(code), _category(cat)
{}
constexpr explicit reason_code(uint8_t code) : _code(code) {}