Better handling of stream timeouts

This commit is contained in:
Vinnie Falco
2019-02-04 06:45:45 -08:00
parent 8814ac4b35
commit 28858c60fd
4 changed files with 79 additions and 56 deletions

View File

@ -2,6 +2,7 @@ Version 210:
* Tidy up read implementation * Tidy up read implementation
* Fix stable_async_op_base javadoc * Fix stable_async_op_base javadoc
* Better handling of stream timeouts
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------

View File

@ -201,11 +201,14 @@ class basic_timeout_stream
static std::size_t constexpr no_limit = static std::size_t constexpr no_limit =
(std::numeric_limits<std::size_t>::max)(); (std::numeric_limits<std::size_t>::max)();
using tick_type = std::uint64_t;
struct op_state struct op_state
{ {
net::steady_timer timer; // for timing out net::steady_timer timer; // for timing out
tick_type tick = 0; // counts waits
bool pending = false; // if op is pending bool pending = false; // if op is pending
bool closed = false; // if timed out bool timeout = false; // if timed out
explicit explicit
op_state(net::io_context& ioc) op_state(net::io_context& ioc)

View File

@ -28,7 +28,8 @@ struct basic_timeout_stream<
Protocol, Executor>::timeout_handler Protocol, Executor>::timeout_handler
{ {
op_state& state; op_state& state;
std::shared_ptr<impl_type> impl; std::weak_ptr<impl_type> wp;
tick_type tick;
void void
operator()(error_code ec) operator()(error_code ec)
@ -36,20 +37,29 @@ struct basic_timeout_stream<
// timer canceled // timer canceled
if(ec == net::error::operation_aborted) if(ec == net::error::operation_aborted)
return; return;
BOOST_ASSERT(! ec); BOOST_ASSERT(! ec);
if(! state.closed) auto sp = wp.lock();
// stream destroyed
if(! sp)
return;
// stale timer
if(tick < state.tick)
return;
BOOST_ASSERT(tick == state.tick);
// late completion
if(state.timeout)
{ {
// timeout state.timeout = false;
impl->close(); return;
state.closed = true;
}
else
{
// late completion
state.closed = false;
} }
// timeout
sp->close();
state.timeout = true;
} }
}; };
@ -130,40 +140,40 @@ public:
{ {
// VFALCO TODO handle buffer size == 0 // VFALCO TODO handle buffer size == 0
// must come first
state().timer.async_wait( state().timer.async_wait(
net::bind_executor( net::bind_executor(
this->get_executor(), this->get_executor(),
timeout_handler{state(), timeout_handler{
impl_->shared_from_this()})); state(),
impl_,
state().tick
}));
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
async_perform( async_perform(
std::integral_constant<bool, isRead>{}); std::integral_constant<bool, isRead>{});
++state().tick;
// try cancelling timer // try cancelling timer
auto const n = auto const n =
state().timer.cancel(); state().timer.cancel();
if(n == 0)
if(state().closed)
{ {
// timeout handler already invoked if(state().timeout)
BOOST_ASSERT(n == 0); {
ec = beast::error::timeout; // timeout handler invoked
state().closed = false; ec = beast::error::timeout;
} state().timeout = false;
else if(n == 0) }
{ else
// timeout handler already queued {
ec = beast::error::timeout; // timeout handler queued, stale
}
impl_->close();
state().closed = true;
} }
else else
{ {
// timeout was canceled
BOOST_ASSERT(n == 1); BOOST_ASSERT(n == 1);
BOOST_ASSERT(! state().timeout);
} }
pg_.reset(); pg_.reset();
@ -192,6 +202,12 @@ class timeout_stream_connect_op
typename stream_type::pending_guard pg0_; typename stream_type::pending_guard pg0_;
typename stream_type::pending_guard pg1_; typename stream_type::pending_guard pg1_;
typename stream_type::op_state&
state() noexcept
{
return impl_->write;
}
public: public:
template< template<
class Endpoints, class Condition, class Endpoints, class Condition,
@ -208,12 +224,13 @@ public:
, pg1_(impl_->write.pending) , pg1_(impl_->write.pending)
{ {
// must come first // must come first
// VFALCO TODO what about the handler's allocator?
impl_->write.timer.async_wait( impl_->write.timer.async_wait(
net::bind_executor( net::bind_executor(
this->get_executor(), this->get_executor(),
timeout_handler{impl_->write, timeout_handler{
impl_->shared_from_this()})); state(),
impl_,
state().tick}));
net::async_connect(impl_->socket, net::async_connect(impl_->socket,
eps, cond, std::move(*this)); eps, cond, std::move(*this));
@ -238,8 +255,10 @@ public:
impl_->write.timer.async_wait( impl_->write.timer.async_wait(
net::bind_executor( net::bind_executor(
this->get_executor(), this->get_executor(),
timeout_handler{impl_->write, timeout_handler{
impl_->shared_from_this()})); state(),
impl_,
state().tick}));
net::async_connect(impl_->socket, net::async_connect(impl_->socket,
begin, end, cond, std::move(*this)); begin, end, cond, std::move(*this));
@ -261,8 +280,10 @@ public:
impl_->write.timer.async_wait( impl_->write.timer.async_wait(
net::bind_executor( net::bind_executor(
this->get_executor(), this->get_executor(),
timeout_handler{impl_->write, timeout_handler{
impl_->shared_from_this()})); state(),
impl_,
state().tick}));
impl_->socket.async_connect( impl_->socket.async_connect(
ep, std::move(*this)); ep, std::move(*this));
@ -273,31 +294,30 @@ public:
void void
operator()(error_code ec, Args&&... args) operator()(error_code ec, Args&&... args)
{ {
// try to cancel the timer ++state().tick;
// try cancelling timer
auto const n = auto const n =
impl_->write.timer.cancel(); impl_->write.timer.cancel();
if(n == 0)
if(impl_->write.closed)
{ {
// timeout handler already invoked if(state().timeout)
BOOST_ASSERT(n == 0); {
ec = beast::error::timeout; // timeout handler invoked
impl_->write.closed = false; ec = beast::error::timeout;
} state().timeout = false;
else if(n == 0) }
{ else
// timeout handler already queued {
ec = beast::error::timeout; // timeout handler queued, stale
}
impl_->close();
impl_->write.closed = true;
} }
else else
{ {
// timeout was canceled
BOOST_ASSERT(n == 1); BOOST_ASSERT(n == 1);
BOOST_ASSERT(! state().timeout);
} }
pg0_.reset(); pg0_.reset();
pg1_.reset(); pg1_.reset();
this->invoke(ec, std::forward<Args>(args)...); this->invoke(ec, std::forward<Args>(args)...);

View File

@ -915,8 +915,7 @@ public:
[&](error_code ec) [&](error_code ec)
{ {
invoked = true; invoked = true;
BEAST_EXPECTS(ec == error::timeout, BEAST_EXPECTS(! ec, ec.message());
ec.message());
}); });
ioc1.run(); ioc1.run();
ioc1.restart(); ioc1.restart();