From 19aaba3cdca377cbb58b863118af10d621e45c0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Tue, 13 Feb 2024 10:31:18 +0100 Subject: [PATCH] Re-create client_service after stopping client's runloop and let it destroy asynchronously Summary: related to T11798, #5 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27882 --- .../async_mqtt5/detail/any_authenticator.hpp | 2 +- include/async_mqtt5/detail/internal_types.hpp | 9 +++ .../async_mqtt5/impl/autoconnect_stream.hpp | 4 + include/async_mqtt5/impl/client_service.hpp | 29 +++++++- include/async_mqtt5/impl/endpoints.hpp | 4 + include/async_mqtt5/mqtt_client.hpp | 73 ++++++++++--------- include/async_mqtt5/types.hpp | 3 +- test/integration/cancellation.cpp | 2 +- 8 files changed, 85 insertions(+), 41 deletions(-) diff --git a/include/async_mqtt5/detail/any_authenticator.hpp b/include/async_mqtt5/detail/any_authenticator.hpp index f48f8d4..4d1eb1f 100644 --- a/include/async_mqtt5/detail/any_authenticator.hpp +++ b/include/async_mqtt5/detail/any_authenticator.hpp @@ -84,7 +84,7 @@ public: class any_authenticator { std::string _method; - std::unique_ptr _auth_fun; + std::shared_ptr _auth_fun; public: any_authenticator() = default; diff --git a/include/async_mqtt5/detail/internal_types.hpp b/include/async_mqtt5/detail/internal_types.hpp index a34f80d..861638a 100644 --- a/include/async_mqtt5/detail/internal_types.hpp +++ b/include/async_mqtt5/detail/internal_types.hpp @@ -75,6 +75,15 @@ struct mqtt_ctx { connack_props ca_props; session_state state; any_authenticator authenticator; + + mqtt_ctx() = default; + + mqtt_ctx(const mqtt_ctx& other) : + creds(other.creds), will_msg(other.will_msg), + keep_alive(other.keep_alive), co_props(other.co_props), + ca_props {}, state {}, + authenticator(other.authenticator) + {} }; struct disconnect_ctx { diff --git a/include/async_mqtt5/impl/autoconnect_stream.hpp b/include/async_mqtt5/impl/autoconnect_stream.hpp index 2356eb6..4a92bac 100644 --- a/include/async_mqtt5/impl/autoconnect_stream.hpp +++ b/include/async_mqtt5/impl/autoconnect_stream.hpp @@ -81,6 +81,10 @@ public: _endpoints.brokers(std::move(hosts), default_port); } + void clone_endpoints(const autoconnect_stream& other) { + _endpoints.clone_servers(other._endpoints); + } + bool is_open() const noexcept { return lowest_layer(*_stream_ptr).is_open(); } diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index e2d75b3..5787ac8 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -36,10 +36,15 @@ class stream_context< using tls_context_type = TlsContext; mqtt_ctx _mqtt_context; - tls_context_type _tls_context; + std::shared_ptr _tls_context_ptr; + public: explicit stream_context(TlsContext tls_context) : - _tls_context(std::move(tls_context)) + _tls_context_ptr(std::make_shared(std::move(tls_context))) + {} + + stream_context(const stream_context& other) : + _mqtt_context(other._mqtt_context), _tls_context_ptr(other._tls_context_ptr) {} auto& mqtt_context() { @@ -51,7 +56,7 @@ public: } auto& tls_context() { - return _tls_context; + return *_tls_context_ptr; } auto& session_state() { @@ -115,6 +120,9 @@ class stream_context< mqtt_ctx _mqtt_context; public: explicit stream_context(std::monostate) {} + stream_context(const stream_context& other) : + _mqtt_context(other._mqtt_context) + {} auto& mqtt_context() { return _mqtt_context; @@ -231,6 +239,17 @@ private: asio::any_completion_handler _run_handler; + client_service(const client_service& other) : + _executor(other._executor), _stream_context(other._stream_context), + _stream(_executor, _stream_context), + _replies(_executor), + _async_sender(*this), + _active_span(_read_buff.cend(), _read_buff.cend()), + _rec_channel(_executor, std::numeric_limits::max()) + { + _stream.clone_endpoints(other._stream); + } + public: client_service( @@ -251,6 +270,10 @@ public: return _executor; } + auto dup() const { + return std::shared_ptr(new client_service(*this)); + } + template < typename Ctx = TlsContext, std::enable_if_t, bool> = true diff --git a/include/async_mqtt5/impl/endpoints.hpp b/include/async_mqtt5/impl/endpoints.hpp index abea53c..8990554 100644 --- a/include/async_mqtt5/impl/endpoints.hpp +++ b/include/async_mqtt5/impl/endpoints.hpp @@ -149,6 +149,10 @@ public: : _resolver(ex), _connect_timer(timer) {} + void clone_servers(const endpoints& other) { + _servers = other._servers; + } + using executor_type = asio::ip::tcp::resolver::executor_type; // NOTE: asio::ip::basic_resolver returns executor by value executor_type get_executor() { diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index 380ec78..6f1853c 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -46,8 +46,8 @@ private: using client_service_type = detail::client_service< stream_type, tls_context_type >; - using clisvc_ptr = std::shared_ptr; - clisvc_ptr _svc_ptr; + using impl_type = std::shared_ptr; + impl_type _impl; public: /** @@ -62,7 +62,7 @@ public: const std::string& cnf, TlsContext tls_context = {} ) : - _svc_ptr(std::make_shared( + _impl(std::make_shared( ex, cnf, std::move(tls_context) )) {} @@ -109,8 +109,8 @@ public: * \details Cancels this client first. Moved-from client can only be destructed. */ mqtt_client& operator=(mqtt_client&& other) noexcept { - cancel(); - _svc_ptr = std::move(other._svc_ptr); + _impl->cancel(); + _impl = std::move(other._impl); return *this; } @@ -120,14 +120,15 @@ public: * \details Automatically calls \ref mqtt_client::cancel. */ ~mqtt_client() { - if (_svc_ptr) cancel(); + if (_impl) + _impl->cancel(); } /** * \brief Get the executor associated with the object. */ executor_type get_executor() const noexcept { - return _svc_ptr->get_executor(); + return _impl->get_executor(); } @@ -149,7 +150,7 @@ public: > decltype(auto) tls_context() { - return _svc_ptr->tls_context(); + return _impl->tls_context(); } /** @@ -178,16 +179,16 @@ public: decltype(auto) async_run(CompletionToken&& token) { using Signature = void(error_code); - auto initiation = [] (auto handler, const clisvc_ptr& svc_ptr) { - svc_ptr->run(std::move(handler)); + auto initiation = [] (auto handler, const impl_type& impl) { + impl->run(std::move(handler)); - detail::ping_op { svc_ptr }.perform(); - detail::read_message_op { svc_ptr }.perform(); - detail::sentry_op { svc_ptr }.perform(); + detail::ping_op { impl }.perform(); + detail::read_message_op { impl }.perform(); + detail::sentry_op { impl }.perform(); }; return asio::async_initiate( - initiation, token, _svc_ptr + initiation, token, _impl ); } @@ -201,7 +202,9 @@ public: * The Client cannot be used before calling \ref mqtt_client::async_run again. */ void cancel() { - _svc_ptr->cancel(); + auto impl = _impl; + _impl = impl->dup(); + impl->cancel(); } /** @@ -217,7 +220,7 @@ public: * before the \ref async_run function is invoked again. */ mqtt_client& will(will will) { - _svc_ptr->will(std::move(will)); + _impl->will(std::move(will)); return *this; } @@ -236,7 +239,7 @@ public: std::string client_id, std::string username = "", std::string password = "" ) { - _svc_ptr->credentials( + _impl->credentials( std::move(client_id), std::move(username), std::move(password) ); @@ -271,7 +274,7 @@ public: * */ mqtt_client& brokers(std::string hosts, uint16_t default_port = 1883) { - _svc_ptr->brokers(std::move(hosts), default_port); + _impl->brokers(std::move(hosts), default_port); return *this; } @@ -294,7 +297,7 @@ public: std::enable_if_t, bool> = true > mqtt_client& authenticator(Authenticator&& authenticator) { - _svc_ptr->authenticator(std::forward(authenticator)); + _impl->authenticator(std::forward(authenticator)); return *this; } @@ -318,7 +321,7 @@ public: * */ mqtt_client& keep_alive(uint16_t seconds) { - _svc_ptr->keep_alive(seconds); + _impl->keep_alive(seconds); return *this; } @@ -328,7 +331,7 @@ public: * \see See \__CONNECT_PROPS\__ for all eligible properties. */ mqtt_client& connect_properties(connect_props props) { - _svc_ptr->connect_properties(std::move(props)); + _impl->connect_properties(std::move(props)); return *this; } @@ -350,7 +353,7 @@ public: std::integral_constant prop, prop::value_type_t

