mirror of
https://github.com/boostorg/mqtt5.git
synced 2025-07-31 13:07:37 +02:00
topic validation in subscribe, unsubscribe & publish
Summary: resolves T13170 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Maniphest Tasks: T13170 Differential Revision: https://repo.mireo.local/D26728
This commit is contained in:
@ -65,6 +65,7 @@
|
||||
[def __RETAIN__ [mqttlink 3901104 `RETAIN`]]
|
||||
[def __SUBSCRIBE_OPTIONS__ [mqttlink 3901169 `Subscribe Options`]]
|
||||
[def __ENHANCED_AUTH__ [mqttlink 3901256 `Enhanced Authentication`]]
|
||||
[def __TOPIC_SEMANTIC_AND_USAGE__ [mqttlink 3901247 `Topic semantic and usage`]]
|
||||
|
||||
[def __CONNECT__ [mqttlink 3901033 `CONNECT`]]
|
||||
[def __CONNACK__ [mqttlink 3901074 `CONNACK`]]
|
||||
|
@ -25,17 +25,25 @@ may complete with, along with the reasons for their occurrence.
|
||||
to establish a connection with the Broker. The cause of this error may be attributed to the connection
|
||||
related parameters used during the initialization of the [reflink2 mqtt_client `mqtt_client`].
|
||||
]]
|
||||
[[`async_mqtt5::client::error::pid_overrun`] [
|
||||
This error code signifies that the Client was unable to allocate a Packet Identifier for
|
||||
the current operation due to the exhaustion of the available identifiers.
|
||||
This occurs when there are 65535 outgoing Packets awaiting their responses.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::session_expired`][
|
||||
The Client has established a successful connection with a Broker, but either the session does not exist or has expired.
|
||||
In cases where the Client had previously set up subscriptions to Topics, these subscriptions are also expired.
|
||||
Therefore, the Client should re-subscribe.
|
||||
This error code is exclusive to completion handlers associated with [refmem mqtt_client async_receive] calls.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::pid_overrun`] [
|
||||
This error code signifies that the Client was unable to allocate a Packet Identifier for
|
||||
the current operation due to the exhaustion of the available identifiers.
|
||||
This occurs when there are 65535 outgoing Packets awaiting their responses.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::invalid_topic`] [
|
||||
The Client has attempted to perform an action (publish, subscribe or unsubscribe) on an invalid Topic.
|
||||
See __TOPIC_SEMANTIC_AND_USAGE__ for information on properly formed Topics.
|
||||
This error code is exclusive to completion handlers associated with [refmem mqtt_client async_publish],
|
||||
[refmem mqtt_client async_subscribe], and [refmem mqtt_client async_unsubscribe] calls.
|
||||
In the case of [refmem mqtt_client async_subscribe] and [refmem mqtt_client async_unsubscribe], this error code
|
||||
occurs if at least one Topic provided is malformed.
|
||||
]]
|
||||
[[`async_mqtt5::client::error::qos_not_supported`] [
|
||||
The Client has attempted to publish an Application Message with __QOS__ higher
|
||||
than the Maximum __QOS__ specified by the Server.
|
||||
|
@ -71,7 +71,7 @@ void publish_qos0_openssl_tls() {
|
||||
|
||||
c.credentials("test-qos0-openssl-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_most_once>(
|
||||
@ -104,7 +104,7 @@ void publish_qos1_openssl_tls() {
|
||||
|
||||
c.credentials("test-qos1-openssl-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_least_once>(
|
||||
@ -139,7 +139,7 @@ void publish_qos2_openssl_tls() {
|
||||
|
||||
c.credentials("test-qos2-openssl-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::exactly_once>(
|
||||
@ -174,7 +174,7 @@ void subscribe_and_receive_openssl_tls(int num_receive) {
|
||||
|
||||
c.credentials("test-subscriber-openssl-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
|
||||
@ -240,7 +240,7 @@ void test_coro() {
|
||||
|
||||
c.credentials("coro-client", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
std::vector<subscribe_topic> topics;
|
||||
|
@ -7,9 +7,9 @@ void run_websocket_tls_examples();
|
||||
int main(int argc, char* argv[]) {
|
||||
|
||||
run_tcp_examples();
|
||||
run_openssl_tls_examples();
|
||||
run_websocket_tcp_examples();
|
||||
run_websocket_tls_examples();
|
||||
// run_openssl_tls_examples();
|
||||
// run_websocket_tcp_examples();
|
||||
// run_websocket_tls_examples();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ void publish_qos0_tcp() {
|
||||
|
||||
c.credentials("test-qos0-tcp", "", "")
|
||||
.brokers("mqtt.mireo.local", 1883)
|
||||
.will({ "test/mqtt-test", "i died",qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!",qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_most_once>(
|
||||
@ -46,7 +46,7 @@ void publish_qos1_tcp() {
|
||||
|
||||
c.credentials("test-qos1-tcp", "", "")
|
||||
.brokers("mqtt.mireo.local", 1883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_least_once>(
|
||||
@ -73,7 +73,7 @@ void publish_qos2_tcp() {
|
||||
|
||||
c.credentials("test-qos2-tcp", "", "")
|
||||
.brokers("mqtt.mireo.local", 1883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::exactly_once>(
|
||||
@ -101,7 +101,7 @@ void subscribe_and_receive_tcp(int num_receive) {
|
||||
|
||||
c.credentials("test-subscriber-tcp", "", "")
|
||||
.brokers("mqtt.mireo.local", 1883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_subscribe(
|
||||
|
@ -24,7 +24,7 @@ void publish_qos0_websocket_tcp() {
|
||||
|
||||
c.credentials("test-qos0-websocket-tcp", "", "")
|
||||
.brokers("fcluster-5/mqtt", 8083)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_most_once>(
|
||||
@ -54,7 +54,7 @@ void publish_qos1_websocket_tcp() {
|
||||
|
||||
c.credentials("test-qos1-websocket-tcp", "", "")
|
||||
.brokers("fcluster-5/mqtt", 8083)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_least_once>(
|
||||
@ -85,7 +85,7 @@ void publish_qos2_websocket_tcp() {
|
||||
|
||||
c.credentials("test-qos2-websocket-tcp", "", "")
|
||||
.brokers("fcluster-5/mqtt", 8083)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::exactly_once>(
|
||||
@ -117,7 +117,7 @@ void subscribe_and_receive_websocket_tcp(int num_receive) {
|
||||
|
||||
c.credentials("test-subscriber-websocket-tcp", "", "")
|
||||
.brokers("fcluster-5/mqtt", 8083)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
std::vector<subscribe_topic> topics;
|
||||
|
@ -86,7 +86,7 @@ void publish_qos0_websocket_tls() {
|
||||
|
||||
c.credentials("test-qos0-websocket-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8884)
|
||||
.will({ "test/mqtt-test", "i died", async_mqtt5::qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", async_mqtt5::qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_most_once>(
|
||||
@ -121,7 +121,7 @@ void publish_qos1_websocket_tls() {
|
||||
|
||||
c.credentials("test-qos1-websocket-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8884)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::at_least_once>(
|
||||
@ -157,7 +157,7 @@ void publish_qos2_websocket_tls() {
|
||||
|
||||
c.credentials("test-qos2-websocket-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8884)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
c.async_publish<qos_e::exactly_once>(
|
||||
@ -194,7 +194,7 @@ void subscribe_and_receive_websocket_tls(int num_receive) {
|
||||
|
||||
c.credentials("test-subscriber-websocket-tls", "", "")
|
||||
.brokers("iot.fcluster.mireo.hr/mqtt", 8884)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
std::vector<subscribe_topic> topics;
|
||||
|
82
include/async_mqtt5/detail/utf8_mqtt.hpp
Normal file
82
include/async_mqtt5/detail/utf8_mqtt.hpp
Normal file
@ -0,0 +1,82 @@
|
||||
#ifndef ASYNC_MQTT5_UTF8_MQTT_HPP
|
||||
#define ASYNC_MQTT5_UTF8_MQTT_HPP
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
namespace async_mqtt5::detail {
|
||||
|
||||
struct code_point {
|
||||
int32_t val;
|
||||
uint32_t size;
|
||||
|
||||
auto operator<=>(const code_point&) const = default;
|
||||
|
||||
static code_point from(std::string_view s) {
|
||||
auto hnibble = s[0] & 0xF0;
|
||||
return
|
||||
(hnibble & 0x80) == 0 ?
|
||||
code_point { s[0], 1 }
|
||||
:
|
||||
(hnibble == 0xC0 || hnibble == 0xD0) && s.size() > 1 ?
|
||||
code_point {
|
||||
(int32_t(s[0] & 0x1F) << 6) | int32_t(s[1] & 0x3F),
|
||||
2
|
||||
}
|
||||
:
|
||||
(hnibble == 0xE0) && s.size() > 2 ?
|
||||
code_point {
|
||||
(int32_t(s[0] & 0x1F) << 12) |
|
||||
(int32_t(s[1] & 0x3F) << 6) |
|
||||
int32_t(s[2] & 0x3F),
|
||||
3
|
||||
}
|
||||
:
|
||||
(hnibble == 0xF0) && s.size() > 3 ?
|
||||
code_point {
|
||||
(int32_t(s[0] & 0x1F) << 18) |
|
||||
(int32_t(s[1] & 0x3F) << 12) |
|
||||
(int32_t(s[2] & 0x3F) << 6) |
|
||||
int32_t(s[3] & 0x3F),
|
||||
4
|
||||
}
|
||||
:
|
||||
code_point { -1, 0 };
|
||||
}
|
||||
};
|
||||
|
||||
inline bool is_valid_mqtt_utf8(std::string_view str) {
|
||||
constexpr size_t max_sz = 65535;
|
||||
|
||||
if (str.size() > max_sz)
|
||||
return false;
|
||||
|
||||
auto is_valid_cp = [](int32_t c) -> bool {
|
||||
constexpr int32_t fe_flag = 0xFE;
|
||||
constexpr int32_t ff_flag = 0xFF;
|
||||
|
||||
return c >= 32 && // U+0000...U+001F control characters
|
||||
(c < 127 || c > 159) && // U+007F...0+009F control characters
|
||||
(c < 55296 || c > 57343) && // U+D800...U+DFFF surrogates
|
||||
(c < 64976 || c > 65007) &&// U+FDD0...U+FDEF non-characters
|
||||
(c & fe_flag) != fe_flag && // non-characters
|
||||
(c & ff_flag) != ff_flag;
|
||||
};
|
||||
|
||||
while (!str.empty()) {
|
||||
auto cp = code_point::from(str.data());
|
||||
if (!is_valid_cp(cp.val))
|
||||
return false;
|
||||
str.remove_prefix(cp.size);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
inline bool is_valid_utf8_topic(std::string_view str) {
|
||||
return !str.empty() && is_valid_mqtt_utf8(str);
|
||||
}
|
||||
|
||||
} // namespace async_mqtt5::detail
|
||||
|
||||
#endif //ASYNC_MQTT5_UTF8_MQTT_HPP
|
@ -58,11 +58,14 @@ enum class error : int {
|
||||
malformed_packet = 100,
|
||||
/// \endcond
|
||||
|
||||
/** The Client's session does not exist or it has expired. */
|
||||
session_expired,
|
||||
|
||||
/** There are no more available Packet Identifiers to use. */
|
||||
pid_overrun,
|
||||
|
||||
/** The Client's session does not exist or it has expired. */
|
||||
session_expired,
|
||||
/** The Topic is invalid and does not conform to the specification. */
|
||||
invalid_topic,
|
||||
|
||||
// publish
|
||||
/** The Server does not support the specified \ref qos_e. */
|
||||
@ -86,6 +89,9 @@ inline std::string client_error_to_string(error err) {
|
||||
return "The Client's session does not exist or it has expired. ";
|
||||
case pid_overrun:
|
||||
return "There are no more available Packet Identifiers to use.";
|
||||
case invalid_topic:
|
||||
return "The Topic is invalid and "
|
||||
"does not conform to the specification.";
|
||||
case qos_not_supported:
|
||||
return "The Server does not support the specified QoS";
|
||||
case retain_not_available:
|
||||
@ -157,7 +163,7 @@ public:
|
||||
: _code(code), _category(cat)
|
||||
{}
|
||||
|
||||
constexpr reason_code(uint8_t code) : _code(code) {}
|
||||
constexpr explicit reason_code(uint8_t code) : _code(code) {}
|
||||
/// \endcond
|
||||
|
||||
/// Copy constructor.
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <async_mqtt5/detail/cancellable_handler.hpp>
|
||||
#include <async_mqtt5/detail/control_packet.hpp>
|
||||
#include <async_mqtt5/detail/internal_types.hpp>
|
||||
#include <async_mqtt5/detail/utf8_mqtt.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/disconnect_op.hpp>
|
||||
#include <async_mqtt5/impl/internal/codecs/message_decoders.hpp>
|
||||
@ -96,8 +97,8 @@ public:
|
||||
void perform(
|
||||
std::string topic, std::string payload,
|
||||
retain_e retain, const publish_props& props
|
||||
) {
|
||||
auto ec = validate_publish(retain, props);
|
||||
) {
|
||||
auto ec = validate_publish(topic, retain, props);
|
||||
if (ec)
|
||||
return complete_post(ec);
|
||||
|
||||
@ -120,27 +121,6 @@ public:
|
||||
send_publish(std::move(publish));
|
||||
}
|
||||
|
||||
error_code validate_publish(
|
||||
retain_e retain, const publish_props& props
|
||||
) {
|
||||
auto max_qos = _svc_ptr->connack_prop(prop::maximum_qos);
|
||||
if (max_qos && uint8_t(qos_type) > *max_qos)
|
||||
return client::error::qos_not_supported;
|
||||
|
||||
auto retain_available = _svc_ptr->connack_prop(prop::retain_available);
|
||||
if (retain_available && *retain_available == 0 && retain == retain_e::yes)
|
||||
return client::error::retain_not_available;
|
||||
|
||||
// TODO: topic alias mapping
|
||||
auto topic_alias_max = _svc_ptr->connack_prop(prop::topic_alias_maximum);
|
||||
auto topic_alias = props[prop::topic_alias];
|
||||
if ((!topic_alias_max || topic_alias_max && *topic_alias_max == 0) && topic_alias)
|
||||
return client::error::topic_alias_maximum_reached;
|
||||
if (topic_alias_max && topic_alias && *topic_alias > *topic_alias_max)
|
||||
return client::error::topic_alias_maximum_reached;
|
||||
return {};
|
||||
}
|
||||
|
||||
void send_publish(control_packet<allocator_type> publish) {
|
||||
if (_handler.empty()) { // already cancelled
|
||||
if constexpr (qos_type != qos_e::at_most_once)
|
||||
@ -339,6 +319,29 @@ public:
|
||||
|
||||
|
||||
private:
|
||||
error_code validate_publish(
|
||||
const std::string& topic, retain_e retain, const publish_props& props
|
||||
) {
|
||||
if (!is_valid_utf8_topic(topic))
|
||||
return client::error::invalid_topic;
|
||||
|
||||
auto max_qos = _svc_ptr->connack_prop(prop::maximum_qos);
|
||||
if (max_qos && uint8_t(qos_type) > *max_qos)
|
||||
return client::error::qos_not_supported;
|
||||
|
||||
auto retain_available = _svc_ptr->connack_prop(prop::retain_available);
|
||||
if (retain_available && *retain_available == 0 && retain == retain_e::yes)
|
||||
return client::error::retain_not_available;
|
||||
|
||||
auto topic_alias_max = _svc_ptr->connack_prop(prop::topic_alias_maximum);
|
||||
auto topic_alias = props[prop::topic_alias];
|
||||
if ((!topic_alias_max || topic_alias_max && *topic_alias_max == 0) && topic_alias)
|
||||
return client::error::topic_alias_maximum_reached;
|
||||
if (topic_alias_max && topic_alias && *topic_alias > *topic_alias_max)
|
||||
return client::error::topic_alias_maximum_reached;
|
||||
return {};
|
||||
}
|
||||
|
||||
void on_malformed_packet(const std::string& reason) {
|
||||
auto props = disconnect_props {};
|
||||
props[prop::reason_string] = reason;
|
||||
|
@ -4,10 +4,12 @@
|
||||
#include <boost/asio/detached.hpp>
|
||||
|
||||
#include <async_mqtt5/error.hpp>
|
||||
#include <async_mqtt5/types.hpp>
|
||||
|
||||
#include <async_mqtt5/detail/cancellable_handler.hpp>
|
||||
#include <async_mqtt5/detail/control_packet.hpp>
|
||||
#include <async_mqtt5/detail/internal_types.hpp>
|
||||
#include <async_mqtt5/detail/utf8_mqtt.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/internal/codecs/message_decoders.hpp>
|
||||
#include <async_mqtt5/impl/internal/codecs/message_encoders.hpp>
|
||||
@ -57,9 +59,13 @@ public:
|
||||
const std::vector<subscribe_topic>& topics,
|
||||
const subscribe_props& props
|
||||
) {
|
||||
auto ec = validate_topics(topics);
|
||||
if (ec)
|
||||
return complete_post(ec, topics.size());
|
||||
|
||||
uint16_t packet_id = _svc_ptr->allocate_pid();
|
||||
if (packet_id == 0)
|
||||
return complete_post(client::error::pid_overrun);
|
||||
return complete_post(client::error::pid_overrun, topics.size());
|
||||
|
||||
auto subscribe = control_packet<allocator_type>::of(
|
||||
with_pid, get_allocator(),
|
||||
@ -121,7 +127,6 @@ public:
|
||||
}
|
||||
|
||||
auto& [props, reason_codes] = *suback;
|
||||
// TODO: perhaps do something with the topics we subscribed to (one day)
|
||||
|
||||
complete(
|
||||
ec, packet_id,
|
||||
@ -131,19 +136,25 @@ public:
|
||||
|
||||
private:
|
||||
|
||||
static error_code validate_topics(const std::vector<subscribe_topic>& topics) {
|
||||
for (const auto& topic: topics)
|
||||
if (!is_valid_utf8_topic(topic.topic_filter))
|
||||
return client::error::invalid_topic;
|
||||
return error_code {};
|
||||
}
|
||||
|
||||
static std::vector<reason_code> to_reason_codes(std::vector<uint8_t> codes) {
|
||||
std::vector<reason_code> ret;
|
||||
for (uint8_t code : codes) {
|
||||
auto rc = to_reason_code<reason_codes::category::suback>(code);
|
||||
if (rc)
|
||||
ret.push_back(*rc);
|
||||
// TODO: on off chance that one of the rcs is invalid, should we push something to mark that?
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void on_malformed_packet(const std::string& reason) {
|
||||
auto props = disconnect_props{};
|
||||
auto props = disconnect_props {};
|
||||
props[prop::reason_string] = reason;
|
||||
async_disconnect(
|
||||
disconnect_rc_e::malformed_packet, props, false, _svc_ptr,
|
||||
@ -152,9 +163,9 @@ private:
|
||||
}
|
||||
|
||||
|
||||
void complete_post(error_code ec) {
|
||||
void complete_post(error_code ec, size_t num_topics) {
|
||||
_handler.complete_post(
|
||||
ec, std::vector<reason_code> {}, suback_props {}
|
||||
ec, std::vector<reason_code> { num_topics, reason_codes::empty }, suback_props {}
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <async_mqtt5/detail/cancellable_handler.hpp>
|
||||
#include <async_mqtt5/detail/control_packet.hpp>
|
||||
#include <async_mqtt5/detail/internal_types.hpp>
|
||||
#include <async_mqtt5/detail/utf8_mqtt.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/disconnect_op.hpp>
|
||||
#include <async_mqtt5/impl/internal/codecs/message_decoders.hpp>
|
||||
@ -57,9 +58,13 @@ public:
|
||||
const std::vector<std::string>& topics,
|
||||
const unsubscribe_props& props
|
||||
) {
|
||||
auto ec = validate_topics(topics);
|
||||
if (ec)
|
||||
return complete_post(ec, topics.size());
|
||||
|
||||
uint16_t packet_id = _svc_ptr->allocate_pid();
|
||||
if (packet_id == 0)
|
||||
return complete_post(client::error::pid_overrun);
|
||||
return complete_post(client::error::pid_overrun, topics.size());
|
||||
|
||||
auto unsubscribe = control_packet<allocator_type>::of(
|
||||
with_pid, get_allocator(),
|
||||
@ -133,6 +138,13 @@ public:
|
||||
|
||||
private:
|
||||
|
||||
static error_code validate_topics(const std::vector<std::string>& topics) {
|
||||
for (const auto& topic : topics)
|
||||
if (!is_valid_utf8_topic(topic))
|
||||
return client::error::invalid_topic;
|
||||
return error_code {};
|
||||
}
|
||||
|
||||
static std::vector<reason_code> to_reason_codes(std::vector<uint8_t> codes) {
|
||||
std::vector<reason_code> ret;
|
||||
for (uint8_t code : codes) {
|
||||
@ -155,9 +167,9 @@ private:
|
||||
);
|
||||
}
|
||||
|
||||
void complete_post(error_code ec) {
|
||||
void complete_post(error_code ec, size_t num_topics) {
|
||||
_handler.complete_post(
|
||||
ec, std::vector<reason_code> {}, unsuback_props {}
|
||||
ec, std::vector<reason_code> { num_topics, reason_codes::empty }, suback_props {}
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -351,6 +351,7 @@ public:
|
||||
* - \link async_mqtt5::client::error::qos_not_supported \endlink
|
||||
* - \link async_mqtt5::client::error::retain_not_available \endlink
|
||||
* - \link async_mqtt5::client::error::topic_alias_maximum_reached \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
*
|
||||
* Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code.
|
||||
*/
|
||||
@ -422,6 +423,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
*
|
||||
* Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code.
|
||||
*/
|
||||
@ -488,6 +490,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
*
|
||||
* Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code.
|
||||
*/
|
||||
@ -542,6 +545,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
*
|
||||
* Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code.
|
||||
*/
|
||||
@ -607,6 +611,7 @@ public:
|
||||
* - `boost::asio::error::no_recovery` \n
|
||||
* - `boost::asio::error::operation_aborted` \n
|
||||
* - \link async_mqtt5::client::error::pid_overrun \endlink
|
||||
* - \link async_mqtt5::client::error::invalid_topic \endlink
|
||||
*
|
||||
* Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code.
|
||||
*/
|
||||
|
@ -5,7 +5,6 @@
|
||||
#include <boost/asio/detached.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
|
||||
#include <async_mqtt5.hpp>
|
||||
|
||||
@ -57,9 +56,7 @@ void cancel_async_receive() {
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
template <test::cancellation_type type>
|
||||
@ -113,9 +110,7 @@ void cancel_async_publish() {
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
template <test::cancellation_type type>
|
||||
@ -152,9 +147,7 @@ void cancel_during_connecting() {
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
|
||||
|
@ -66,9 +66,7 @@ BOOST_AUTO_TEST_CASE(publish_qos_0) {
|
||||
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
|
||||
@ -148,9 +146,7 @@ BOOST_AUTO_TEST_CASE(two_publishes_qos_1_with_fail_on_write) {
|
||||
timer.async_wait([&](auto) { c.cancel(); });
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(receive_publish_qos_2) {
|
||||
@ -221,9 +217,7 @@ BOOST_AUTO_TEST_CASE(receive_publish_qos_2) {
|
||||
timer.async_wait([&](auto) { c.cancel(); });
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(send_publish_qos_2_with_fail_on_read) {
|
||||
@ -298,9 +292,7 @@ BOOST_AUTO_TEST_CASE(send_publish_qos_2_with_fail_on_read) {
|
||||
timer.async_wait([&](auto) { c.cancel(); });
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_ordering_after_reconnect) {
|
||||
@ -392,9 +384,7 @@ BOOST_AUTO_TEST_CASE(test_ordering_after_reconnect) {
|
||||
timer.async_wait([&](auto) { c.cancel(); });
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(throttling) {
|
||||
@ -496,9 +486,7 @@ BOOST_AUTO_TEST_CASE(throttling) {
|
||||
timer.async_wait([&](auto) { c.cancel(); });
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
|
||||
@ -569,9 +557,7 @@ BOOST_AUTO_TEST_CASE(cancel_multiple_ops) {
|
||||
"The client did not cancel properly!"
|
||||
);
|
||||
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
@ -80,7 +80,7 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) {
|
||||
|
||||
c.credentials("tcp-tester", "", "")
|
||||
.brokers("mqtt.mireo.local", 1883)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
asio::steady_timer timer(ioc);
|
||||
@ -119,7 +119,7 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) {
|
||||
|
||||
c.credentials("websocket-tcp-tester", "", "")
|
||||
.brokers("fcluster-5/mqtt", 8083)
|
||||
.will({ "test/mqtt-test", "i died", qos_e::at_least_once })
|
||||
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
|
||||
.run();
|
||||
|
||||
asio::steady_timer timer(ioc);
|
||||
|
@ -49,7 +49,7 @@ BOOST_AUTO_TEST_CASE(test_pid_overrun) {
|
||||
using client_service_type = overrun_client<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor(), "");
|
||||
|
||||
auto handler = [&](error_code ec, reason_code rc, puback_props) {
|
||||
auto handler = [&handlers_called](error_code ec, reason_code rc, puback_props) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK_EQUAL(ec, client::error::pid_overrun);
|
||||
BOOST_CHECK_EQUAL(rc, reason_codes::empty);
|
||||
@ -63,9 +63,29 @@ BOOST_AUTO_TEST_CASE(test_pid_overrun) {
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_invalid_topic) {
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = test::test_service<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
|
||||
auto handler = [&handlers_called](error_code ec) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK_EQUAL(ec, client::error::invalid_topic);
|
||||
};
|
||||
|
||||
detail::publish_send_op<
|
||||
client_service_type, decltype(handler), qos_e::at_most_once
|
||||
> { svc_ptr, std::move(handler) }
|
||||
.perform("", "payload", retain_e::no, {});
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_publish_immediate_cancellation) {
|
||||
@ -77,7 +97,7 @@ BOOST_AUTO_TEST_CASE(test_publish_immediate_cancellation) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
asio::cancellation_signal cancel_signal;
|
||||
|
||||
auto h = [&](error_code ec, reason_code rc, puback_props) {
|
||||
auto h = [&handlers_called](error_code ec, reason_code rc, puback_props) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
|
||||
BOOST_CHECK_EQUAL(rc, reason_codes::empty);
|
||||
@ -94,9 +114,7 @@ BOOST_AUTO_TEST_CASE(test_publish_immediate_cancellation) {
|
||||
|
||||
cancel_signal.emit(asio::cancellation_type::terminal);
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_publish_cancellation) {
|
||||
@ -108,7 +126,7 @@ BOOST_AUTO_TEST_CASE(test_publish_cancellation) {
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
asio::cancellation_signal cancel_signal;
|
||||
|
||||
auto h = [&](error_code ec, reason_code rc, puback_props) {
|
||||
auto h = [&handlers_called](error_code ec, reason_code rc, puback_props) {
|
||||
++handlers_called;
|
||||
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
|
||||
BOOST_CHECK_EQUAL(rc, reason_codes::empty);
|
||||
@ -132,9 +150,7 @@ BOOST_AUTO_TEST_CASE(test_publish_cancellation) {
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
@ -82,9 +82,7 @@ BOOST_AUTO_TEST_CASE(clear_waiting_on_pubrel) {
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(
|
||||
handlers_called, expected_handlers_called
|
||||
);
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
|
||||
|
55
test/unit/test/utf8_mqtt.cpp
Normal file
55
test/unit/test/utf8_mqtt.cpp
Normal file
@ -0,0 +1,55 @@
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <async_mqtt5/detail/utf8_mqtt.hpp>
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(utf8_mqtt/*, *boost::unit_test::disabled()*/)
|
||||
|
||||
std::string to_str(async_mqtt5::detail::code_point cp) {
|
||||
return cp.size == 1 ? std::string { char(cp.val) }
|
||||
: cp.size == 2 ? std::string { char((cp.val >> 6) | 0xC0), char((cp.val & 0x3F) | 0x80) }
|
||||
: cp.size == 3 ? std::string {
|
||||
char((cp.val >> 12) | 0xE0),
|
||||
char(((cp.val >> 6) & 0x3F) | 0x80),
|
||||
char((cp.val & 0x3F) | 0x80)
|
||||
}
|
||||
: std::string { // cp.size == 4
|
||||
char((cp.val >> 18) | 0xF0),
|
||||
char(((cp.val >> 12) & 0x3F) | 0x80),
|
||||
char(((cp.val >> 6) & 0x3F) | 0x80),
|
||||
char((cp.val & 0x3F) | 0x80)
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
async_mqtt5::detail::code_point cp(int32_t val) {
|
||||
return { val, uint32_t(val < 0x80 ? 1 : val < 0x800 ? 2 : val < 0xFFFF ? 3 : /* val < 0x10FFFF */ 4) };
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(utf8_string_validation) {
|
||||
using namespace async_mqtt5::detail;
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8("stringy"), true);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(""), true);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(1))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(31))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(32))), true);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(126))), true);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(127))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(159))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(160))), true);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(55296))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(57343))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(64976))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(65007))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(65008))), true);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(131070))), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_mqtt_utf8(to_str(cp(131071))), false);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(utf8_topic_validation) {
|
||||
using namespace async_mqtt5::detail;
|
||||
BOOST_CHECK_EQUAL(is_valid_utf8_topic(""), false);
|
||||
BOOST_CHECK_EQUAL(is_valid_utf8_topic("topic"), true);
|
||||
}
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END();
|
Reference in New Issue
Block a user