diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index 2a558d8..655d160 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -20,9 +20,11 @@ #include #include #include +#include #include #include + namespace async_mqtt5::detail { namespace asio = boost::asio; @@ -438,6 +440,44 @@ public: return _async_sender.next_serial_num(); } + bool subscriptions_present() const { + return _stream_context.session_state().subscriptions_present(); + } + + void subscriptions_present(bool present) { + _stream_context.session_state().subscriptions_present(present); + } + + void update_session_state() { + auto& session_state = _stream_context.session_state(); + + if (!session_state.session_present()) { + _replies.clear_pending_pubrels(); + session_state.session_present(true); + + if (session_state.subscriptions_present()) { + channel_store_error(client::error::session_expired); + session_state.subscriptions_present(false); + } + } + + _cancel_ping.emit(asio::cancellation_type::total); + } + + bool channel_store(decoders::publish_message message) { + auto& [topic, packet_id, flags, props, payload] = message; + return _rec_channel.try_send( + error_code {}, std::move(topic), + std::move(payload), std::move(props) + ); + } + + bool channel_store_error(error_code ec) { + return _rec_channel.try_send( + ec, std::string {}, std::string {}, publish_props {} + ); + } + template decltype(auto) async_send( const BufferType& buffer, @@ -477,65 +517,37 @@ public: ); } - bool subscriptions_present() const { - return _stream_context.session_state().subscriptions_present(); - } - - void subscriptions_present(bool present) { - _stream_context.session_state().subscriptions_present(present); - } - - void update_session_state() { - auto& session_state = _stream_context.session_state(); - - if (!session_state.session_present()) { - _replies.clear_pending_pubrels(); - session_state.session_present(true); - - if (session_state.subscriptions_present()) { - channel_store_error(client::error::session_expired); - session_state.subscriptions_present(false); - } - } - - _cancel_ping.emit(asio::cancellation_type::total); - } - - bool channel_store(decoders::publish_message message) { - auto& [topic, packet_id, flags, props, payload] = message; - return _rec_channel.try_send( - error_code {}, std::move(topic), - std::move(payload), std::move(props) - ); - } - - bool channel_store_error(error_code ec) { - return _rec_channel.try_send( - ec, std::string {}, std::string {}, publish_props {} - ); - } - template decltype(auto) async_channel_receive(CompletionToken&& token) { - using Signature = - void(error_code, std::string, std::string, publish_props); - - auto initiation = [] (auto handler, self_type& self) { - auto ex = asio::get_associated_executor( - handler, self.get_executor() - ); - return self._rec_channel.async_receive( - asio::bind_executor(ex, std::move(handler)) - ); - }; - - return asio::async_initiate ( - initiation, token, std::ref(*this) - ); + return _rec_channel.async_receive(std::forward(token)); } }; +template +class initiate_async_run { + std::shared_ptr _svc_ptr; +public: + explicit initiate_async_run(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)) + {} + + using executor_type = typename ClientService::executor_type; + executor_type get_executor() const noexcept { + return _svc_ptr->get_executor(); + } + + template + void operator()(Handler&& handler) { + auto ex = asio::get_associated_executor(handler, get_executor()); + + _svc_ptr->run(std::move(handler)); + detail::ping_op { _svc_ptr, ex }.perform(); + detail::read_message_op { _svc_ptr, ex }.perform(); + detail::sentry_op { _svc_ptr, ex }.perform(); + } +}; + } // namespace async_mqtt5::detail diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index 881e5ad..6f38c31 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -52,10 +52,10 @@ class disconnect_op { public: template disconnect_op( - const std::shared_ptr& svc_ptr, + std::shared_ptr svc_ptr, DisconnectContext&& context, Handler&& handler ) : - _svc_ptr(svc_ptr), _context(std::move(context)), + _svc_ptr(std::move(svc_ptr)), _context(std::move(context)), _handler(std::move(handler), _svc_ptr->get_executor()) { auto slot = asio::get_associated_cancellation_slot(_handler); @@ -189,10 +189,10 @@ class terminal_disconnect_op { public: terminal_disconnect_op( - const std::shared_ptr& svc_ptr, + std::shared_ptr svc_ptr, Handler&& handler ) : - _svc_ptr(svc_ptr), + _svc_ptr(std::move(svc_ptr)), _timer(new asio::steady_timer(_svc_ptr->get_executor())), _handler(std::move(handler), _svc_ptr->get_executor()) {} @@ -254,6 +254,34 @@ public: } }; +template +class initiate_async_disconnect { + std::shared_ptr _svc_ptr; +public: + explicit initiate_async_disconnect(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)) + {} + + using executor_type = typename ClientService::executor_type; + executor_type get_executor() const noexcept { + return _svc_ptr->get_executor(); + } + + template + void operator()( + Handler&& handler, + disconnect_rc_e rc, const disconnect_props& props + ) { + auto ctx = disconnect_ctx { rc, props, terminal }; + if constexpr (terminal) + terminal_disconnect_op { _svc_ptr, std::move(handler) } + .perform(std::move(ctx)); + else + disconnect_op { _svc_ptr, std::move(ctx), std::move(handler) } + .perform(); + } +}; + template decltype(auto) async_disconnect( disconnect_rc_e reason_code, const disconnect_props& props, @@ -261,20 +289,9 @@ decltype(auto) async_disconnect( CompletionToken&& token ) { using Signature = void (error_code); - - auto initiation = []( - auto handler, disconnect_ctx ctx, - const std::shared_ptr& svc_ptr - ) { - disconnect_op { - svc_ptr, std::move(ctx), std::move(handler) - }.perform(); - }; - return asio::async_initiate( - initiation, token, - disconnect_ctx { reason_code, props, false }, - svc_ptr + initiate_async_disconnect(svc_ptr), token, + reason_code, props ); } @@ -285,20 +302,9 @@ decltype(auto) async_terminal_disconnect( CompletionToken&& token ) { using Signature = void (error_code); - - auto initiation = []( - auto handler, disconnect_ctx ctx, - const std::shared_ptr& svc_ptr - ) { - terminal_disconnect_op { - svc_ptr, std::move(handler) - }.perform(std::move(ctx)); - }; - return asio::async_initiate( - initiation, token, - disconnect_ctx { reason_code, props, true }, - svc_ptr + initiate_async_disconnect(svc_ptr), token, + reason_code, props ); } diff --git a/include/async_mqtt5/impl/ping_op.hpp b/include/async_mqtt5/impl/ping_op.hpp index 2e017e4..ffce930 100644 --- a/include/async_mqtt5/impl/ping_op.hpp +++ b/include/async_mqtt5/impl/ping_op.hpp @@ -44,13 +44,13 @@ private: public: ping_op( - const std::shared_ptr& svc_ptr, - const executor_type& ex + std::shared_ptr svc_ptr, + executor_type ex ) : - _svc_ptr(svc_ptr), _executor(ex), + _svc_ptr(std::move(svc_ptr)), _executor(ex), _ping_timer(new asio::steady_timer(_svc_ptr->get_executor())), _cancellation_state( - svc_ptr->_cancel_ping.slot(), + _svc_ptr->_cancel_ping.slot(), asio::enable_total_cancellation {}, asio::enable_total_cancellation {} ) diff --git a/include/async_mqtt5/impl/publish_rec_op.hpp b/include/async_mqtt5/impl/publish_rec_op.hpp index 13ad207..94cb4f2 100644 --- a/include/async_mqtt5/impl/publish_rec_op.hpp +++ b/include/async_mqtt5/impl/publish_rec_op.hpp @@ -43,8 +43,8 @@ class publish_rec_op { decoders::publish_message _message; public: - explicit publish_rec_op(const std::shared_ptr& svc_ptr) : - _svc_ptr(svc_ptr) + explicit publish_rec_op(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)) {} publish_rec_op(publish_rec_op&&) noexcept = default; diff --git a/include/async_mqtt5/impl/publish_send_op.hpp b/include/async_mqtt5/impl/publish_send_op.hpp index 808dd2a..ced7f5f 100644 --- a/include/async_mqtt5/impl/publish_send_op.hpp +++ b/include/async_mqtt5/impl/publish_send_op.hpp @@ -75,10 +75,10 @@ class publish_send_op { public: publish_send_op( - const std::shared_ptr& svc_ptr, + std::shared_ptr svc_ptr, Handler&& handler ) : - _svc_ptr(svc_ptr), + _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler), _svc_ptr->get_executor()) { auto slot = asio::get_associated_cancellation_slot(_handler); @@ -469,6 +469,32 @@ private: } }; +template +class initiate_async_publish { + std::shared_ptr _svc_ptr; +public: + explicit initiate_async_publish(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)) + {} + + using executor_type = typename ClientService::executor_type; + executor_type get_executor() const noexcept { + return _svc_ptr->get_executor(); + } + + template + void operator()( + Handler&& handler, + std::string topic, std::string payload, + retain_e retain, const publish_props& props + ) { + detail::publish_send_op { + _svc_ptr, std::move(handler) + }.perform( + std::move(topic), std::move(payload), retain, props + ); + } +}; } // end namespace async_mqtt5::detail diff --git a/include/async_mqtt5/impl/re_auth_op.hpp b/include/async_mqtt5/impl/re_auth_op.hpp index 8f355c6..10fb414 100644 --- a/include/async_mqtt5/impl/re_auth_op.hpp +++ b/include/async_mqtt5/impl/re_auth_op.hpp @@ -35,8 +35,8 @@ class re_auth_op { any_authenticator& _auth; public: - explicit re_auth_op(const std::shared_ptr& svc_ptr) : - _svc_ptr(svc_ptr), + explicit re_auth_op(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)), _auth(_svc_ptr->_stream_context.mqtt_context().authenticator) {} diff --git a/include/async_mqtt5/impl/read_message_op.hpp b/include/async_mqtt5/impl/read_message_op.hpp index d6f01d3..516ab9f 100644 --- a/include/async_mqtt5/impl/read_message_op.hpp +++ b/include/async_mqtt5/impl/read_message_op.hpp @@ -41,10 +41,10 @@ private: executor_type _executor; public: read_message_op( - const std::shared_ptr& svc_ptr, - const executor_type& ex + std::shared_ptr svc_ptr, + executor_type ex ) : - _svc_ptr(svc_ptr), _executor(ex) + _svc_ptr(std::move(svc_ptr)), _executor(ex) {} read_message_op(read_message_op&&) noexcept = default; diff --git a/include/async_mqtt5/impl/read_op.hpp b/include/async_mqtt5/impl/read_op.hpp index 43db04f..00eda20 100644 --- a/include/async_mqtt5/impl/read_op.hpp +++ b/include/async_mqtt5/impl/read_op.hpp @@ -35,8 +35,7 @@ class read_op { public: read_op(Owner& owner, Handler&& handler) : - _owner(owner), - _handler(std::move(handler)) + _owner(owner), _handler(std::move(handler)) {} read_op(read_op&&) = default; diff --git a/include/async_mqtt5/impl/reconnect_op.hpp b/include/async_mqtt5/impl/reconnect_op.hpp index 689e5d8..01f52ad 100644 --- a/include/async_mqtt5/impl/reconnect_op.hpp +++ b/include/async_mqtt5/impl/reconnect_op.hpp @@ -72,8 +72,7 @@ class reconnect_op { public: template reconnect_op(Owner& owner, Handler&& handler) : - _owner(owner), - _handler(std::forward(handler)) + _owner(owner), _handler(std::move(handler)) {} reconnect_op(reconnect_op&&) = default; diff --git a/include/async_mqtt5/impl/replies.hpp b/include/async_mqtt5/impl/replies.hpp index a8f50f7..93f3e44 100644 --- a/include/async_mqtt5/impl/replies.hpp +++ b/include/async_mqtt5/impl/replies.hpp @@ -93,7 +93,7 @@ private: public: template - explicit replies(const Executor& ex) : _ex(ex) {} + explicit replies(Executor ex) : _ex(ex) {} replies(replies&&) = default; replies(const replies&) = delete; diff --git a/include/async_mqtt5/impl/sentry_op.hpp b/include/async_mqtt5/impl/sentry_op.hpp index 3af73a8..1eafebd 100644 --- a/include/async_mqtt5/impl/sentry_op.hpp +++ b/include/async_mqtt5/impl/sentry_op.hpp @@ -43,10 +43,9 @@ private: public: sentry_op( - const std::shared_ptr& svc_ptr, - const executor_type& ex + std::shared_ptr svc_ptr, executor_type ex ) : - _svc_ptr(svc_ptr), _executor(ex), + _svc_ptr(std::move(svc_ptr)), _executor(ex), _sentry_timer(new asio::steady_timer(_svc_ptr->get_executor())) {} diff --git a/include/async_mqtt5/impl/subscribe_op.hpp b/include/async_mqtt5/impl/subscribe_op.hpp index 2b89c10..7b87a0b 100644 --- a/include/async_mqtt5/impl/subscribe_op.hpp +++ b/include/async_mqtt5/impl/subscribe_op.hpp @@ -52,10 +52,10 @@ class subscribe_op { public: subscribe_op( - const std::shared_ptr& svc_ptr, + std::shared_ptr svc_ptr, Handler&& handler ) : - _svc_ptr(svc_ptr), + _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler), _svc_ptr->get_executor()) { auto slot = asio::get_associated_cancellation_slot(_handler); @@ -302,6 +302,28 @@ private: } }; +template +class initiate_async_subscribe { + std::shared_ptr _svc_ptr; +public: + explicit initiate_async_subscribe(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)) + {} + + using executor_type = typename ClientService::executor_type; + executor_type get_executor() const noexcept { + return _svc_ptr->get_executor(); + } + + template + void operator()( + Handler&& handler, + const std::vector& topics, const subscribe_props& props + ) { + detail::subscribe_op { _svc_ptr, std::move(handler) } + .perform(topics, props); + } +}; } // end namespace async_mqtt5::detail diff --git a/include/async_mqtt5/impl/unsubscribe_op.hpp b/include/async_mqtt5/impl/unsubscribe_op.hpp index ffa56cf..8ba3c9a 100644 --- a/include/async_mqtt5/impl/unsubscribe_op.hpp +++ b/include/async_mqtt5/impl/unsubscribe_op.hpp @@ -46,10 +46,10 @@ class unsubscribe_op { public: unsubscribe_op( - const std::shared_ptr& svc_ptr, + std::shared_ptr svc_ptr, Handler&& handler ) : - _svc_ptr(svc_ptr), + _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler), _svc_ptr->get_executor()) { auto slot = asio::get_associated_cancellation_slot(_handler); @@ -237,6 +237,29 @@ private: } }; +template +class initiate_async_unsubscribe { + std::shared_ptr _svc_ptr; +public: + explicit initiate_async_unsubscribe(std::shared_ptr svc_ptr) : + _svc_ptr(std::move(svc_ptr)) + {} + + using executor_type = typename ClientService::executor_type; + executor_type get_executor() const noexcept { + return _svc_ptr->get_executor(); + } + + template + void operator()( + Handler&& handler, + const std::vector& topics, const unsubscribe_props& props + ) { + detail::unsubscribe_op { _svc_ptr, std::move(handler) } + .perform(topics, props); + } +}; + } // end namespace async_mqtt5::detail diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index 6a2f33f..e7f8f72 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -18,10 +18,9 @@ #include #include -#include +#include #include #include -#include namespace async_mqtt5 { @@ -202,18 +201,8 @@ public: > decltype(auto) async_run(CompletionToken&& token = {}) { using Signature = void (error_code); - - auto initiation = [] (auto handler, const impl_type& impl) { - auto ex = asio::get_associated_executor(handler, impl->get_executor()); - - impl->run(std::move(handler)); - detail::ping_op { impl, ex }.perform(); - detail::read_message_op { impl, ex }.perform(); - detail::sentry_op { impl, ex }.perform(); - }; - return asio::async_initiate( - initiation, token, _impl + detail::initiate_async_run(_impl), token ); } @@ -511,24 +500,10 @@ public: CompletionToken&& token = {} ) { using Signature = detail::on_publish_signature; - - auto initiation = [] ( - auto handler, std::string topic, std::string payload, - retain_e retain, const publish_props& props, - const impl_type& impl - ) { - detail::publish_send_op< - client_service_type, decltype(handler), qos_type - > { impl, std::move(handler) } - .perform( - std::move(topic), std::move(payload), - retain, props - ); - }; - return asio::async_initiate( - initiation, token, - std::move(topic), std::move(payload), retain, props, _impl + detail::initiate_async_publish(_impl), + token, + std::move(topic), std::move(payload), retain, props ); } @@ -599,17 +574,9 @@ public: using Signature = void ( error_code, std::vector, suback_props ); - - auto initiation = [] ( - auto handler, const std::vector& topics, - const subscribe_props& props, const impl_type& impl - ) { - detail::subscribe_op { impl, std::move(handler) } - .perform(topics, props); - }; - return asio::async_initiate( - initiation, token, topics, props, _impl + detail::initiate_async_subscribe(_impl), token, + topics, props ); } @@ -745,18 +712,9 @@ public: using Signature = void ( error_code, std::vector, unsuback_props ); - - auto initiation = []( - auto handler, - const std::vector& topics, - const unsubscribe_props& props, const impl_type& impl - ) { - detail::unsubscribe_op { impl, std::move(handler) } - .perform(topics, props); - }; - return asio::async_initiate( - initiation, token, topics, props, _impl + detail::initiate_async_unsubscribe(_impl), token, + topics, props ); } @@ -879,10 +837,7 @@ public: typename asio::default_completion_token::type > decltype(auto) async_receive(CompletionToken&& token = {}) { - // Sig = void (error_code, std::string, std::string, publish_props) - return _impl->async_channel_receive( - std::forward(token) - ); + return _impl->async_channel_receive(std::forward(token)); } /** @@ -1008,6 +963,7 @@ public: disconnect_props {}, std::forward(token) ); } + };