value ) { - _svc_ptr->connect_property(prop) = value; + _impl->connect_property(prop) = value; return *this; } @@ -361,7 +364,7 @@ public: * \note If \ref authenticator was not called, this method does nothing. */ void re_authenticate() { - detail::re_auth_op { _svc_ptr }.perform(); + detail::re_auth_op { _impl }.perform(); } /** @@ -385,7 +388,7 @@ public: const auto& connack_property( std::integral_constant prop ) const { - return _svc_ptr->connack_property(prop); + return _impl->connack_property(prop); } /** @@ -394,7 +397,7 @@ public: * \see See \__CONNACK_PROPS\__ for all eligible properties. */ const connack_props& connack_properties() const { - return _svc_ptr->connack_properties(); + return _impl->connack_properties(); } /** @@ -477,11 +480,11 @@ public: auto initiation = [] ( auto handler, std::string topic, std::string payload, retain_e retain, const publish_props& props, - const clisvc_ptr& svc_ptr + const impl_type& impl ) { detail::publish_send_op< client_service_type, decltype(handler), qos_type - > { svc_ptr, std::move(handler) } + > { impl, std::move(handler) } .perform( std::move(topic), std::move(payload), retain, props @@ -490,7 +493,7 @@ public: return asio::async_initiate( initiation, token, - std::move(topic), std::move(payload), retain, props, _svc_ptr + std::move(topic), std::move(payload), retain, props, _impl ); } @@ -555,14 +558,14 @@ public: auto initiation = [] ( auto handler, const std::vector& topics, - const subscribe_props& props, const clisvc_ptr& impl + const subscribe_props& props, const impl_type& impl ) { detail::subscribe_op { impl, std::move(handler) } .perform(topics, props); }; return asio::async_initiate( - initiation, token, topics, props, _svc_ptr + initiation, token, topics, props, _impl ); } @@ -684,14 +687,14 @@ public: auto initiation = []( auto handler, const std::vector& topics, - const unsubscribe_props& props, const clisvc_ptr& impl + const unsubscribe_props& props, const impl_type& impl ) { detail::unsubscribe_op { impl, std::move(handler) } .perform(topics, props); }; return asio::async_initiate( - initiation, token, topics, props, _svc_ptr + initiation, token, topics, props, _impl ); } @@ -797,7 +800,7 @@ public: template decltype(auto) async_receive(CompletionToken&& token) { // Sig = void (error_code, std::string, std::string, publish_props) - return _svc_ptr->async_channel_receive( + return _impl->async_channel_receive( std::forward(token) ); } @@ -851,9 +854,11 @@ public: disconnect_rc_e reason_code, const disconnect_props& props, CompletionToken&& token ) { + auto impl = _impl; + _impl = impl->dup(); return detail::async_disconnect( detail::disconnect_rc_e(static_cast(reason_code)), - props, true, _svc_ptr, + props, true, impl, std::forward(token) ); } diff --git a/include/async_mqtt5/types.hpp b/include/async_mqtt5/types.hpp index 508d5ae..f084536 100644 --- a/include/async_mqtt5/types.hpp +++ b/include/async_mqtt5/types.hpp @@ -323,9 +323,8 @@ public: _qos(qos), _retain(retain) {} - // just to make sure that we don't accidentally make a copy /// Copy constructor. - will(const will&) = delete; + will(const will&) = default; /// Move constructor. will(will&&) noexcept = default; diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index b282294..acc0b08 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -243,7 +243,7 @@ using namespace std::chrono; constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable); -BOOST_FIXTURE_TEST_CASE(rerunning_the_client, shared_test_data, *boost::unit_test::disabled()) { +BOOST_FIXTURE_TEST_CASE(rerunning_the_client, shared_test_data) { // packets auto disconnect = encoders::encode_disconnect(uint8_t(0x00), {});