diff --git a/include/async_mqtt5/detail/any_authenticator.hpp b/include/async_mqtt5/detail/any_authenticator.hpp index 0774d4e..fb569d1 100644 --- a/include/async_mqtt5/detail/any_authenticator.hpp +++ b/include/async_mqtt5/detail/any_authenticator.hpp @@ -14,7 +14,7 @@ using error_code = boost::system::error_code; namespace detail { using auth_handler_type = asio::any_completion_handler< - void(error_code ec, std::string auth_data) + void (error_code ec, std::string auth_data) >; template @@ -91,14 +91,19 @@ public: auth_step_e step, std::string data, CompletionToken&& token ) { - using Signature = void(error_code, std::string); + using Signature = void (error_code, std::string); - auto initiate = [this](auto handler, auth_step_e step, std::string data) { - _auth_fun->async_auth(step, std::move(data), std::move(handler)); + auto initiation = []( + auto handler, any_authenticator& self, + auth_step_e step, std::string data + ) { + self._auth_fun->async_auth( + step, std::move(data), std::move(handler) + ); }; return asio::async_initiate( - initiate, token, step, std::move(data) + initiation, token, std::ref(*this), step, std::move(data) ); } }; diff --git a/include/async_mqtt5/impl/assemble_op.hpp b/include/async_mqtt5/impl/assemble_op.hpp index 4e5133c..89fd58f 100644 --- a/include/async_mqtt5/impl/assemble_op.hpp +++ b/include/async_mqtt5/impl/assemble_op.hpp @@ -15,7 +15,6 @@ #include #include -#include #include @@ -49,19 +48,21 @@ public: template class assemble_op { using client_service = ClientService; + using handler_type = Handler; + struct on_read {}; static constexpr size_t max_packet_size = 65536; client_service& _svc; - Handler _handler; + handler_type _handler; std::string& _read_buff; data_span& _data_span; public: assemble_op( - client_service& svc, Handler&& handler, + client_service& svc, handler_type&& handler, std::string& read_buff, data_span& active_span ) : _svc(svc), @@ -77,7 +78,7 @@ public: return _svc.get_executor(); } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } @@ -214,10 +215,7 @@ private: ) { asio::dispatch( get_executor(), - asio::prepend( - std::move(_handler), ec, control_code, - first, last - ) + asio::prepend(std::move(_handler), ec, control_code, first, last) ); } }; diff --git a/include/async_mqtt5/impl/autoconnect_stream.hpp b/include/async_mqtt5/impl/autoconnect_stream.hpp index f9c7b92..8b5136c 100644 --- a/include/async_mqtt5/impl/autoconnect_stream.hpp +++ b/include/async_mqtt5/impl/autoconnect_stream.hpp @@ -24,6 +24,7 @@ template < > class autoconnect_stream { public: + using self_type = autoconnect_stream; using stream_type = StreamType; using stream_context_type = StreamContext; using executor_type = typename stream_type::executor_type; @@ -38,16 +39,16 @@ private: stream_ptr _stream_ptr; stream_context_type& _stream_context; - template - friend class reconnect_op; - template friend class read_op; template friend class write_op; - template + template + friend class reconnect_op; + + template friend class disconnect_op; public: @@ -116,16 +117,17 @@ public: decltype(auto) async_read_some( const BufferType& buffer, duration wait_for, CompletionToken&& token ) { - auto initiation = [this]( - auto handler, const BufferType& buffer, duration wait_for + using Signature = void (error_code, size_t); + + auto initiation = []( + auto handler, self_type& self, + const BufferType& buffer, duration wait_for ) { - read_op { *this, std::move(handler) } - .perform(buffer, wait_for); + read_op { self, std::move(handler) }.perform(buffer, wait_for); }; - return asio::async_initiate( - std::move(initiation), token, - buffer, wait_for + return asio::async_initiate( + initiation, token, std::ref(*this), buffer, wait_for ); } @@ -133,14 +135,16 @@ public: decltype(auto) async_write( const BufferType& buffer, CompletionToken&& token ) { - auto initiation = [this]( - auto handler, const BufferType& buffer + using Signature = void (error_code, size_t); + + auto initiation = []( + auto handler, self_type& self, const BufferType& buffer ) { - write_op { *this, std::move(handler) }.perform(buffer); + write_op { self, std::move(handler) }.perform(buffer); }; - return asio::async_initiate( - std::move(initiation), token, buffer + return asio::async_initiate( + initiation, token, std::ref(*this), buffer ); } @@ -176,12 +180,14 @@ private: template decltype(auto) async_reconnect(stream_ptr s, CompletionToken&& token) { - auto initiation = [this](auto handler, stream_ptr s) { - reconnect_op { *this, std::move(handler) }.perform(s); + using Signature = void (error_code); + + auto initiation = [](auto handler, self_type& self, stream_ptr s) { + reconnect_op { self, std::move(handler) }.perform(s); }; - return asio::async_initiate( - std::move(initiation), token, s + return asio::async_initiate( + initiation, token, std::ref(*this), s ); } }; diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index 20a70a7..e252592 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -126,6 +126,7 @@ template < typename TlsContext = std::monostate > class client_service { + using self_type = client_service; using stream_context_type = stream_context; using stream_type = autoconnect_stream< StreamType, stream_context_type @@ -281,18 +282,21 @@ public: template decltype(auto) async_assemble(duration wait_for, CompletionToken&& token) { - auto initiation = [this] (auto handler, duration wait_for) mutable { + using Signature = void (error_code, uint8_t, byte_citer, byte_citer); + + auto initiation = [] ( + auto handler, self_type& self, + duration wait_for, std::string& read_buff, data_span& active_span + ) { assemble_op { - *this, std::move(handler), - _read_buff, _active_span + self, std::move(handler), read_buff, active_span }.perform(wait_for, asio::transfer_at_least(0)); }; - using Signature = void ( - error_code, uint8_t, byte_citer, byte_citer - ); return asio::async_initiate ( - std::move(initiation), token, wait_for + initiation, + token, std::ref(*this), + wait_for, std::ref(_read_buff), std::ref(_active_span) ); } diff --git a/include/async_mqtt5/impl/connect_op.hpp b/include/async_mqtt5/impl/connect_op.hpp index 50af454..042ce9d 100644 --- a/include/async_mqtt5/impl/connect_op.hpp +++ b/include/async_mqtt5/impl/connect_op.hpp @@ -7,26 +7,23 @@ #include #include #include +#include +#include #include -#include - #include #include #include #include -#include #include #include namespace async_mqtt5::detail { -template < - typename Stream, typename Handler -> +template class connect_op { static constexpr size_t min_packet_sz = 5; @@ -43,18 +40,22 @@ class connect_op { Stream& _stream; mqtt_context& _ctx; - Handler _handler; + + using handler_type = asio::any_completion_handler; + handler_type _handler; + std::unique_ptr _buffer_ptr; using endpoint = asio::ip::tcp::endpoint; using epoints = asio::ip::tcp::resolver::results_type; public: + template connect_op( Stream& stream, Handler&& handler, mqtt_context& ctx ) : _stream(stream), _ctx(ctx), - _handler(std::move(handler)) + _handler(std::forward(handler)) {} connect_op(connect_op&&) noexcept = default; @@ -65,13 +66,13 @@ public: return _stream.get_executor(); } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } using cancellation_slot_type = - asio::associated_cancellation_slot_t; + asio::associated_cancellation_slot_t; cancellation_slot_type get_cancellation_slot() const noexcept { return asio::get_associated_cancellation_slot(_handler); } @@ -233,6 +234,7 @@ public: auto varlen = decoders::type_parse( varlen_ptr, _buffer_ptr->cend(), decoders::basic::varint_ ); + if (!varlen) complete(asio::error::try_again); diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index 53a61a2..c9a11fe 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -19,8 +19,7 @@ namespace asio = boost::asio; template < typename ClientService, - typename DisconnectContext, - typename Handler + typename DisconnectContext > class disconnect_op { using client_service = ClientService; @@ -29,12 +28,15 @@ class disconnect_op { std::shared_ptr _svc_ptr; DisconnectContext _context; - cancellable_handler< - Handler, + + using handler_type = cancellable_handler< + asio::any_completion_handler, typename ClientService::executor_type - > _handler; + >; + handler_type _handler; public: + template disconnect_op( const std::shared_ptr& svc_ptr, DisconnectContext&& context, Handler&& handler @@ -52,7 +54,7 @@ public: return _svc_ptr->get_executor(); } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } diff --git a/include/async_mqtt5/impl/endpoints.hpp b/include/async_mqtt5/impl/endpoints.hpp index 4749624..134337f 100644 --- a/include/async_mqtt5/impl/endpoints.hpp +++ b/include/async_mqtt5/impl/endpoints.hpp @@ -6,10 +6,8 @@ #include #include #include - -#include - #include +#include #include @@ -26,13 +24,13 @@ class resolve_op { struct on_resolve {}; Owner& _owner; - Handler _handler; + + using handler_type = Handler; + handler_type _handler; public: - resolve_op( - Owner& owner, Handler&& handler) : - _owner(owner), - _handler(std::move(handler)) + resolve_op(Owner& owner, Handler&& handler) : + _owner(owner), _handler(std::move(handler)) {} resolve_op(resolve_op&&) noexcept = default; @@ -43,13 +41,13 @@ public: return _owner.get_executor(); } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } using cancellation_slot_type = - asio::associated_cancellation_slot_t; + asio::associated_cancellation_slot_t; cancellation_slot_type get_cancellation_slot() const noexcept { return asio::get_associated_cancellation_slot(_handler); } @@ -165,15 +163,14 @@ public: template decltype(auto) async_next_endpoint(CompletionToken&& token) { - auto initiation = [this](auto handler) { - resolve_op { *this, std::move(handler) }.perform(); + using Signature = void (error_code, epoints, authority_path); + + auto initiation = [](auto handler, endpoints& self) { + resolve_op { self, std::move(handler) }.perform(); }; - return asio::async_initiate< - CompletionToken, - void (error_code, epoints, authority_path) - >( - std::move(initiation), token + return asio::async_initiate( + initiation, token, std::ref(*this) ); } diff --git a/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp b/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp index c13bd58..73672fa 100644 --- a/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp +++ b/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp @@ -5,9 +5,7 @@ #include #include - #include -#include namespace async_mqtt5::decoders { diff --git a/include/async_mqtt5/impl/internal/codecs/message_encoders.hpp b/include/async_mqtt5/impl/internal/codecs/message_encoders.hpp index 3418f3e..0bd6892 100644 --- a/include/async_mqtt5/impl/internal/codecs/message_encoders.hpp +++ b/include/async_mqtt5/impl/internal/codecs/message_encoders.hpp @@ -5,9 +5,7 @@ #include #include - #include -#include namespace async_mqtt5::encoders { diff --git a/include/async_mqtt5/impl/re_auth_op.hpp b/include/async_mqtt5/impl/re_auth_op.hpp index ee9f8eb..6d9b80e 100644 --- a/include/async_mqtt5/impl/re_auth_op.hpp +++ b/include/async_mqtt5/impl/re_auth_op.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -25,9 +26,7 @@ class re_auth_op { any_authenticator& _auth; public: - re_auth_op( - const std::shared_ptr& svc_ptr - ) : + re_auth_op(const std::shared_ptr& svc_ptr) : _svc_ptr(svc_ptr), _auth(_svc_ptr->_stream_context.mqtt_context().authenticator) {} @@ -75,10 +74,10 @@ public: disconnect_rc_e::protocol_error ); - auto auth_step = auth_rc == reason_codes::success - ? auth_step_e::server_final - : auth_step_e::server_challenge; + auto auth_step = auth_rc == reason_codes::success ? + auth_step_e::server_final : auth_step_e::server_challenge; auto data = auth_props[prop::authentication_data].value_or(""); + return _auth.async_auth( auth_step, std::move(data), asio::prepend(std::move(*this), on_auth_data {}, auth_step) @@ -100,9 +99,9 @@ public: auth_props props; props[prop::authentication_method] = _auth.method(); props[prop::authentication_data] = std::move(data); - auto rc = auth_step == auth_step_e::client_initial - ? reason_codes::reauthenticate - : reason_codes::continue_authentication; + auto rc = auth_step == auth_step_e::client_initial ? + reason_codes::reauthenticate : reason_codes::continue_authentication; + auto packet = control_packet::of( no_pid, get_allocator(), encoders::encode_auth, @@ -110,6 +109,7 @@ public: ); const auto& wire_data = packet.wire_data(); + _svc_ptr->async_send( wire_data, no_serial, send_flag::none, @@ -121,10 +121,8 @@ private: void on_auth_fail(std::string message, disconnect_rc_e reason) { auto props = disconnect_props{}; props[prop::reason_string] = std::move(message); - async_disconnect( - reason, props, false, _svc_ptr, - asio::detached - ); + + async_disconnect(reason, props, false, _svc_ptr, asio::detached); } }; diff --git a/include/async_mqtt5/impl/read_op.hpp b/include/async_mqtt5/impl/read_op.hpp index 21fc938..b164fb5 100644 --- a/include/async_mqtt5/impl/read_op.hpp +++ b/include/async_mqtt5/impl/read_op.hpp @@ -19,7 +19,9 @@ class read_op { struct on_reconnect {}; Owner& _owner; - Handler _handler; + + using handler_type = Handler; + handler_type _handler; public: read_op(Owner& owner, Handler&& handler) : @@ -35,7 +37,7 @@ public: return _owner.get_executor(); } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } @@ -107,7 +109,8 @@ private: static bool should_reconnect(error_code ec) { using namespace asio::error; return ec == connection_aborted || ec == not_connected || - ec == timed_out || ec == connection_reset || ec == broken_pipe; + ec == timed_out || ec == connection_reset || + ec == broken_pipe || ec == asio::error::eof; } }; diff --git a/include/async_mqtt5/impl/reconnect_op.hpp b/include/async_mqtt5/impl/reconnect_op.hpp index 9e383c1..0438d2d 100644 --- a/include/async_mqtt5/impl/reconnect_op.hpp +++ b/include/async_mqtt5/impl/reconnect_op.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -19,7 +20,7 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; -template +template class reconnect_op { struct on_locked {}; struct on_next_endpoint {}; @@ -27,16 +28,20 @@ class reconnect_op { struct on_backoff {}; Owner& _owner; - Handler _handler; + + using handler_type = asio::any_completion_handler; + handler_type _handler; + std::unique_ptr _buffer_ptr; using endpoint = asio::ip::tcp::endpoint; using epoints = asio::ip::tcp::resolver::results_type; public: + template reconnect_op(Owner& owner, Handler&& handler) : _owner(owner), - _handler(std::move(handler)) + _handler(std::forward(handler)) {} reconnect_op(reconnect_op&&) noexcept = default; @@ -47,13 +52,13 @@ public: return _owner.get_executor(); } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } using cancellation_slot_type = - asio::associated_cancellation_slot_t; + asio::associated_cancellation_slot_t; cancellation_slot_type get_cancellation_slot() const noexcept { return asio::get_associated_cancellation_slot(_handler); } @@ -122,19 +127,19 @@ public: // wait max 5 seconds for the connect (handshake) op to finish _owner._connect_timer.expires_from_now(std::chrono::seconds(5)); - auto init_connect = [this, sptr]( - auto handler, const auto& eps, auto ap + auto init_connect = []( + auto handler, typename Owner::stream_type& stream, + mqtt_context& context, const epoints& eps, authority_path ap ) { - connect_op { - *sptr, std::move(handler), - _owner._stream_context.mqtt_context() - }.perform(eps, std::move(ap)); + connect_op { stream, std::move(handler), context } + .perform(eps, std::move(ap)); }; auto timed_connect = asioex::make_parallel_group( asio::async_initiate( - std::move(init_connect), asio::deferred, - std::move(eps), std::move(ap) + init_connect, asio::deferred, std::ref(*sptr), + std::ref(_owner._stream_context.mqtt_context()), + eps, std::move(ap) ), _owner._connect_timer.async_wait(asio::deferred) ); @@ -185,7 +190,6 @@ private: asio::prepend(std::move(_handler), ec) ); } - }; diff --git a/include/async_mqtt5/impl/write_op.hpp b/include/async_mqtt5/impl/write_op.hpp index 5492b5e..94bc707 100644 --- a/include/async_mqtt5/impl/write_op.hpp +++ b/include/async_mqtt5/impl/write_op.hpp @@ -15,13 +15,15 @@ class write_op { struct on_reconnect {}; Owner& _owner; - Handler _handler; + + using handler_type = Handler; + handler_type _handler; public: write_op( Owner& owner, Handler&& handler) : _owner(owner), - _handler(std::forward(handler)) + _handler(std::move(handler)) {} write_op(write_op&&) noexcept = default; @@ -32,7 +34,7 @@ public: return _owner.get_executor(); } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); }