Cancellation fixes

Summary:
related to T11798
- Cancel functions post outstanding handlers
- Properly cancel autoconnect stream

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Differential Revision: https://repo.mireo.local/D27763
This commit is contained in:
Korina Šimičević
2024-02-06 13:40:05 +01:00
parent 0de02e3c53
commit e0ae572e1b
9 changed files with 48 additions and 57 deletions

View File

@@ -160,10 +160,14 @@ public:
_waiting.pop_front(); _waiting.pop_front();
if (!op) continue; if (!op) continue;
op.get_cancellation_slot().clear(); op.get_cancellation_slot().clear();
auto ex = asio::get_associated_executor(op, _ex); asio::require(_ex, asio::execution::blocking.never)
asio::require(ex, asio::execution::blocking.possibly) .execute([ex = _ex, op = std::move(op)]() mutable {
.execute([op = std::move(op)]() mutable { auto opex = asio::get_associated_executor(op, ex);
std::move(op)(asio::error::operation_aborted); opex.execute(
[op = std::move(op)]() mutable {
op(asio::error::operation_aborted);
}
);
}); });
} }
} }
@@ -178,7 +182,9 @@ private:
.execute([ex = _ex, op = std::move(op)]() mutable { .execute([ex = _ex, op = std::move(op)]() mutable {
auto opex = asio::get_associated_executor(op, ex); auto opex = asio::get_associated_executor(op, ex);
opex.execute( opex.execute(
[op = std::move(op)]() mutable { op(error_code{}); } [op = std::move(op)]() mutable {
op(error_code {});
}
); );
}); });
} }

View File

