From 4f87b2786110940de87bf5d3bff0fc8d2b6d22d7 Mon Sep 17 00:00:00 2001 From: Bruno Iljazovic Date: Tue, 16 Jan 2024 13:04:21 +0100 Subject: [PATCH] async_run Reviewers: ivica Reviewed By: ivica Subscribers: korina Differential Revision: https://repo.mireo.local/D27342 --- README.md | 2 +- cmake/dev-mode.cmake | 2 +- doc/qbk/01_intro.qbk | 2 +- example/callbacks.cpp | 2 +- example/cpp20_coroutines.cpp | 2 +- example/futures.cpp | 3 +- example/openssl_tls.cpp | 8 +- example/publisher.cpp | 4 +- example/receiver.cpp | 4 +- example/tcp.cpp | 8 +- example/websocket_tcp.cpp | 8 +- example/websocket_tls.cpp | 8 +- include/async_mqtt5/impl/async_sender.hpp | 3 + include/async_mqtt5/impl/client_service.hpp | 50 ++++- .../impl/codecs/message_encoders.hpp | 4 +- include/async_mqtt5/impl/read_message_op.hpp | 10 +- include/async_mqtt5/mqtt_client.hpp | 63 ++++-- test/include/test_common/packet_util.hpp | 3 +- test/include/test_common/test_broker.hpp | 8 +- test/integration/cancellation.cpp | 52 ++++- test/integration/coroutine.cpp | 8 +- test/integration/executors.cpp | 196 ++++++++++++++++++ test/integration/malformed_packet.cpp | 8 +- test/integration/publish_receive.cpp | 4 +- test/integration/publish_send.cpp | 6 +- test/integration/read_message.cpp | 4 +- test/integration/resending.cpp | 8 +- test/unit/disconnect_op.cpp | 2 +- test/unit/publish_send_op.cpp | 1 + 29 files changed, 398 insertions(+), 85 deletions(-) create mode 100644 test/integration/executors.cpp diff --git a/README.md b/README.md index 18d1ad3..9338ccc 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ int main() { c.credentials("", "", "") .brokers("", 1883) - .run(); + .async_run(asio::detached); c.async_publish( "", "Hello world!", diff --git a/cmake/dev-mode.cmake b/cmake/dev-mode.cmake index a6e82a3..92cbfc5 100644 --- a/cmake/dev-mode.cmake +++ b/cmake/dev-mode.cmake @@ -1,4 +1,4 @@ include(CTest) if(BUILD_TESTING) - add_subdirectory(test/unit) + add_subdirectory(test) endif() diff --git a/doc/qbk/01_intro.qbk b/doc/qbk/01_intro.qbk index a52fc90..f424212 100644 --- a/doc/qbk/01_intro.qbk +++ b/doc/qbk/01_intro.qbk @@ -62,7 +62,7 @@ The following example illustrates a simple scenario of configuring a Client and c.credentials("clientid", "", "") .brokers("mqtt.broker", 1883) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world!", diff --git a/example/callbacks.cpp b/example/callbacks.cpp index b336132..28e9fc4 100644 --- a/example/callbacks.cpp +++ b/example/callbacks.cpp @@ -107,7 +107,7 @@ int main(int argc, char** argv) { c.credentials("test-client", "username", "password") .brokers("mqtt.broker", 1883) - .run(); + .async_run(asio::detached); run_with_callbacks(c); diff --git a/example/cpp20_coroutines.cpp b/example/cpp20_coroutines.cpp index 33d21ec..2e1e50a 100644 --- a/example/cpp20_coroutines.cpp +++ b/example/cpp20_coroutines.cpp @@ -165,7 +165,7 @@ int main(int argc, char** argv) { c.credentials("test-client", "username", "password") .brokers("mqtt.broker", 1883) - .run(); + .async_run(asio::detached); co_spawn(ioc.get_executor(), coroutine(c), asio::detached); // or... diff --git a/example/futures.cpp b/example/futures.cpp index 216d691..d4e86c7 100644 --- a/example/futures.cpp +++ b/example/futures.cpp @@ -96,7 +96,7 @@ int main(int argc, char** argv) { c.credentials("test-client", "", "") .brokers("mqtt.broker", 1883) - .run(); + .async_run(asio::detached); for (int i = 0; i < thread_num - 1; ++i) threads.emplace_back([&ioc] { ioc.run(); }); @@ -108,4 +108,3 @@ int main(int argc, char** argv) { for (auto& t : threads) if (t.joinable()) t.join(); } - diff --git a/example/openssl_tls.cpp b/example/openssl_tls.cpp index 883e23d..cbef07f 100644 --- a/example/openssl_tls.cpp +++ b/example/openssl_tls.cpp @@ -70,7 +70,7 @@ void publish_qos0_openssl_tls() { c.credentials("test-qos0-openssl-tls", "", "") .brokers("emqtt.mireo.local", 8883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos0!", @@ -103,7 +103,7 @@ void publish_qos1_openssl_tls() { c.credentials("test-qos1-openssl-tls", "", "") .brokers("emqtt.mireo.local", 8883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos1!", @@ -138,7 +138,7 @@ void publish_qos2_openssl_tls() { c.credentials("test-qos2-openssl-tls", "", "") .brokers("emqtt.mireo.local", 8883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos2!", @@ -173,7 +173,7 @@ void subscribe_and_receive_openssl_tls(int num_receive) { c.credentials("test-subscriber-openssl-tls", "", "") .brokers("emqtt.mireo.local", 8883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); std::vector topics; diff --git a/example/publisher.cpp b/example/publisher.cpp index fbd626a..015c223 100644 --- a/example/publisher.cpp +++ b/example/publisher.cpp @@ -19,9 +19,9 @@ asio::awaitable client_publisher(asio::io_context& ioc) { async_mqtt5::mqtt_client client(ioc, ""); // Configure the ``__Client__``. - // It is mandatory to call brokers() and run() to configure the Brokers to connect to and start the Client. + // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. client.brokers("mqtt.broker", 1883) // Broker that we want to connect to. 1883 is the default TCP port. - .run(); // Start the client. + .async_run(asio::detached); // Start the client. // Publish an Application Message with QoS 1. auto [rc, props] = co_await client.async_publish( diff --git a/example/receiver.cpp b/example/receiver.cpp index 08f209c..e5d7315 100644 --- a/example/receiver.cpp +++ b/example/receiver.cpp @@ -19,9 +19,9 @@ asio::awaitable client_receiver(asio::io_context& ioc) { async_mqtt5::mqtt_client client(ioc, ""); // Configure the``__Client__``. - // It is mandatory to call brokers() and run() to configure the Brokers to connect to and start the Client. + // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. client.brokers("mqtt.broker", 1883) // Broker that we want to connect to. 1883 is the default TCP port. - .run(); // Start the client. + .async_run(asio::detached); // Start the client. // Configure the request to subscribe to a Topic. async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic { diff --git a/example/tcp.cpp b/example/tcp.cpp index 5d0a087..4917fb6 100644 --- a/example/tcp.cpp +++ b/example/tcp.cpp @@ -24,7 +24,7 @@ void publish_qos0_tcp() { .brokers("emqtt.mireo.local", 1883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) .connect_properties(std::move(props)) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos0!", @@ -52,7 +52,7 @@ void publish_qos1_tcp() { c.credentials("test-qos1-tcp", "", "") .brokers("emqtt.mireo.local", 1883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos1!", @@ -79,7 +79,7 @@ void publish_qos2_tcp() { c.credentials("test-qos2-tcp", "", "") .brokers("emqtt.mireo.local", 1883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos2!", @@ -107,7 +107,7 @@ void subscribe_and_receive_tcp(int num_receive) { c.credentials("test-subscriber-tcp", "", "") .brokers("emqtt.mireo.local", 1883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_subscribe( { "test/mqtt-test", { qos_e::exactly_once } }, subscribe_props {}, diff --git a/example/websocket_tcp.cpp b/example/websocket_tcp.cpp index f7b41ce..a89172d 100644 --- a/example/websocket_tcp.cpp +++ b/example/websocket_tcp.cpp @@ -25,7 +25,7 @@ void publish_qos0_websocket_tcp() { c.credentials("test-qos0-websocket-tcp", "", "") .brokers("emqtt.mireo.local/mqtt", 8083) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos0!", @@ -55,7 +55,7 @@ void publish_qos1_websocket_tcp() { c.credentials("test-qos1-websocket-tcp", "", "") .brokers("emqtt.mireo.local/mqtt", 8083) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos1!", @@ -86,7 +86,7 @@ void publish_qos2_websocket_tcp() { c.credentials("test-qos2-websocket-tcp", "", "") .brokers("emqtt.mireo.local/mqtt", 8083) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos2!", @@ -118,7 +118,7 @@ void subscribe_and_receive_websocket_tcp(int num_receive) { c.credentials("test-subscriber-websocket-tcp", "", "") .brokers("emqtt.mireo.local/mqtt", 8083) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); std::vector topics; topics.push_back(subscribe_topic{ diff --git a/example/websocket_tls.cpp b/example/websocket_tls.cpp index d510752..bed4aa3 100644 --- a/example/websocket_tls.cpp +++ b/example/websocket_tls.cpp @@ -85,7 +85,7 @@ void publish_qos0_websocket_tls() { c.credentials("test-qos0-websocket-tls", "", "") .brokers("emqtt.mireo.local/mqtt", 8884) .will({ "test/mqtt-test", "Client disconnected!", async_mqtt5::qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos0!", @@ -120,7 +120,7 @@ void publish_qos1_websocket_tls() { c.credentials("test-qos1-websocket-tls", "", "") .brokers("emqtt.mireo.local/mqtt", 8884) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos1!", @@ -156,7 +156,7 @@ void publish_qos2_websocket_tls() { c.credentials("test-qos2-websocket-tls", "", "") .brokers("emqtt.mireo.local/mqtt", 8884) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); c.async_publish( "test/mqtt-test", "hello world with qos2!", @@ -193,7 +193,7 @@ void subscribe_and_receive_websocket_tls(int num_receive) { c.credentials("test-subscriber-websocket-tls", "", "") .brokers("emqtt.mireo.local/mqtt", 8884) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); std::vector topics; topics.push_back(subscribe_topic{ diff --git a/include/async_mqtt5/impl/async_sender.hpp b/include/async_mqtt5/impl/async_sender.hpp index 33e1d95..0fad16e 100644 --- a/include/async_mqtt5/impl/async_sender.hpp +++ b/include/async_mqtt5/impl/async_sender.hpp @@ -181,6 +181,9 @@ public: return resend(); } + if (ec == asio::error::no_recovery) + _svc.cancel(); + // errors, if any, are propagated to ops for (auto& op : write_queue) op.complete(ec); diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index c35043b..641b3c0 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -184,7 +184,7 @@ public: private: using tls_context_type = TlsContext; using receive_channel = asio::experimental::basic_concurrent_channel< - asio::any_io_executor, + executor_type, channel_traits<>, void (error_code, std::string, std::string, publish_props) >; @@ -204,6 +204,8 @@ private: template friend class re_auth_op; + executor_type _executor; + stream_context_type _stream_context; stream_type _stream; @@ -219,6 +221,8 @@ private: asio::cancellation_signal _cancel_ping; asio::cancellation_signal _cancel_sentry; + asio::any_completion_handler _run_handler; + public: client_service( @@ -226,6 +230,7 @@ public: const std::string& /* cnf */, tls_context_type tls_context = {} ) : + _executor(ex), _stream_context(std::move(tls_context)), _stream(ex, _stream_context), _replies(ex), @@ -235,7 +240,7 @@ public: {} executor_type get_executor() const noexcept { - return _stream.get_executor(); + return _executor; } template < @@ -302,7 +307,18 @@ public: return _stream_context.connack_properties(); } - void run() { + template + void run(Handler&& handler) { + _executor = asio::get_associated_executor(handler, _executor); + _run_handler = std::move(handler); + auto slot = asio::get_associated_cancellation_slot(_run_handler); + if (slot.is_connected()) { + using c_t = asio::cancellation_type_t; + slot.assign([&svc = *this](c_t c) { + if ((c & c_t::terminal) != c_t::none) + svc.cancel(); + }); + } _stream.open(); _rec_channel.reset(); } @@ -320,6 +336,8 @@ public: } void cancel() { + if (!_run_handler) return; + _cancel_ping.emit(asio::cancellation_type::terminal); _cancel_sentry.emit(asio::cancellation_type::terminal); @@ -327,6 +345,15 @@ public: _replies.cancel_unanswered(); _async_sender.cancel(); _stream.close(); + + asio::get_associated_cancellation_slot(_run_handler).clear(); + asio::post( + get_executor(), + asio::prepend( + std::move(_run_handler), + asio::error::operation_aborted + ) + ); } uint16_t allocate_pid() { @@ -420,9 +447,20 @@ public: template decltype(auto) async_channel_receive(CompletionToken&& token) { - // sig = void (error_code, std::string, std::string, publish_props) - return _rec_channel.async_receive( - std::forward(token) + using Signature = + void(error_code, std::string, std::string, publish_props); + + auto initiation = [] (auto handler, self_type& self) { + auto ex = asio::get_associated_executor( + handler, self.get_executor() + ); + return self._rec_channel.async_receive( + asio::bind_executor(ex, std::move(handler)) + ); + }; + + return asio::async_initiate ( + initiation, token, std::ref(*this) ); } diff --git a/include/async_mqtt5/impl/codecs/message_encoders.hpp b/include/async_mqtt5/impl/codecs/message_encoders.hpp index 46b7183..0f6d3ea 100644 --- a/include/async_mqtt5/impl/codecs/message_encoders.hpp +++ b/include/async_mqtt5/impl/codecs/message_encoders.hpp @@ -259,7 +259,7 @@ inline std::string encode_subscribe( inline std::string encode_suback( uint16_t packet_id, - std::vector& reason_codes, + const std::vector& reason_codes, const suback_props& props ) { @@ -319,7 +319,7 @@ inline std::string encode_unsubscribe( inline std::string encode_unsuback( uint16_t packet_id, - std::vector& reason_codes, + const std::vector& reason_codes, const unsuback_props& props ) { diff --git a/include/async_mqtt5/impl/read_message_op.hpp b/include/async_mqtt5/impl/read_message_op.hpp index bd8d8d8..ff8aab2 100644 --- a/include/async_mqtt5/impl/read_message_op.hpp +++ b/include/async_mqtt5/impl/read_message_op.hpp @@ -64,10 +64,10 @@ public: "Malformed Packet received from the Server" ); - if ( - ec == asio::error::operation_aborted || - ec == asio::error::no_recovery - ) + if (ec == asio::error::no_recovery) + return _svc_ptr->cancel(); + + if (ec == asio::error::operation_aborted) return; dispatch(control_code, first, last); @@ -115,6 +115,8 @@ private: re_auth_op { _svc_ptr }.perform(std::move(*rv)); } break; + default: + assert(false); } perform(); diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index 2c08a5a..b3d911a 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -156,13 +156,42 @@ public: /** * \brief Start the Client. + * + * \param token Completion token that will be used to produce a + * completion handler. The handler will be invoked when the operation completes. + * + * \par Handler signature + * The handler signature for this operation: + * \code + * void (__ERROR_CODE__) + * \endcode + * + * \par Completion condition + * The asynchronous operation will complete with + * `boost::asio::error::operation_aborted` when client is cancelled by calling + * \ref mqtt_client::async_disconnect, \ref mqtt_client::cancel, destruction or + * if non-recoverable error happens during connection attempt (e.g. access denied). + * + * \par Error codes + * The list of all possible error codes that this operation can finish with:\n + * - `boost::asio::error::operation_aborted`\n */ - void run() { - _svc_ptr->run(); - detail::ping_op { _svc_ptr } - .perform(read_timeout - std::chrono::seconds(1)); - detail::read_message_op { _svc_ptr }.perform(); - detail::sentry_op { _svc_ptr }.perform(); + template + decltype(auto) async_run(CompletionToken&& token) { + using Signature = void(error_code); + + auto initiation = [] (auto handler, const clisvc_ptr& svc_ptr) { + svc_ptr->run(std::move(handler)); + + detail::ping_op { svc_ptr } + .perform(read_timeout - std::chrono::seconds(1)); + detail::read_message_op { svc_ptr }.perform(); + detail::sentry_op { svc_ptr }.perform(); + }; + + return asio::async_initiate( + initiation, token, _svc_ptr + ); } /** @@ -172,12 +201,10 @@ public: * with `boost::asio::error::operation_aborted`. * * \attention This function has terminal effects and will close the Client. - * The Client cannot be used before calling \ref mqtt_client::run again. + * The Client cannot be used before calling \ref mqtt_client::async_run again. */ void cancel() { - get_executor().execute([svc_ptr = _svc_ptr]() { - svc_ptr->cancel(); - }); + _svc_ptr->cancel(); } /** @@ -188,9 +215,9 @@ public: * closed normally. * * \attention This function takes action when the client is in a non-operational state, - * meaning the \ref run function has not been invoked. + * meaning the \ref async_run function has not been invoked. * Furthermore, you can use this function after the \ref cancel function has been called, - * before the \ref run function is invoked again. + * before the \ref async_run function is invoked again. */ mqtt_client& will(will will) { _svc_ptr->will(std::move(will)); @@ -204,9 +231,9 @@ public: * a User Name and Password. * * \attention This function takes action when the client is in a non-operational state, - * meaning the \ref run function has not been invoked. + * meaning the \ref async_run function has not been invoked. * Furthermore, you can use this function after the \ref cancel function has been called, - * before the \ref run function is invoked again. + * before the \ref async_run function is invoked again. */ mqtt_client& credentials( std::string client_id, @@ -233,9 +260,9 @@ public: * explicitly specified in the `hosts` list. * * \attention This function takes action when the client is in a non-operational state, - * meaning the \ref run function has not been invoked. + * meaning the \ref async_run function has not been invoked. * Furthermore, you can use this function after the \ref cancel function has been called, - * before the \ref run function is invoked again. + * before the \ref async_run function is invoked again. * * \par Example * Some valid `hosts` string: @@ -260,9 +287,9 @@ public: * and used for authentication. It needs to satisfy \__is_authenticator\__ concept. * * \attention This function takes action when the client is in a non-operational state, - * meaning the \ref run function has not been invoked. + * meaning the \ref async_run function has not been invoked. * Furthermore, you can use this function after the \ref cancel function has been called, - * before the \ref run function is invoked again. + * before the \ref async_run function is invoked again. * */ template < diff --git a/test/include/test_common/packet_util.hpp b/test/include/test_common/packet_util.hpp index 0e1e81e..886d3f2 100644 --- a/test/include/test_common/packet_util.hpp +++ b/test/include/test_common/packet_util.hpp @@ -2,6 +2,7 @@ #define ASYNC_MQTT5_TEST_PACKET_UTIL_HPP #include +#include #include @@ -89,7 +90,7 @@ inline std::string to_readable_packet(std::string packet) { auto& [topic, packet_id, flags, props, payload] = *publish; stream << code_to_str(code); stream << (packet_id ? " " + std::to_string(*packet_id) : ""); - stream << "flags: " << flags; + stream << " flags: " << std::bitset<8>(flags); return stream.str(); } diff --git a/test/include/test_common/test_broker.hpp b/test/include/test_common/test_broker.hpp index 668d573..fde47f5 100644 --- a/test/include/test_common/test_broker.hpp +++ b/test/include/test_common/test_broker.hpp @@ -155,8 +155,8 @@ public: stream << "Packet mismatch! Expected: " << to_readable_packet(expected[i]) << " Received: " - << to_readable_packet(std::string((const char*)it->data(), len)); - log(stream.str()); + << to_readable_packet(std::string((const char*)it->data(), it->size())); + BOOST_CHECK_MESSAGE(false, stream.str()); } } } @@ -235,7 +235,9 @@ public: private: - void shutdown() override { } + void shutdown() override { + _pending_read.complete(get_executor(), asio::error::operation_aborted, 0); + } void launch_broker_ops() { for (auto& op: _broker_side.pop_broker_ops(get_executor())) { diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index 0ca3514..a7fb964 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -37,7 +37,7 @@ void cancel_async_receive() { c.brokers("127.0.0.1", 1883) .credentials("test-cli", "", "") - .run(); + .async_run(asio::detached); auto handler = [&handlers_called]( error_code ec, std::string, std::string, publish_props @@ -89,7 +89,7 @@ void cancel_async_publish() { c.brokers("127.0.0.1", 1883) .credentials("test-cli", "", "") - .run(); + .async_run(asio::detached); std::vector signals(3); @@ -177,6 +177,50 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_publish) { cancel_async_publish(); } +BOOST_AUTO_TEST_CASE(signal_emit_async_run) { + using namespace test; + + constexpr int expected_handlers_called = 2; + int handlers_called = 0; + + asio::io_context ioc; + + using stream_type = asio::ip::tcp::socket; + using client_type = mqtt_client; + client_type c(ioc, ""); + + asio::cancellation_signal signal; + + c.brokers("127.0.0.1", 1883) + .credentials("test-cli", "", "") + .async_run( + asio::bind_cancellation_slot( + signal.slot(), + [&handlers_called](error_code ec) { + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + handlers_called++; + } + ) + ); + + c.async_publish( + "topic", "payload", retain_e::yes, {}, + [&handlers_called](error_code ec) { + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + handlers_called++; + } + ); + + asio::steady_timer timer(c.get_executor()); + timer.expires_after(std::chrono::milliseconds(10)); + timer.async_wait([&](auto) { + signal.emit(asio::cancellation_type::terminal); + }); + + ioc.run_for(std::chrono::milliseconds(20)); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); +} + #ifdef BOOST_ASIO_HAS_CO_AWAIT constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable); @@ -191,7 +235,7 @@ BOOST_AUTO_TEST_CASE(rerunning_the_client) { client_type c(ioc, ""); c.brokers("broker.hivemq.com,broker.hivemq.com", 1883) // to avoid reconnect backoff - .run(); + .async_run(asio::detached); auto [ec] = co_await c.async_publish( "t", "p", retain_e::yes, publish_props {}, use_nothrow_awaitable @@ -205,7 +249,7 @@ BOOST_AUTO_TEST_CASE(rerunning_the_client) { ); BOOST_CHECK(cec == asio::error::operation_aborted); - c.run(); + c.async_run(asio::detached); auto [rec] = co_await c.async_publish( "ct", "cp", retain_e::yes, publish_props {}, use_nothrow_awaitable diff --git a/test/integration/coroutine.cpp b/test/integration/coroutine.cpp index 7165cf2..bfd164f 100644 --- a/test/integration/coroutine.cpp +++ b/test/integration/coroutine.cpp @@ -115,7 +115,7 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) { c.credentials("tcp-tester", "", "") .brokers("broker.hivemq.com", 1883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); asio::steady_timer timer(ioc); timer.expires_after(std::chrono::seconds(5)); @@ -151,7 +151,7 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) { c.brokers("broker.hivemq.com/mqtt", 8000) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); asio::steady_timer timer(ioc); timer.expires_after(std::chrono::seconds(5)); @@ -187,7 +187,7 @@ BOOST_AUTO_TEST_CASE(openssl_tls_client_check) { c.brokers("broker.hivemq.com", 8883) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); asio::steady_timer timer(ioc); timer.expires_after(std::chrono::seconds(5)); @@ -225,7 +225,7 @@ BOOST_AUTO_TEST_CASE(websocket_tls_client_check) { c.brokers("broker.hivemq.com/mqtt", 8884) .will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once }) - .run(); + .async_run(asio::detached); asio::steady_timer timer(ioc); timer.expires_after(std::chrono::seconds(5)); diff --git a/test/integration/executors.cpp b/test/integration/executors.cpp new file mode 100644 index 0000000..b4d6c19 --- /dev/null +++ b/test/integration/executors.cpp @@ -0,0 +1,196 @@ +#include + +#include + +#include +#include +#include + +#include + +#include + +#include "test_common/message_exchange.hpp" +#include "test_common/test_stream.hpp" + +using namespace async_mqtt5; + +BOOST_AUTO_TEST_SUITE(executors) + +BOOST_AUTO_TEST_CASE(async_run) { + using test::after; + using std::chrono_literals::operator ""ms; + + constexpr int expected_handlers_called = 9; + int handlers_called = 0; + + // packets + auto connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 10, false, {}, std::nullopt + ); + auto connack = encoders::encode_connack( + false, reason_codes::success.value(), {} + ); + auto publish_0 = encoders::encode_publish( + 0, "t_0", "p_0", qos_e::at_most_once, retain_e::no, dup_e::no, {} + ); + auto publish_1 = encoders::encode_publish( + 1, "t_1", "p_1", qos_e::at_least_once, retain_e::no, dup_e::no, {} + ); + auto puback = encoders::encode_puback( + 1, reason_codes::success.value(), {} + ); + auto publish_2 = encoders::encode_publish( + 2, "t_2", "p_2", qos_e::exactly_once, retain_e::no, dup_e::no, {} + ); + auto pubrec = encoders::encode_pubrec( + 2, reason_codes::success.value(), {} + ); + auto pubrel = encoders::encode_pubrel( + 2, reason_codes::success.value(), {} + ); + auto pubcomp = encoders::encode_pubcomp( + 2, reason_codes::success.value(), {} + ); + auto subscribe = encoders::encode_subscribe( + 3, std::vector { { "t_0", {} } }, {} + ); + auto suback = encoders::encode_suback( + 3, std::vector { reason_codes::granted_qos_2.value() }, {} + ); + auto unsubscribe = encoders::encode_unsubscribe( + 1, std::vector { "t_0" }, {} + ); + auto unsuback = encoders::encode_unsuback( + 1, std::vector { reason_codes::success.value() }, {} + ); + auto disconnect = encoders::encode_disconnect( + reason_codes::normal_disconnection.value(), {} + ); + + test::msg_exchange broker_side; + error_code success {}; + + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(subscribe, publish_0, publish_1, publish_2) + .complete_with(success, after(0ms)) + .reply_with(puback, pubrec, suback, after(0ms)) + .expect(pubrel) + .complete_with(success, after(0ms)) + .reply_with(pubcomp, after(0ms)) + .send(publish_0, after(50ms)) + .expect(unsubscribe) + .complete_with(success, after(0ms)) + .reply_with(unsuback, after(0ms)) + .expect(publish_1) + .complete_with(success, after(0ms)) + .expect(disconnect) + .complete_with(success, after(0ms)) + ; + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + auto strand = asio::make_strand(ioc); + + using client_type = mqtt_client; + client_type c(executor, ""); + c.brokers("127.0.0.1") + .async_run(asio::bind_executor( + strand, + [&](error_code ec) { + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + )); + + c.async_publish( + "t_0", "p_0", retain_e::no, {}, + [&](error_code ec) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ); + + c.async_publish( + "t_1", "p_1", retain_e::no, {}, + [&](error_code ec, reason_code rc, auto) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rc, rc.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ); + + c.async_publish( + "t_2", "p_2", retain_e::no, {}, + [&](error_code ec, reason_code rc, auto) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rc, rc.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ); + + c.async_subscribe( + subscribe_topic { "t_0", {} }, {}, + [&](error_code ec, std::vector rcs, auto) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ); + + c.async_receive( + [&]( + error_code ec, + std::string rec_topic, std::string rec_payload, + publish_props + ) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_EQUAL("t_0", rec_topic); + BOOST_CHECK_EQUAL("p_0", rec_payload); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + c.async_unsubscribe( + "t_0", {}, + [&](error_code ec, std::vector rcs, auto) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK_MESSAGE(!rcs[0], rcs[0].message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + c.async_publish( + "t_1", "p_1", retain_e::no, {}, + [&](error_code ec, reason_code rc, auto) { + BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ); + c.async_disconnect( + [&](error_code ec) { + BOOST_CHECK_MESSAGE(!ec, ec.message()); + BOOST_CHECK(strand.running_in_this_thread()); + ++handlers_called; + } + ); + } + ); + } + ); + + ioc.run_for(500ms); + BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + BOOST_CHECK(broker.received_all_expected()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/integration/malformed_packet.cpp b/test/integration/malformed_packet.cpp index 0030860..a76282b 100644 --- a/test/integration/malformed_packet.cpp +++ b/test/integration/malformed_packet.cpp @@ -52,7 +52,7 @@ BOOST_AUTO_TEST_CASE(test_malformed_publish) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); asio::steady_timer timer(c.get_executor()); timer.expires_after(std::chrono::seconds(2)); @@ -125,7 +125,7 @@ BOOST_AUTO_TEST_CASE(test_malformed_pubrel, *boost::unit_test::disabled()) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_receive( [&]( @@ -205,7 +205,7 @@ BOOST_AUTO_TEST_CASE(malformed_puback) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_publish( "t", "p", retain_e::no, publish_props {}, @@ -302,7 +302,7 @@ BOOST_AUTO_TEST_CASE(malformed_pubrec_pubcomp) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_publish( "t", "p", retain_e::no, publish_props {}, diff --git a/test/integration/publish_receive.cpp b/test/integration/publish_receive.cpp index 5a2470f..ccafa87 100644 --- a/test/integration/publish_receive.cpp +++ b/test/integration/publish_receive.cpp @@ -73,7 +73,7 @@ void receive_publish() { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1") - .run(); + .async_run(asio::detached); c.async_receive( [&]( @@ -164,7 +164,7 @@ BOOST_AUTO_TEST_CASE(test_waiting_on_pubrel) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_receive( [&]( diff --git a/test/integration/publish_send.cpp b/test/integration/publish_send.cpp index d0e7406..5c745c5 100644 --- a/test/integration/publish_send.cpp +++ b/test/integration/publish_send.cpp @@ -79,7 +79,7 @@ BOOST_AUTO_TEST_CASE(ordering_after_reconnect) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_publish( "t_1", "p_1", retain_e::no, publish_props{}, @@ -172,7 +172,7 @@ BOOST_AUTO_TEST_CASE(throttling) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1") - .run(); + .async_run(asio::detached); c.async_publish( "t_1", "p_1", retain_e::no, publish_props{}, @@ -270,7 +270,7 @@ BOOST_AUTO_TEST_CASE(cancel_multiple_ops) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1") - .run(); + .async_run(asio::detached); c.async_publish( "t_1", "p_1", retain_e::no, publish_props{}, diff --git a/test/integration/read_message.cpp b/test/integration/read_message.cpp index 34434da..c30ad34 100644 --- a/test/integration/read_message.cpp +++ b/test/integration/read_message.cpp @@ -57,7 +57,7 @@ void test_receive_malformed_packet( using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); asio::steady_timer timer(c.get_executor()); timer.expires_after(std::chrono::milliseconds(100)); @@ -136,7 +136,7 @@ BOOST_AUTO_TEST_CASE(receive_disconnect) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); asio::steady_timer timer(c.get_executor()); timer.expires_after(std::chrono::milliseconds(100)); diff --git a/test/integration/resending.cpp b/test/integration/resending.cpp index d491bee..97271be 100644 --- a/test/integration/resending.cpp +++ b/test/integration/resending.cpp @@ -63,7 +63,7 @@ BOOST_AUTO_TEST_CASE(resend_multiple_publishes) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_publish( "t", "p_1", retain_e::no, publish_props{}, @@ -152,7 +152,7 @@ BOOST_AUTO_TEST_CASE(resend_pubrel) { client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_publish( "t_1", "p_1", retain_e::no, publish_props{}, @@ -220,7 +220,7 @@ BOOST_AUTO_TEST_CASE(resend_subscribe) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_subscribe( topics, subscribe_props, @@ -289,7 +289,7 @@ BOOST_AUTO_TEST_CASE(resend_unsubscribe) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff - .run(); + .async_run(asio::detached); c.async_unsubscribe( topics, unsubscribe_props, diff --git a/test/unit/disconnect_op.cpp b/test/unit/disconnect_op.cpp index f6fab96..91a347a 100644 --- a/test/unit/disconnect_op.cpp +++ b/test/unit/disconnect_op.cpp @@ -105,7 +105,7 @@ BOOST_AUTO_TEST_CASE(test_omitting_props) { using client_type = mqtt_client; client_type c(executor, ""); c.brokers("127.0.0.1") - .run(); + .async_run(asio::detached); asio::steady_timer timer(c.get_executor()); timer.expires_after(std::chrono::milliseconds(200)); diff --git a/test/unit/publish_send_op.cpp b/test/unit/publish_send_op.cpp index c12fb0d..9293e76 100644 --- a/test/unit/publish_send_op.cpp +++ b/test/unit/publish_send_op.cpp @@ -271,6 +271,7 @@ BOOST_AUTO_TEST_CASE(test_publish_cancellation) { asio::io_context ioc; using client_service_type = test::test_service; auto svc_ptr = std::make_shared(ioc.get_executor()); + svc_ptr->run([](error_code){}); asio::cancellation_signal cancel_signal; auto h = [&handlers_called](error_code ec, reason_code rc, puback_props) {