correct tracking executor

Summary: - don't resend per-operation cancelled publish and (un)subscribe operations

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina

Differential Revision: https://repo.mireo.local/D26724
This commit is contained in:
Bruno Iljazovic
2023-11-28 11:04:00 +01:00
parent a0edbdabf2
commit f6248eea40
14 changed files with 67 additions and 51 deletions

View File

@@ -28,20 +28,20 @@ private:
// Handler with assigned tracking executor. // Handler with assigned tracking executor.
// Objects of this type are type-erased by any_completion_handler // Objects of this type are type-erased by any_completion_handler
// and stored in the waiting queue. // and stored in the waiting queue.
template <typename Handler> template <typename Handler, typename Executor>
class tracked_op { class tracked_op {
tracking_type<Handler> _executor; tracking_type<Handler, Executor> _executor;
std::decay_t<Handler> _handler; Handler _handler;
public: public:
tracked_op(Handler&& h) : tracked_op(Handler&& h, const Executor& ex) :
_executor(tracking_executor(h)), _executor(tracking_executor(h, ex)),
_handler(std::move(h)) _handler(std::move(h))
{} {}
tracked_op(tracked_op&&) noexcept = default; tracked_op(tracked_op&&) noexcept = default;
tracked_op(const tracked_op&) = delete; tracked_op(const tracked_op&) = delete;
using executor_type = tracking_type<Handler>; using executor_type = tracking_type<Handler, Executor>;
executor_type get_executor() const noexcept { executor_type get_executor() const noexcept {
return _executor; return _executor;
} }
@@ -186,7 +186,7 @@ private:
// to asio::post to avoid recursion. // to asio::post to avoid recursion.
void execute_or_queue(auto handler) noexcept { void execute_or_queue(auto handler) noexcept {
std::unique_lock l { _thread_mutex }; std::unique_lock l { _thread_mutex };
tracked_op h { std::move(handler) }; tracked_op h { std::move(handler), _ex };
if (_locked.load(std::memory_order_relaxed)) { if (_locked.load(std::memory_order_relaxed)) {
_waiting.emplace_back(std::move(h)); _waiting.emplace_back(std::move(h));
auto slot = _waiting.back().get_cancellation_slot(); auto slot = _waiting.back().get_cancellation_slot();

View File

@@ -27,18 +27,19 @@ void assign_tls_sni(const authority_path& ap, TlsContext& ctx, TlsStream& s);
namespace detail { namespace detail {
template <typename Handler> template <typename Handler, typename DfltExecutor>
auto tracking_executor(const Handler& handler) { auto tracking_executor(const Handler& handler, const DfltExecutor& ex) {
return asio::prefer( return asio::prefer(
asio::get_associated_executor(handler), asio::get_associated_executor(handler, ex),
asio::execution::outstanding_work.tracked asio::execution::outstanding_work.tracked
); );
} }
template <typename Handler> template <typename Handler, typename DfltExecutor>
using tracking_type = std::decay_t< using tracking_type = typename asio::prefer_result<
decltype(tracking_executor(std::declval<Handler>())) asio::associated_executor_t<Handler, DfltExecutor>,
>; asio::execution::outstanding_work_t::tracked_t
>::type;
template <typename T, typename B> template <typename T, typename B>
concept has_async_write = requires(T a) { concept has_async_write = requires(T a) {

View File

@@ -21,19 +21,19 @@ template <
> >
class cancellable_handler { class cancellable_handler {
struct op_state { struct op_state {
std::decay_t<Handler> _handler; Handler _handler;
tracking_type<Handler> _handler_ex; tracking_type<Handler, Executor> _handler_ex;
cancellable_handler* _owner; cancellable_handler* _owner;
op_state(Handler&& handler, cancellable_handler* owner) : op_state(
_handler(std::move(handler)), Handler&& handler, const Executor& ex, cancellable_handler* owner
_handler_ex(tracking_executor(_handler)), ) : _handler(std::move(handler)),
_handler_ex(tracking_executor(_handler, ex)),
_owner(owner) _owner(owner)
{} {}
void cancel_op(asio::cancellation_type_t ct) { void cancel_op() {
if (ct != asio::cancellation_type_t::none) _owner->cancel();
_owner->cancel();
} }
}; };
@@ -46,16 +46,16 @@ class cancellable_handler {
{} {}
void operator()(asio::cancellation_type_t type) { void operator()(asio::cancellation_type_t type) {
auto op = []( if ((type & asio::cancellation_type_t::terminal) ==
std::weak_ptr<op_state> wptr, asio::cancellation_type_t::none)
asio::cancellation_type_t type return;
) { auto op = [wptr = _state_weak_ptr]() {
if (auto state = wptr.lock()) if (auto state = wptr.lock())
state->cancel_op(type); state->cancel_op();
}; };
asio::dispatch( asio::dispatch(
_executor, _executor,
asio::prepend(std::move(op), _state_weak_ptr, type) std::move(op)
); );
} }
}; };
@@ -67,12 +67,12 @@ public:
cancellable_handler(Handler&& handler, const Executor& ex) { cancellable_handler(Handler&& handler, const Executor& ex) {
auto alloc = asio::get_associated_allocator(handler); auto alloc = asio::get_associated_allocator(handler);
_state = std::allocate_shared<op_state>( _state = std::allocate_shared<op_state>(
alloc, std::move(handler),this alloc, std::move(handler), ex, this
); );
_executor = ex;
auto slot = asio::get_associated_cancellation_slot(_state->_handler); auto slot = asio::get_associated_cancellation_slot(_state->_handler);
if (slot.is_connected()) if (slot.is_connected())
slot.template emplace<cancel_proxy>(_state, ex); slot.template emplace<cancel_proxy>(_state, ex);
_executor = ex;
} }
cancellable_handler(cancellable_handler&& other) noexcept : cancellable_handler(cancellable_handler&& other) noexcept :
@@ -102,8 +102,9 @@ public:
if (empty()) return; if (empty()) return;
auto h = std::move(_state->_handler); auto h = std::move(_state->_handler);
_state.reset();
asio::get_associated_cancellation_slot(h).clear(); asio::get_associated_cancellation_slot(h).clear();
auto handler_ex = std::move(_state->_handler_ex);
_state.reset();
auto op = std::apply([&h](auto... args) { auto op = std::apply([&h](auto... args) {
return asio::prepend( return asio::prepend(
@@ -111,7 +112,7 @@ public:
); );
}, CancelArgs {}); }, CancelArgs {});
asio::dispatch(std::move(_executor), std::move(op)); asio::dispatch(handler_ex, std::move(op));
} }
template <typename... Args> template <typename... Args>
@@ -119,11 +120,12 @@ public:
if (empty()) return; if (empty()) return;
auto h = std::move(_state->_handler); auto h = std::move(_state->_handler);
_state.reset();
asio::get_associated_cancellation_slot(h).clear(); asio::get_associated_cancellation_slot(h).clear();
auto handler_ex = std::move(_state->_handler_ex);
_state.reset();
asio::dispatch( asio::dispatch(
std::move(_executor), handler_ex,
asio::prepend(std::move(h), std::forward<Args>(args)...) asio::prepend(std::move(h), std::forward<Args>(args)...)
); );
} }
@@ -133,14 +135,14 @@ public:
if (empty()) return; if (empty()) return;
auto h = std::move(_state->_handler); auto h = std::move(_state->_handler);
_state.reset();
asio::get_associated_cancellation_slot(h).clear(); asio::get_associated_cancellation_slot(h).clear();
auto handler_ex = std::move(_state->_handler_ex);
_state.reset();
asio::post( asio::post(
std::move(_executor), _executor,
asio::prepend(std::move(h), std::forward<Args>(args)...) asio::prepend(std::move(h), std::forward<Args>(args)...)
); );
} }
}; };

View File

@@ -54,7 +54,7 @@ class assemble_op {
static constexpr size_t max_packet_size = 65536; static constexpr size_t max_packet_size = 65536;
client_service& _svc; client_service& _svc;
std::decay_t<Handler> _handler; Handler _handler;
std::string& _read_buff; std::string& _read_buff;
data_span& _data_span; data_span& _data_span;

View File

@@ -43,7 +43,7 @@ class connect_op {
Stream& _stream; Stream& _stream;
mqtt_context& _ctx; mqtt_context& _ctx;
std::decay_t<Handler> _handler; Handler _handler;
std::unique_ptr<std::string> _buffer_ptr; std::unique_ptr<std::string> _buffer_ptr;
using endpoint = asio::ip::tcp::endpoint; using endpoint = asio::ip::tcp::endpoint;

View File

@@ -9,6 +9,7 @@
#include <async_mqtt5/detail/control_packet.hpp> #include <async_mqtt5/detail/control_packet.hpp>
#include <async_mqtt5/detail/internal_types.hpp> #include <async_mqtt5/detail/internal_types.hpp>
#include <async_mqtt5/detail/cancellable_handler.hpp>
#include <async_mqtt5/impl/internal/codecs/message_encoders.hpp> #include <async_mqtt5/impl/internal/codecs/message_encoders.hpp>
@@ -28,7 +29,10 @@ class disconnect_op {
std::shared_ptr<client_service> _svc_ptr; std::shared_ptr<client_service> _svc_ptr;
DisconnectContext _context; DisconnectContext _context;
std::decay_t<Handler> _handler; cancellable_handler<
Handler,
typename ClientService::executor_type
> _handler;
public: public:
disconnect_op( disconnect_op(
@@ -37,7 +41,7 @@ public:
) : ) :
_svc_ptr(svc_ptr), _svc_ptr(svc_ptr),
_context(std::move(context)), _context(std::move(context)),
_handler(std::move(handler)) _handler(std::move(handler), get_executor())
{} {}
disconnect_op(disconnect_op&&) noexcept = default; disconnect_op(disconnect_op&&) noexcept = default;
@@ -103,10 +107,7 @@ public:
private: private:
void complete(error_code ec) { void complete(error_code ec) {
asio::dispatch( _handler.complete(ec);
get_executor(),
asio::prepend(std::move(_handler), ec)
);
} }
}; };

View File

@@ -26,7 +26,7 @@ class resolve_op {
struct on_resolve {}; struct on_resolve {};
Owner& _owner; Owner& _owner;
std::decay_t<Handler> _handler; Handler _handler;
public: public:
resolve_op( resolve_op(

View File

@@ -142,6 +142,12 @@ public:
} }
void send_publish(control_packet<allocator_type> publish) { void send_publish(control_packet<allocator_type> publish) {
if (_handler.empty()) { // already cancelled
if constexpr (qos_type != qos_e::at_most_once)
_svc_ptr->free_pid(publish.packet_id());
return;
}
const auto& wire_data = publish.wire_data(); const auto& wire_data = publish.wire_data();
_svc_ptr->async_send( _svc_ptr->async_send(
wire_data, wire_data,

View File

@@ -19,7 +19,7 @@ class read_op {
struct on_reconnect {}; struct on_reconnect {};
Owner& _owner; Owner& _owner;
std::decay_t<Handler> _handler; Handler _handler;
public: public:
read_op(Owner& owner, Handler&& handler) : read_op(Owner& owner, Handler&& handler) :

View File

@@ -27,7 +27,7 @@ class reconnect_op {
struct on_backoff {}; struct on_backoff {};
Owner& _owner; Owner& _owner;
std::decay_t<Handler> _handler; Handler _handler;
std::unique_ptr<std::string> _buffer_ptr; std::unique_ptr<std::string> _buffer_ptr;
using endpoint = asio::ip::tcp::endpoint; using endpoint = asio::ip::tcp::endpoint;

View File

@@ -71,6 +71,9 @@ public:
} }
void send_subscribe(control_packet<allocator_type> subscribe) { void send_subscribe(control_packet<allocator_type> subscribe) {
if (_handler.empty()) // already cancelled
return _svc_ptr->free_pid(subscribe.packet_id());
const auto& wire_data = subscribe.wire_data(); const auto& wire_data = subscribe.wire_data();
_svc_ptr->async_send( _svc_ptr->async_send(
wire_data, wire_data,

View File

@@ -71,6 +71,9 @@ public:
} }
void send_unsubscribe(control_packet<allocator_type> unsubscribe) { void send_unsubscribe(control_packet<allocator_type> unsubscribe) {
if (_handler.empty()) // already cancelled
return _svc_ptr->free_pid(unsubscribe.packet_id());
const auto& wire_data = unsubscribe.wire_data(); const auto& wire_data = unsubscribe.wire_data();
_svc_ptr->async_send( _svc_ptr->async_send(
wire_data, wire_data,

View File

@@ -15,7 +15,7 @@ class write_op {
struct on_reconnect {}; struct on_reconnect {};
Owner& _owner; Owner& _owner;
std::decay_t<Handler> _handler; Handler _handler;
public: public:
write_op( write_op(

View File

@@ -94,7 +94,7 @@ template <typename Handler>
class read_op { class read_op {
struct on_read {}; struct on_read {};
std::shared_ptr<test_stream_impl> _stream_impl; std::shared_ptr<test_stream_impl> _stream_impl;
std::decay_t<Handler> _handler; Handler _handler;
public: public:
read_op( read_op(
@@ -155,7 +155,7 @@ class write_op {
struct on_write {}; struct on_write {};
std::shared_ptr<test_stream_impl> _stream_impl; std::shared_ptr<test_stream_impl> _stream_impl;
std::decay_t<Handler> _handler; Handler _handler;
public: public:
write_op( write_op(