@@ -5,6 +5,7 @@
#include <boost/asio/bind_allocator.hpp> #include <boost/asio/bind_allocator.hpp>
#include <boost/asio/bind_executor.hpp> #include <boost/asio/bind_executor.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/prepend.hpp> #include <boost/asio/prepend.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
@@ -46,6 +47,13 @@ public:
std::move(_handler)(ec); std::move(_handler)(ec);
} }
void complete_post(const asio::any_io_executor& ex, error_code ec) {
asio::post(
ex,
asio::prepend(std::move(_handler), ec)
);
}
auto get_executor() { auto get_executor() {
return asio::get_associated_executor(_handler); return asio::get_associated_executor(_handler);
} }
@@ -140,7 +148,7 @@ public:
void cancel() { void cancel() {
auto ops = std::move(_write_queue); auto ops = std::move(_write_queue);
for (auto& op : ops) for (auto& op : ops)
op.complete(asio::error::operation_aborted); op.complete_post(_svc.get_executor(), asio::error::operation_aborted);
} }
void resend() { void resend() {

View File

@@ -94,13 +94,14 @@ public:
void cancel() { void cancel() {
error_code ec; error_code ec;
lowest_layer(*_stream_ptr).cancel(ec); lowest_layer(*_stream_ptr).cancel(ec);
_conn_mtx.cancel();
_connect_timer.cancel();
} }
void close() { void close() {
error_code ec; error_code ec;
shutdown(asio::ip::tcp::socket::shutdown_both); shutdown(asio::ip::tcp::socket::shutdown_both);
lowest_layer(*_stream_ptr).close(ec); lowest_layer(*_stream_ptr).close(ec);
_connect_timer.cancel();
} }
void shutdown(asio::ip::tcp::socket::shutdown_type what) { void shutdown(asio::ip::tcp::socket::shutdown_type what) {

View File

@@ -362,6 +362,7 @@ public:
_rec_channel.close(); _rec_channel.close();
_replies.cancel_unanswered(); _replies.cancel_unanswered();
_async_sender.cancel(); _async_sender.cancel();
_stream.cancel();
_stream.close(); _stream.close();
asio::get_associated_cancellation_slot(_run_handler).clear(); asio::get_associated_cancellation_slot(_run_handler).clear();

View File

@@ -181,6 +181,8 @@ public:
private: private:
void complete(error_code ec) { void complete(error_code ec) {
get_cancellation_slot().clear(); get_cancellation_slot().clear();
if (ec != asio::error::operation_aborted)
_owner._conn_mtx.unlock(); _owner._conn_mtx.unlock();
std::move(_handler)(ec); std::move(_handler)(ec);

View File

@@ -162,7 +162,7 @@ public:
void cancel_unanswered() { void cancel_unanswered() {
auto ua = std::move(_handlers); auto ua = std::move(_handlers);
for (auto& h : ua) for (auto& h : ua)
h.complete(asio::error::operation_aborted); h.complete_post(_ex, asio::error::operation_aborted);
} }
bool any_expired() { bool any_expired() {

View File

@@ -52,6 +52,8 @@ public:
_test_broker = &asio::use_service<test_broker>(_ex.context()); _test_broker = &asio::use_service<test_broker>(_ex.context());
} }
void cancel(error_code&) {}
void close(error_code& ec) { void close(error_code& ec) {
disconnect(); disconnect();
ec = {}; ec = {};
@@ -248,6 +250,10 @@ public:
_impl->open(p, ec); _impl->open(p, ec);
} }
void cancel(error_code& ec) {
_impl->cancel(ec);
}
void close(error_code& ec) { void close(error_code& ec) {
_impl->close(ec); _impl->close(ec);
} }

View File

@@ -28,8 +28,7 @@ enum operation_type {
}; };
enum cancel_type { enum cancel_type {
ioc_stop = 1, client_cancel = 1,
client_cancel,
signal_emit signal_emit
}; };
@@ -49,7 +48,7 @@ void setup_cancel_op_test_case(
asio::bind_cancellation_slot( asio::bind_cancellation_slot(
signal.slot(), signal.slot(),
[&handlers_called](error_code ec) { [&handlers_called](error_code ec) {
handlers_called++; ++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(ec == asio::error::operation_aborted);
} }
) )
@@ -68,7 +67,7 @@ void setup_cancel_op_test_case(
asio::bind_cancellation_slot( asio::bind_cancellation_slot(
signal.slot(), signal.slot(),
[&handlers_called](error_code ec) { [&handlers_called](error_code ec) {
handlers_called++; ++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(ec == asio::error::operation_aborted);
} }
) )
@@ -89,7 +88,7 @@ void setup_cancel_op_test_case(
[&handlers_called]( [&handlers_called](
error_code ec, std::string t, std::string p, publish_props error_code ec, std::string t, std::string p, publish_props
) { ) {
handlers_called++; ++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST(t == ""); BOOST_TEST(t == "");
BOOST_TEST(p == ""); BOOST_TEST(p == "");
@@ -113,7 +112,7 @@ void setup_cancel_op_test_case(
[&handlers_called]( [&handlers_called](
error_code ec, std::vector<reason_code> rcs, unsuback_props error_code ec, std::vector<reason_code> rcs, unsuback_props
) { ) {
handlers_called++; ++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST_REQUIRE(rcs.size() == 1u); BOOST_TEST_REQUIRE(rcs.size() == 1u);
BOOST_TEST(rcs[0] == reason_codes::empty); BOOST_TEST(rcs[0] == reason_codes::empty);
@@ -137,7 +136,7 @@ void setup_cancel_op_test_case(
[&handlers_called]( [&handlers_called](
error_code ec, std::vector<reason_code> rcs, suback_props error_code ec, std::vector<reason_code> rcs, suback_props
) { ) {
handlers_called++; ++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST_REQUIRE(rcs.size() == 1u); BOOST_TEST_REQUIRE(rcs.size() == 1u);
BOOST_TEST(rcs[0] == reason_codes::empty); BOOST_TEST(rcs[0] == reason_codes::empty);
@@ -150,39 +149,31 @@ template<test::cancel_type c_type, test::operation_type op_type>
void run_cancel_op_test() { void run_cancel_op_test() {
using namespace test; using namespace test;
constexpr int expected_handlers_called = c_type == ioc_stop ? 0 : 1; constexpr int expected_handlers_called = 1;
int handlers_called = 0; int handlers_called = 0;
asio::io_context ioc; asio::io_context ioc;
asio::cancellation_signal signal;
client_type c(ioc, ""); client_type c(ioc, "");
c.brokers("127.0.0.1"); c.brokers("127.0.0.1");
asio::cancellation_signal signal;
setup_cancel_op_test_case<op_type>(c, signal, handlers_called); setup_cancel_op_test_case<op_type>(c, signal, handlers_called);
asio::steady_timer timer(c.get_executor()); asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::milliseconds(10)); timer.expires_after(std::chrono::milliseconds(100));
timer.async_wait([&](auto) { timer.async_wait([&](auto) {
if constexpr (c_type == ioc_stop) if constexpr (c_type == client_cancel)
ioc.stop();
else if constexpr (c_type == client_cancel)
c.cancel(); c.cancel();
else if constexpr (c_type == signal_emit) else if constexpr (c_type == signal_emit)
signal.emit(asio::cancellation_type_t::terminal); signal.emit(asio::cancellation_type_t::terminal);
}); });
ioc.run();
ioc.run_for(std::chrono::seconds(1)); BOOST_TEST(handlers_called == expected_handlers_called);
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
} }
BOOST_AUTO_TEST_SUITE(cancellation/*, *boost::unit_test::disabled()*/) BOOST_AUTO_TEST_SUITE(cancellation/*, *boost::unit_test::disabled()*/)
// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_run, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::async_run>();
}
BOOST_AUTO_TEST_CASE(client_cancel_async_run) { BOOST_AUTO_TEST_CASE(client_cancel_async_run) {
run_cancel_op_test<test::client_cancel, test::async_run>(); run_cancel_op_test<test::client_cancel, test::async_run>();
} }
@@ -191,11 +182,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_run) {
run_cancel_op_test<test::signal_emit, test::async_run>(); run_cancel_op_test<test::signal_emit, test::async_run>();
} }
// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_publish, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::publish>();
}
BOOST_AUTO_TEST_CASE(client_cancel_async_publish) { BOOST_AUTO_TEST_CASE(client_cancel_async_publish) {
run_cancel_op_test<test::client_cancel, test::publish>(); run_cancel_op_test<test::client_cancel, test::publish>();
} }
@@ -204,11 +190,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_publish) {
run_cancel_op_test<test::signal_emit, test::publish>(); run_cancel_op_test<test::signal_emit, test::publish>();
} }
// hangs after ping changes
BOOST_AUTO_TEST_CASE(ioc_stop_async_receive, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::receive>();
}
BOOST_AUTO_TEST_CASE(client_cancel_async_receive) { BOOST_AUTO_TEST_CASE(client_cancel_async_receive) {
run_cancel_op_test<test::client_cancel, test::receive>(); run_cancel_op_test<test::client_cancel, test::receive>();
} }
@@ -218,11 +199,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_receive, *boost::unit_test::disabled()) {
run_cancel_op_test<test::signal_emit, test::receive>(); run_cancel_op_test<test::signal_emit, test::receive>();
} }
// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_subscribe, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::subscribe>();
}
BOOST_AUTO_TEST_CASE(client_cancel_async_subscribe) { BOOST_AUTO_TEST_CASE(client_cancel_async_subscribe) {
run_cancel_op_test<test::client_cancel, test::subscribe>(); run_cancel_op_test<test::client_cancel, test::subscribe>();
} }
@@ -231,11 +207,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_subscribe) {
run_cancel_op_test<test::signal_emit, test::subscribe>(); run_cancel_op_test<test::signal_emit, test::subscribe>();
} }
// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_unsubscribe, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::unsubscribe>();
}
BOOST_AUTO_TEST_CASE(client_cancel_async_unsubscribe) { BOOST_AUTO_TEST_CASE(client_cancel_async_unsubscribe) {
run_cancel_op_test<test::client_cancel, test::unsubscribe>(); run_cancel_op_test<test::client_cancel, test::unsubscribe>();
} }

View File

@@ -123,7 +123,6 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) {
[&](boost::system::error_code ec) { [&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel(); c.cancel();
ioc.stop();
} }
); );
@@ -159,7 +158,6 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) {
[&](boost::system::error_code ec) { [&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel(); c.cancel();
ioc.stop();
} }
); );
@@ -195,7 +193,6 @@ BOOST_AUTO_TEST_CASE(openssl_tls_client_check) {
[&](boost::system::error_code ec) { [&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel(); c.cancel();
ioc.stop();
} }
); );
@@ -233,7 +230,6 @@ BOOST_AUTO_TEST_CASE(websocket_tls_client_check) {
[&](boost::system::error_code ec) { [&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel(); c.cancel();
ioc.stop();
} }
); );