async_run

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina

Differential Revision: https://repo.mireo.local/D27342
This commit is contained in:
Bruno Iljazovic
2024-01-16 13:04:21 +01:00
parent 2f6751d162
commit 4f87b27861
29 changed files with 398 additions and 85 deletions

View File

@ -68,7 +68,7 @@ int main() {
c.credentials("<your-client-id>", "<client-username>", "<client-pwd>")
.brokers("<your-mqtt-broker>", 1883)
.run();
.async_run(asio::detached);
c.async_publish<async_mqtt5::qos_e::at_most_once>(
"<topic>", "Hello world!",

View File

@ -1,4 +1,4 @@
include(CTest)
if(BUILD_TESTING)
add_subdirectory(test/unit)
add_subdirectory(test)
endif()

View File

@ -62,7 +62,7 @@ The following example illustrates a simple scenario of configuring a Client and
c.credentials("clientid", "", "")
.brokers("mqtt.broker", 1883)
.run();
.async_run(asio::detached);
c.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "hello world!",

View File

@ -107,7 +107,7 @@ int main(int argc, char** argv) {
c.credentials("test-client", "username", "password")
.brokers("mqtt.broker", 1883)
.run();
.async_run(asio::detached);
run_with_callbacks(c);

View File

@ -165,7 +165,7 @@ int main(int argc, char** argv) {
c.credentials("test-client", "username", "password")
.brokers("mqtt.broker", 1883)
.run();
.async_run(asio::detached);
co_spawn(ioc.get_executor(), coroutine(c), asio::detached);
// or...

View File

@ -96,7 +96,7 @@ int main(int argc, char** argv) {
c.credentials("test-client", "", "")
.brokers("mqtt.broker", 1883)
.run();
.async_run(asio::detached);
for (int i = 0; i < thread_num - 1; ++i)
threads.emplace_back([&ioc] { ioc.run(); });
@ -108,4 +108,3 @@ int main(int argc, char** argv) {
for (auto& t : threads)
if (t.joinable()) t.join();
}

View File

@ -70,7 +70,7 @@ void publish_qos0_openssl_tls() {
c.credentials("test-qos0-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
@ -103,7 +103,7 @@ void publish_qos1_openssl_tls() {
c.credentials("test-qos1-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
@ -138,7 +138,7 @@ void publish_qos2_openssl_tls() {
c.credentials("test-qos2-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
@ -173,7 +173,7 @@ void subscribe_and_receive_openssl_tls(int num_receive) {
c.credentials("test-subscriber-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
std::vector<subscribe_topic> topics;

View File

@ -19,9 +19,9 @@ asio::awaitable<void> client_publisher(asio::io_context& ioc) {
async_mqtt5::mqtt_client<asio::ip::tcp::socket> client(ioc, "");
// Configure the ``__Client__``.
// It is mandatory to call brokers() and run() to configure the Brokers to connect to and start the Client.
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
client.brokers("mqtt.broker", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
.run(); // Start the client.
.async_run(asio::detached); // Start the client.
// Publish an Application Message with QoS 1.
auto [rc, props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(

View File

@ -19,9 +19,9 @@ asio::awaitable<void> client_receiver(asio::io_context& ioc) {
async_mqtt5::mqtt_client<asio::ip::tcp::socket> client(ioc, "");
// Configure the``__Client__``.
// It is mandatory to call brokers() and run() to configure the Brokers to connect to and start the Client.
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
client.brokers("mqtt.broker", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
.run(); // Start the client.
.async_run(asio::detached); // Start the client.
// Configure the request to subscribe to a Topic.
async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic {

View File

@ -24,7 +24,7 @@ void publish_qos0_tcp() {
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.connect_properties(std::move(props))
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
@ -52,7 +52,7 @@ void publish_qos1_tcp() {
c.credentials("test-qos1-tcp", "", "")
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
@ -79,7 +79,7 @@ void publish_qos2_tcp() {
c.credentials("test-qos2-tcp", "", "")
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
@ -107,7 +107,7 @@ void subscribe_and_receive_tcp(int num_receive) {
c.credentials("test-subscriber-tcp", "", "")
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_subscribe(
{ "test/mqtt-test", { qos_e::exactly_once } }, subscribe_props {},

View File

@ -25,7 +25,7 @@ void publish_qos0_websocket_tcp() {
c.credentials("test-qos0-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
@ -55,7 +55,7 @@ void publish_qos1_websocket_tcp() {
c.credentials("test-qos1-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
@ -86,7 +86,7 @@ void publish_qos2_websocket_tcp() {
c.credentials("test-qos2-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
@ -118,7 +118,7 @@ void subscribe_and_receive_websocket_tcp(int num_receive) {
c.credentials("test-subscriber-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
std::vector<subscribe_topic> topics;
topics.push_back(subscribe_topic{

View File

@ -85,7 +85,7 @@ void publish_qos0_websocket_tls() {
c.credentials("test-qos0-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", async_mqtt5::qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
@ -120,7 +120,7 @@ void publish_qos1_websocket_tls() {
c.credentials("test-qos1-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
@ -156,7 +156,7 @@ void publish_qos2_websocket_tls() {
c.credentials("test-qos2-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
@ -193,7 +193,7 @@ void subscribe_and_receive_websocket_tls(int num_receive) {
c.credentials("test-subscriber-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
std::vector<subscribe_topic> topics;
topics.push_back(subscribe_topic{

View File

@ -181,6 +181,9 @@ public:
return resend();
}
if (ec == asio::error::no_recovery)
_svc.cancel();
// errors, if any, are propagated to ops
for (auto& op : write_queue)
op.complete(ec);

View File

@ -184,7 +184,7 @@ public:
private:
using tls_context_type = TlsContext;
using receive_channel = asio::experimental::basic_concurrent_channel<
asio::any_io_executor,
executor_type,
channel_traits<>,
void (error_code, std::string, std::string, publish_props)
>;
@ -204,6 +204,8 @@ private:
template <typename ClientService>
friend class re_auth_op;
executor_type _executor;
stream_context_type _stream_context;
stream_type _stream;
@ -219,6 +221,8 @@ private:
asio::cancellation_signal _cancel_ping;
asio::cancellation_signal _cancel_sentry;
asio::any_completion_handler<void(error_code)> _run_handler;
public:
client_service(
@ -226,6 +230,7 @@ public:
const std::string& /* cnf */,
tls_context_type tls_context = {}
) :
_executor(ex),
_stream_context(std::move(tls_context)),
_stream(ex, _stream_context),
_replies(ex),
@ -235,7 +240,7 @@ public:
{}
executor_type get_executor() const noexcept {
return _stream.get_executor();
return _executor;
}
template <
@ -302,7 +307,18 @@ public:
return _stream_context.connack_properties();
}
void run() {
template <typename Handler>
void run(Handler&& handler) {
_executor = asio::get_associated_executor(handler, _executor);
_run_handler = std::move(handler);
auto slot = asio::get_associated_cancellation_slot(_run_handler);
if (slot.is_connected()) {
using c_t = asio::cancellation_type_t;
slot.assign([&svc = *this](c_t c) {
if ((c & c_t::terminal) != c_t::none)
svc.cancel();
});
}
_stream.open();
_rec_channel.reset();
}
@ -320,6 +336,8 @@ public:
}
void cancel() {
if (!_run_handler) return;
_cancel_ping.emit(asio::cancellation_type::terminal);
_cancel_sentry.emit(asio::cancellation_type::terminal);
@ -327,6 +345,15 @@ public:
_replies.cancel_unanswered();
_async_sender.cancel();
_stream.close();
asio::get_associated_cancellation_slot(_run_handler).clear();
asio::post(
get_executor(),
asio::prepend(
std::move(_run_handler),
asio::error::operation_aborted
)
);
}
uint16_t allocate_pid() {
@ -420,9 +447,20 @@ public:
template <typename CompletionToken>
decltype(auto) async_channel_receive(CompletionToken&& token) {
// sig = void (error_code, std::string, std::string, publish_props)
return _rec_channel.async_receive(
std::forward<CompletionToken>(token)
using Signature =
void(error_code, std::string, std::string, publish_props);
auto initiation = [] (auto handler, self_type& self) {
auto ex = asio::get_associated_executor(
handler, self.get_executor()
);
return self._rec_channel.async_receive(
asio::bind_executor(ex, std::move(handler))
);
};
return asio::async_initiate<CompletionToken, Signature> (
initiation, token, std::ref(*this)
);
}

View File

@ -259,7 +259,7 @@ inline std::string encode_subscribe(
inline std::string encode_suback(
uint16_t packet_id,
std::vector<uint8_t>& reason_codes,
const std::vector<uint8_t>& reason_codes,
const suback_props& props
) {
@ -319,7 +319,7 @@ inline std::string encode_unsubscribe(
inline std::string encode_unsuback(
uint16_t packet_id,
std::vector<uint8_t>& reason_codes,
const std::vector<uint8_t>& reason_codes,
const unsuback_props& props
) {

View File

@ -64,10 +64,10 @@ public:
"Malformed Packet received from the Server"
);
if (
ec == asio::error::operation_aborted ||
ec == asio::error::no_recovery
)
if (ec == asio::error::no_recovery)
return _svc_ptr->cancel();
if (ec == asio::error::operation_aborted)
return;
dispatch(control_code, first, last);
@ -115,6 +115,8 @@ private:
re_auth_op { _svc_ptr }.perform(std::move(*rv));
}
break;
default:
assert(false);
}
perform();

View File

@ -156,13 +156,42 @@ public:
/**
* \brief Start the Client.
*
* \param token Completion token that will be used to produce a
* completion handler. The handler will be invoked when the operation completes.
*
* \par Handler signature
* The handler signature for this operation:
* \code
* void (__ERROR_CODE__)
* \endcode
*
* \par Completion condition
* The asynchronous operation will complete with
* `boost::asio::error::operation_aborted` when client is cancelled by calling
* \ref mqtt_client::async_disconnect, \ref mqtt_client::cancel, destruction or
* if non-recoverable error happens during connection attempt (e.g. access denied).
*
* \par Error codes
* The list of all possible error codes that this operation can finish with:\n
* - `boost::asio::error::operation_aborted`\n
*/
void run() {
_svc_ptr->run();
detail::ping_op { _svc_ptr }
.perform(read_timeout - std::chrono::seconds(1));
detail::read_message_op { _svc_ptr }.perform();
detail::sentry_op { _svc_ptr }.perform();
template <typename CompletionToken>
decltype(auto) async_run(CompletionToken&& token) {
using Signature = void(error_code);
auto initiation = [] (auto handler, const clisvc_ptr& svc_ptr) {
svc_ptr->run(std::move(handler));
detail::ping_op { svc_ptr }
.perform(read_timeout - std::chrono::seconds(1));
detail::read_message_op { svc_ptr }.perform();
detail::sentry_op { svc_ptr }.perform();
};
return asio::async_initiate<CompletionToken, Signature>(
initiation, token, _svc_ptr
);
}
/**
@ -172,12 +201,10 @@ public:
* with `boost::asio::error::operation_aborted`.
*
* \attention This function has terminal effects and will close the Client.
* The Client cannot be used before calling \ref mqtt_client::run again.
* The Client cannot be used before calling \ref mqtt_client::async_run again.
*/
void cancel() {
get_executor().execute([svc_ptr = _svc_ptr]() {
svc_ptr->cancel();
});
_svc_ptr->cancel();
}
/**
@ -188,9 +215,9 @@ public:
* closed normally.
*
* \attention This function takes action when the client is in a non-operational state,
* meaning the \ref run function has not been invoked.
* meaning the \ref async_run function has not been invoked.
* Furthermore, you can use this function after the \ref cancel function has been called,
* before the \ref run function is invoked again.
* before the \ref async_run function is invoked again.
*/
mqtt_client& will(will will) {
_svc_ptr->will(std::move(will));
@ -204,9 +231,9 @@ public:
* a User Name and Password.
*
* \attention This function takes action when the client is in a non-operational state,
* meaning the \ref run function has not been invoked.
* meaning the \ref async_run function has not been invoked.
* Furthermore, you can use this function after the \ref cancel function has been called,
* before the \ref run function is invoked again.
* before the \ref async_run function is invoked again.
*/
mqtt_client& credentials(
std::string client_id,
@ -233,9 +260,9 @@ public:
* explicitly specified in the `hosts` list.
*
* \attention This function takes action when the client is in a non-operational state,
* meaning the \ref run function has not been invoked.
* meaning the \ref async_run function has not been invoked.
* Furthermore, you can use this function after the \ref cancel function has been called,
* before the \ref run function is invoked again.
* before the \ref async_run function is invoked again.
*
* \par Example
* Some valid `hosts` string:
@ -260,9 +287,9 @@ public:
* and used for authentication. It needs to satisfy \__is_authenticator\__ concept.
*
* \attention This function takes action when the client is in a non-operational state,
* meaning the \ref run function has not been invoked.
* meaning the \ref async_run function has not been invoked.
* Furthermore, you can use this function after the \ref cancel function has been called,
* before the \ref run function is invoked again.
* before the \ref async_run function is invoked again.
*
*/
template <

View File

@ -2,6 +2,7 @@
#define ASYNC_MQTT5_TEST_PACKET_UTIL_HPP
#include <string>
#include <bitset>
#include <async_mqtt5/detail/control_packet.hpp>
@ -89,7 +90,7 @@ inline std::string to_readable_packet(std::string packet) {
auto& [topic, packet_id, flags, props, payload] = *publish;
stream << code_to_str(code);
stream << (packet_id ? " " + std::to_string(*packet_id) : "");
stream << "flags: " << flags;
stream << " flags: " << std::bitset<8>(flags);
return stream.str();
}

View File

@ -155,8 +155,8 @@ public:
stream << "Packet mismatch! Expected: "
<< to_readable_packet(expected[i])
<< " Received: "
<< to_readable_packet(std::string((const char*)it->data(), len));
log(stream.str());
<< to_readable_packet(std::string((const char*)it->data(), it->size()));
BOOST_CHECK_MESSAGE(false, stream.str());
}
}
}
@ -235,7 +235,9 @@ public:
private:
void shutdown() override { }
void shutdown() override {
_pending_read.complete(get_executor(), asio::error::operation_aborted, 0);
}
void launch_broker_ops() {
for (auto& op: _broker_side.pop_broker_ops(get_executor())) {

View File

@ -37,7 +37,7 @@ void cancel_async_receive() {
c.brokers("127.0.0.1", 1883)
.credentials("test-cli", "", "")
.run();
.async_run(asio::detached);
auto handler = [&handlers_called](
error_code ec, std::string, std::string, publish_props
@ -89,7 +89,7 @@ void cancel_async_publish() {
c.brokers("127.0.0.1", 1883)
.credentials("test-cli", "", "")
.run();
.async_run(asio::detached);
std::vector<asio::cancellation_signal> signals(3);
@ -177,6 +177,50 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_publish) {
cancel_async_publish<test::cancel_type::signal_emit>();
}
BOOST_AUTO_TEST_CASE(signal_emit_async_run) {
using namespace test;
constexpr int expected_handlers_called = 2;
int handlers_called = 0;
asio::io_context ioc;
using stream_type = asio::ip::tcp::socket;
using client_type = mqtt_client<stream_type>;
client_type c(ioc, "");
asio::cancellation_signal signal;
c.brokers("127.0.0.1", 1883)
.credentials("test-cli", "", "")
.async_run(
asio::bind_cancellation_slot(
signal.slot(),
[&handlers_called](error_code ec) {
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
handlers_called++;
}
)
);
c.async_publish<qos_e::at_most_once>(
"topic", "payload", retain_e::yes, {},
[&handlers_called](error_code ec) {
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
handlers_called++;
}
);
asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::milliseconds(10));
timer.async_wait([&](auto) {
signal.emit(asio::cancellation_type::terminal);
});
ioc.run_for(std::chrono::milliseconds(20));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
}
#ifdef BOOST_ASIO_HAS_CO_AWAIT
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
@ -191,7 +235,7 @@ BOOST_AUTO_TEST_CASE(rerunning_the_client) {
client_type c(ioc, "");
c.brokers("broker.hivemq.com,broker.hivemq.com", 1883) // to avoid reconnect backoff
.run();
.async_run(asio::detached);
auto [ec] = co_await c.async_publish<qos_e::at_most_once>(
"t", "p", retain_e::yes, publish_props {}, use_nothrow_awaitable
@ -205,7 +249,7 @@ BOOST_AUTO_TEST_CASE(rerunning_the_client) {
);
BOOST_CHECK(cec == asio::error::operation_aborted);
c.run();
c.async_run(asio::detached);
auto [rec] = co_await c.async_publish<qos_e::at_most_once>(
"ct", "cp", retain_e::yes, publish_props {}, use_nothrow_awaitable

View File

@ -115,7 +115,7 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) {
c.credentials("tcp-tester", "", "")
.brokers("broker.hivemq.com", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
asio::steady_timer timer(ioc);
timer.expires_after(std::chrono::seconds(5));
@ -151,7 +151,7 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) {
c.brokers("broker.hivemq.com/mqtt", 8000)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
asio::steady_timer timer(ioc);
timer.expires_after(std::chrono::seconds(5));
@ -187,7 +187,7 @@ BOOST_AUTO_TEST_CASE(openssl_tls_client_check) {
c.brokers("broker.hivemq.com", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
asio::steady_timer timer(ioc);
timer.expires_after(std::chrono::seconds(5));
@ -225,7 +225,7 @@ BOOST_AUTO_TEST_CASE(websocket_tls_client_check) {
c.brokers("broker.hivemq.com/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.run();
.async_run(asio::detached);
asio::steady_timer timer(ioc);
timer.expires_after(std::chrono::seconds(5));

View File

@ -0,0 +1,196 @@
#include <boost/test/unit_test.hpp>
#include <chrono>
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/steady_timer.hpp>
#include <async_mqtt5/impl/codecs/message_encoders.hpp>
#include <async_mqtt5.hpp>
#include "test_common/message_exchange.hpp"
#include "test_common/test_stream.hpp"
using namespace async_mqtt5;
BOOST_AUTO_TEST_SUITE(executors)
BOOST_AUTO_TEST_CASE(async_run) {
using test::after;
using std::chrono_literals::operator ""ms;
constexpr int expected_handlers_called = 9;
int handlers_called = 0;
// packets
auto connect = encoders::encode_connect(
"", std::nullopt, std::nullopt, 10, false, {}, std::nullopt
);
auto connack = encoders::encode_connack(
false, reason_codes::success.value(), {}
);
auto publish_0 = encoders::encode_publish(
0, "t_0", "p_0", qos_e::at_most_once, retain_e::no, dup_e::no, {}
);
auto publish_1 = encoders::encode_publish(
1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {}
);
auto puback = encoders::encode_puback(
1, reason_codes::success.value(), {}
);
auto publish_2 = encoders::encode_publish(
2, "t_2", "p_2", qos_e::exactly_once, retain_e::no, dup_e::no, {}
);
auto pubrec = encoders::encode_pubrec(
2, reason_codes::success.value(), {}
);
auto pubrel = encoders::encode_pubrel(
2, reason_codes::success.value(), {}
);
auto pubcomp = encoders::encode_pubcomp(
2, reason_codes::success.value(), {}
);
auto subscribe = encoders::encode_subscribe(
3, std::vector<subscribe_topic> { { "t_0", {} } }, {}
);
auto suback = encoders::encode_suback(
3, std::vector<uint8_t> { reason_codes::granted_qos_2.value() }, {}
);
auto unsubscribe = encoders::encode_unsubscribe(
1, std::vector<std::string> { "t_0" }, {}
);
auto unsuback = encoders::encode_unsuback(
1, std::vector<uint8_t> { reason_codes::success.value() }, {}
);
auto disconnect = encoders::encode_disconnect(
reason_codes::normal_disconnection.value(), {}
);
test::msg_exchange broker_side;
error_code success {};
broker_side
.expect(connect)
.complete_with(success, after(0ms))
.reply_with(connack, after(0ms))
.expect(subscribe, publish_0, publish_1, publish_2)
.complete_with(success, after(0ms))
.reply_with(puback, pubrec, suback, after(0ms))
.expect(pubrel)
.complete_with(success, after(0ms))
.reply_with(pubcomp, after(0ms))
.send(publish_0, after(50ms))
.expect(unsubscribe)
.complete_with(success, after(0ms))
.reply_with(unsuback, after(0ms))
.expect(publish_1)
.complete_with(success, after(0ms))
.expect(disconnect)
.complete_with(success, after(0ms))
;
asio::io_context ioc;
auto executor = ioc.get_executor();
auto& broker = asio::make_service<test::test_broker>(
ioc, executor, std::move(broker_side)
);
auto strand = asio::make_strand(ioc);
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.async_run(asio::bind_executor(
strand,
[&](error_code ec) {
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
));
c.async_publish<qos_e::at_most_once>(
"t_0", "p_0", retain_e::no, {},
[&](error_code ec) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
);
c.async_publish<qos_e::at_least_once>(
"t_1", "p_1", retain_e::no, {},
[&](error_code ec, reason_code rc, auto) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_MESSAGE(!rc, rc.message());
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
);
c.async_publish<qos_e::exactly_once>(
"t_2", "p_2", retain_e::no, {},
[&](error_code ec, reason_code rc, auto) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_MESSAGE(!rc, rc.message());
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
);
c.async_subscribe(
subscribe_topic { "t_0", {} }, {},
[&](error_code ec, std::vector<reason_code> rcs, auto) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message());
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
);
c.async_receive(
[&](
error_code ec,
std::string rec_topic, std::string rec_payload,
publish_props
) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_EQUAL("t_0", rec_topic);
BOOST_CHECK_EQUAL("p_0", rec_payload);
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
c.async_unsubscribe(
"t_0", {},
[&](error_code ec, std::vector<reason_code> rcs, auto) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message());
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
c.async_publish<qos_e::at_least_once>(
"t_1", "p_1", retain_e::no, {},
[&](error_code ec, reason_code rc, auto) {
BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted);
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
);
c.async_disconnect(
[&](error_code ec) {
BOOST_CHECK_MESSAGE(!ec, ec.message());
BOOST_CHECK(strand.running_in_this_thread());
++handlers_called;
}
);
}
);
}
);
ioc.run_for(500ms);
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
BOOST_CHECK(broker.received_all_expected());
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -52,7 +52,7 @@ BOOST_AUTO_TEST_CASE(test_malformed_publish) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::seconds(2));
@ -125,7 +125,7 @@ BOOST_AUTO_TEST_CASE(test_malformed_pubrel, *boost::unit_test::disabled()) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_receive(
[&](
@ -205,7 +205,7 @@ BOOST_AUTO_TEST_CASE(malformed_puback) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"t", "p", retain_e::no, publish_props {},
@ -302,7 +302,7 @@ BOOST_AUTO_TEST_CASE(malformed_pubrec_pubcomp) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"t", "p", retain_e::no, publish_props {},

View File

@ -73,7 +73,7 @@ void receive_publish() {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
.async_run(asio::detached);
c.async_receive(
[&](
@ -164,7 +164,7 @@ BOOST_AUTO_TEST_CASE(test_waiting_on_pubrel) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_receive(
[&](

View File

@ -79,7 +79,7 @@ BOOST_AUTO_TEST_CASE(ordering_after_reconnect) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"t_1", "p_1", retain_e::no, publish_props{},
@ -172,7 +172,7 @@ BOOST_AUTO_TEST_CASE(throttling) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"t_1", "p_1", retain_e::no, publish_props{},
@ -270,7 +270,7 @@ BOOST_AUTO_TEST_CASE(cancel_multiple_ops) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"t_1", "p_1", retain_e::no, publish_props{},

View File

@ -57,7 +57,7 @@ void test_receive_malformed_packet(
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::milliseconds(100));
@ -136,7 +136,7 @@ BOOST_AUTO_TEST_CASE(receive_disconnect) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::milliseconds(100));

View File

@ -63,7 +63,7 @@ BOOST_AUTO_TEST_CASE(resend_multiple_publishes) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"t", "p_1", retain_e::no, publish_props{},
@ -152,7 +152,7 @@ BOOST_AUTO_TEST_CASE(resend_pubrel) {
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"t_1", "p_1", retain_e::no, publish_props{},
@ -220,7 +220,7 @@ BOOST_AUTO_TEST_CASE(resend_subscribe) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_subscribe(
topics, subscribe_props,
@ -289,7 +289,7 @@ BOOST_AUTO_TEST_CASE(resend_unsubscribe) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.run();
.async_run(asio::detached);
c.async_unsubscribe(
topics, unsubscribe_props,

View File

@ -105,7 +105,7 @@ BOOST_AUTO_TEST_CASE(test_omitting_props) {
using client_type = mqtt_client<test::test_stream>;
client_type c(executor, "");
c.brokers("127.0.0.1")
.run();
.async_run(asio::detached);
asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::milliseconds(200));

View File

@ -271,6 +271,7 @@ BOOST_AUTO_TEST_CASE(test_publish_cancellation) {
asio::io_context ioc;
using client_service_type = test::test_service<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
svc_ptr->run([](error_code){});
asio::cancellation_signal cancel_signal;
auto h = [&handlers_called](error_code ec, reason_code rc, puback_props) {