diff --git a/include/async_mqtt5/detail/async_mutex.hpp b/include/async_mqtt5/detail/async_mutex.hpp index 659f63c..b9e6515 100644 --- a/include/async_mqtt5/detail/async_mutex.hpp +++ b/include/async_mqtt5/detail/async_mutex.hpp @@ -28,20 +28,20 @@ private: // Handler with assigned tracking executor. // Objects of this type are type-erased by any_completion_handler // and stored in the waiting queue. - template + template class tracked_op { - tracking_type _executor; - std::decay_t _handler; + tracking_type _executor; + Handler _handler; public: - tracked_op(Handler&& h) : - _executor(tracking_executor(h)), + tracked_op(Handler&& h, const Executor& ex) : + _executor(tracking_executor(h, ex)), _handler(std::move(h)) {} tracked_op(tracked_op&&) noexcept = default; tracked_op(const tracked_op&) = delete; - using executor_type = tracking_type; + using executor_type = tracking_type; executor_type get_executor() const noexcept { return _executor; } @@ -186,7 +186,7 @@ private: // to asio::post to avoid recursion. void execute_or_queue(auto handler) noexcept { std::unique_lock l { _thread_mutex }; - tracked_op h { std::move(handler) }; + tracked_op h { std::move(handler), _ex }; if (_locked.load(std::memory_order_relaxed)) { _waiting.emplace_back(std::move(h)); auto slot = _waiting.back().get_cancellation_slot(); diff --git a/include/async_mqtt5/detail/async_traits.hpp b/include/async_mqtt5/detail/async_traits.hpp index 0741f71..76b065d 100644 --- a/include/async_mqtt5/detail/async_traits.hpp +++ b/include/async_mqtt5/detail/async_traits.hpp @@ -27,18 +27,19 @@ void assign_tls_sni(const authority_path& ap, TlsContext& ctx, TlsStream& s); namespace detail { -template -auto tracking_executor(const Handler& handler) { +template +auto tracking_executor(const Handler& handler, const DfltExecutor& ex) { return asio::prefer( - asio::get_associated_executor(handler), + asio::get_associated_executor(handler, ex), asio::execution::outstanding_work.tracked ); } -template -using tracking_type = std::decay_t< - decltype(tracking_executor(std::declval())) ->; +template +using tracking_type = typename asio::prefer_result< + asio::associated_executor_t, + asio::execution::outstanding_work_t::tracked_t +>::type; template concept has_async_write = requires(T a) { diff --git a/include/async_mqtt5/detail/cancellable_handler.hpp b/include/async_mqtt5/detail/cancellable_handler.hpp index 0f8f7be..02b072f 100644 --- a/include/async_mqtt5/detail/cancellable_handler.hpp +++ b/include/async_mqtt5/detail/cancellable_handler.hpp @@ -21,19 +21,19 @@ template < > class cancellable_handler { struct op_state { - std::decay_t _handler; - tracking_type _handler_ex; + Handler _handler; + tracking_type _handler_ex; cancellable_handler* _owner; - op_state(Handler&& handler, cancellable_handler* owner) : - _handler(std::move(handler)), - _handler_ex(tracking_executor(_handler)), + op_state( + Handler&& handler, const Executor& ex, cancellable_handler* owner + ) : _handler(std::move(handler)), + _handler_ex(tracking_executor(_handler, ex)), _owner(owner) {} - void cancel_op(asio::cancellation_type_t ct) { - if (ct != asio::cancellation_type_t::none) - _owner->cancel(); + void cancel_op() { + _owner->cancel(); } }; @@ -46,16 +46,16 @@ class cancellable_handler { {} void operator()(asio::cancellation_type_t type) { - auto op = []( - std::weak_ptr wptr, - asio::cancellation_type_t type - ) { + if ((type & asio::cancellation_type_t::terminal) == + asio::cancellation_type_t::none) + return; + auto op = [wptr = _state_weak_ptr]() { if (auto state = wptr.lock()) - state->cancel_op(type); + state->cancel_op(); }; asio::dispatch( _executor, - asio::prepend(std::move(op), _state_weak_ptr, type) + std::move(op) ); } }; @@ -67,12 +67,12 @@ public: cancellable_handler(Handler&& handler, const Executor& ex) { auto alloc = asio::get_associated_allocator(handler); _state = std::allocate_shared( - alloc, std::move(handler),this + alloc, std::move(handler), ex, this ); - _executor = ex; auto slot = asio::get_associated_cancellation_slot(_state->_handler); if (slot.is_connected()) slot.template emplace(_state, ex); + _executor = ex; } cancellable_handler(cancellable_handler&& other) noexcept : @@ -102,8 +102,9 @@ public: if (empty()) return; auto h = std::move(_state->_handler); - _state.reset(); asio::get_associated_cancellation_slot(h).clear(); + auto handler_ex = std::move(_state->_handler_ex); + _state.reset(); auto op = std::apply([&h](auto... args) { return asio::prepend( @@ -111,7 +112,7 @@ public: ); }, CancelArgs {}); - asio::dispatch(std::move(_executor), std::move(op)); + asio::dispatch(handler_ex, std::move(op)); } template @@ -119,11 +120,12 @@ public: if (empty()) return; auto h = std::move(_state->_handler); - _state.reset(); asio::get_associated_cancellation_slot(h).clear(); + auto handler_ex = std::move(_state->_handler_ex); + _state.reset(); asio::dispatch( - std::move(_executor), + handler_ex, asio::prepend(std::move(h), std::forward(args)...) ); } @@ -133,14 +135,14 @@ public: if (empty()) return; auto h = std::move(_state->_handler); - _state.reset(); asio::get_associated_cancellation_slot(h).clear(); + auto handler_ex = std::move(_state->_handler_ex); + _state.reset(); asio::post( - std::move(_executor), + _executor, asio::prepend(std::move(h), std::forward(args)...) ); - } }; diff --git a/include/async_mqtt5/impl/assemble_op.hpp b/include/async_mqtt5/impl/assemble_op.hpp index 4dedb8a..4e5133c 100644 --- a/include/async_mqtt5/impl/assemble_op.hpp +++ b/include/async_mqtt5/impl/assemble_op.hpp @@ -54,7 +54,7 @@ class assemble_op { static constexpr size_t max_packet_size = 65536; client_service& _svc; - std::decay_t _handler; + Handler _handler; std::string& _read_buff; data_span& _data_span; diff --git a/include/async_mqtt5/impl/connect_op.hpp b/include/async_mqtt5/impl/connect_op.hpp index ed98e96..50af454 100644 --- a/include/async_mqtt5/impl/connect_op.hpp +++ b/include/async_mqtt5/impl/connect_op.hpp @@ -43,7 +43,7 @@ class connect_op { Stream& _stream; mqtt_context& _ctx; - std::decay_t _handler; + Handler _handler; std::unique_ptr _buffer_ptr; using endpoint = asio::ip::tcp::endpoint; diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index 6dbf389..53a61a2 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -9,6 +9,7 @@ #include #include +#include #include @@ -28,7 +29,10 @@ class disconnect_op { std::shared_ptr _svc_ptr; DisconnectContext _context; - std::decay_t _handler; + cancellable_handler< + Handler, + typename ClientService::executor_type + > _handler; public: disconnect_op( @@ -37,7 +41,7 @@ public: ) : _svc_ptr(svc_ptr), _context(std::move(context)), - _handler(std::move(handler)) + _handler(std::move(handler), get_executor()) {} disconnect_op(disconnect_op&&) noexcept = default; @@ -103,10 +107,7 @@ public: private: void complete(error_code ec) { - asio::dispatch( - get_executor(), - asio::prepend(std::move(_handler), ec) - ); + _handler.complete(ec); } }; diff --git a/include/async_mqtt5/impl/endpoints.hpp b/include/async_mqtt5/impl/endpoints.hpp index f2c65af..4749624 100644 --- a/include/async_mqtt5/impl/endpoints.hpp +++ b/include/async_mqtt5/impl/endpoints.hpp @@ -26,7 +26,7 @@ class resolve_op { struct on_resolve {}; Owner& _owner; - std::decay_t _handler; + Handler _handler; public: resolve_op( diff --git a/include/async_mqtt5/impl/publish_send_op.hpp b/include/async_mqtt5/impl/publish_send_op.hpp index 2b7f19d..c18fc39 100644 --- a/include/async_mqtt5/impl/publish_send_op.hpp +++ b/include/async_mqtt5/impl/publish_send_op.hpp @@ -142,6 +142,12 @@ public: } void send_publish(control_packet publish) { + if (_handler.empty()) { // already cancelled + if constexpr (qos_type != qos_e::at_most_once) + _svc_ptr->free_pid(publish.packet_id()); + return; + } + const auto& wire_data = publish.wire_data(); _svc_ptr->async_send( wire_data, diff --git a/include/async_mqtt5/impl/read_op.hpp b/include/async_mqtt5/impl/read_op.hpp index dfff235..21fc938 100644 --- a/include/async_mqtt5/impl/read_op.hpp +++ b/include/async_mqtt5/impl/read_op.hpp @@ -19,7 +19,7 @@ class read_op { struct on_reconnect {}; Owner& _owner; - std::decay_t _handler; + Handler _handler; public: read_op(Owner& owner, Handler&& handler) : diff --git a/include/async_mqtt5/impl/reconnect_op.hpp b/include/async_mqtt5/impl/reconnect_op.hpp index c567f78..9e383c1 100644 --- a/include/async_mqtt5/impl/reconnect_op.hpp +++ b/include/async_mqtt5/impl/reconnect_op.hpp @@ -27,7 +27,7 @@ class reconnect_op { struct on_backoff {}; Owner& _owner; - std::decay_t _handler; + Handler _handler; std::unique_ptr _buffer_ptr; using endpoint = asio::ip::tcp::endpoint; diff --git a/include/async_mqtt5/impl/subscribe_op.hpp b/include/async_mqtt5/impl/subscribe_op.hpp index d8c2dc4..c7cf733 100644 --- a/include/async_mqtt5/impl/subscribe_op.hpp +++ b/include/async_mqtt5/impl/subscribe_op.hpp @@ -71,6 +71,9 @@ public: } void send_subscribe(control_packet subscribe) { + if (_handler.empty()) // already cancelled + return _svc_ptr->free_pid(subscribe.packet_id()); + const auto& wire_data = subscribe.wire_data(); _svc_ptr->async_send( wire_data, diff --git a/include/async_mqtt5/impl/unsubscribe_op.hpp b/include/async_mqtt5/impl/unsubscribe_op.hpp index 573e2a9..c5d891f 100644 --- a/include/async_mqtt5/impl/unsubscribe_op.hpp +++ b/include/async_mqtt5/impl/unsubscribe_op.hpp @@ -71,6 +71,9 @@ public: } void send_unsubscribe(control_packet unsubscribe) { + if (_handler.empty()) // already cancelled + return _svc_ptr->free_pid(unsubscribe.packet_id()); + const auto& wire_data = unsubscribe.wire_data(); _svc_ptr->async_send( wire_data, diff --git a/include/async_mqtt5/impl/write_op.hpp b/include/async_mqtt5/impl/write_op.hpp index 37e1785..5492b5e 100644 --- a/include/async_mqtt5/impl/write_op.hpp +++ b/include/async_mqtt5/impl/write_op.hpp @@ -15,7 +15,7 @@ class write_op { struct on_reconnect {}; Owner& _owner; - std::decay_t _handler; + Handler _handler; public: write_op( diff --git a/test/unit/include/test_common/test_stream.hpp b/test/unit/include/test_common/test_stream.hpp index fc352db..83effef 100644 --- a/test/unit/include/test_common/test_stream.hpp +++ b/test/unit/include/test_common/test_stream.hpp @@ -94,7 +94,7 @@ template class read_op { struct on_read {}; std::shared_ptr _stream_impl; - std::decay_t _handler; + Handler _handler; public: read_op( @@ -155,7 +155,7 @@ class write_op { struct on_write {}; std::shared_ptr _stream_impl; - std::decay_t _handler; + Handler _handler; public: write_op(