From 28858c60fdee18306b2dcfe9cd52341bbbbbdd98 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 4 Feb 2019 06:45:45 -0800 Subject: [PATCH] Better handling of stream timeouts --- CHANGELOG.md | 1 + .../boost/beast/core/basic_timeout_stream.hpp | 5 +- .../beast/core/impl/basic_timeout_stream.hpp | 126 ++++++++++-------- test/beast/core/basic_timeout_stream.cpp | 3 +- 4 files changed, 79 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12cb71b1..8dd001ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Version 210: * Tidy up read implementation * Fix stable_async_op_base javadoc +* Better handling of stream timeouts -------------------------------------------------------------------------------- diff --git a/include/boost/beast/core/basic_timeout_stream.hpp b/include/boost/beast/core/basic_timeout_stream.hpp index 6084eafb..87216163 100644 --- a/include/boost/beast/core/basic_timeout_stream.hpp +++ b/include/boost/beast/core/basic_timeout_stream.hpp @@ -201,11 +201,14 @@ class basic_timeout_stream static std::size_t constexpr no_limit = (std::numeric_limits::max)(); + using tick_type = std::uint64_t; + struct op_state { net::steady_timer timer; // for timing out + tick_type tick = 0; // counts waits bool pending = false; // if op is pending - bool closed = false; // if timed out + bool timeout = false; // if timed out explicit op_state(net::io_context& ioc) diff --git a/include/boost/beast/core/impl/basic_timeout_stream.hpp b/include/boost/beast/core/impl/basic_timeout_stream.hpp index 26e8f6ab..53b4b4e6 100644 --- a/include/boost/beast/core/impl/basic_timeout_stream.hpp +++ b/include/boost/beast/core/impl/basic_timeout_stream.hpp @@ -28,7 +28,8 @@ struct basic_timeout_stream< Protocol, Executor>::timeout_handler { op_state& state; - std::shared_ptr impl; + std::weak_ptr wp; + tick_type tick; void operator()(error_code ec) @@ -36,20 +37,29 @@ struct basic_timeout_stream< // timer canceled if(ec == net::error::operation_aborted) return; - BOOST_ASSERT(! ec); - if(! state.closed) + auto sp = wp.lock(); + + // stream destroyed + if(! sp) + return; + + // stale timer + if(tick < state.tick) + return; + BOOST_ASSERT(tick == state.tick); + + // late completion + if(state.timeout) { - // timeout - impl->close(); - state.closed = true; - } - else - { - // late completion - state.closed = false; + state.timeout = false; + return; } + + // timeout + sp->close(); + state.timeout = true; } }; @@ -130,40 +140,40 @@ public: { // VFALCO TODO handle buffer size == 0 - // must come first state().timer.async_wait( net::bind_executor( this->get_executor(), - timeout_handler{state(), - impl_->shared_from_this()})); + timeout_handler{ + state(), + impl_, + state().tick + })); BOOST_ASIO_CORO_YIELD async_perform( std::integral_constant{}); + ++state().tick; // try cancelling timer auto const n = state().timer.cancel(); - - if(state().closed) + if(n == 0) { - // timeout handler already invoked - BOOST_ASSERT(n == 0); - ec = beast::error::timeout; - state().closed = false; - } - else if(n == 0) - { - // timeout handler already queued - ec = beast::error::timeout; - - impl_->close(); - state().closed = true; + if(state().timeout) + { + // timeout handler invoked + ec = beast::error::timeout; + state().timeout = false; + } + else + { + // timeout handler queued, stale + } } else { - // timeout was canceled BOOST_ASSERT(n == 1); + BOOST_ASSERT(! state().timeout); } pg_.reset(); @@ -192,6 +202,12 @@ class timeout_stream_connect_op typename stream_type::pending_guard pg0_; typename stream_type::pending_guard pg1_; + typename stream_type::op_state& + state() noexcept + { + return impl_->write; + } + public: template< class Endpoints, class Condition, @@ -208,12 +224,13 @@ public: , pg1_(impl_->write.pending) { // must come first - // VFALCO TODO what about the handler's allocator? impl_->write.timer.async_wait( net::bind_executor( this->get_executor(), - timeout_handler{impl_->write, - impl_->shared_from_this()})); + timeout_handler{ + state(), + impl_, + state().tick})); net::async_connect(impl_->socket, eps, cond, std::move(*this)); @@ -238,8 +255,10 @@ public: impl_->write.timer.async_wait( net::bind_executor( this->get_executor(), - timeout_handler{impl_->write, - impl_->shared_from_this()})); + timeout_handler{ + state(), + impl_, + state().tick})); net::async_connect(impl_->socket, begin, end, cond, std::move(*this)); @@ -261,8 +280,10 @@ public: impl_->write.timer.async_wait( net::bind_executor( this->get_executor(), - timeout_handler{impl_->write, - impl_->shared_from_this()})); + timeout_handler{ + state(), + impl_, + state().tick})); impl_->socket.async_connect( ep, std::move(*this)); @@ -273,31 +294,30 @@ public: void operator()(error_code ec, Args&&... args) { - // try to cancel the timer + ++state().tick; + + // try cancelling timer auto const n = impl_->write.timer.cancel(); - - if(impl_->write.closed) + if(n == 0) { - // timeout handler already invoked - BOOST_ASSERT(n == 0); - ec = beast::error::timeout; - impl_->write.closed = false; - } - else if(n == 0) - { - // timeout handler already queued - ec = beast::error::timeout; - - impl_->close(); - impl_->write.closed = true; + if(state().timeout) + { + // timeout handler invoked + ec = beast::error::timeout; + state().timeout = false; + } + else + { + // timeout handler queued, stale + } } else { - // timeout was canceled BOOST_ASSERT(n == 1); + BOOST_ASSERT(! state().timeout); } - + pg0_.reset(); pg1_.reset(); this->invoke(ec, std::forward(args)...); diff --git a/test/beast/core/basic_timeout_stream.cpp b/test/beast/core/basic_timeout_stream.cpp index 02cbdccc..b1947e43 100644 --- a/test/beast/core/basic_timeout_stream.cpp +++ b/test/beast/core/basic_timeout_stream.cpp @@ -915,8 +915,7 @@ public: [&](error_code ec) { invoked = true; - BEAST_EXPECTS(ec == error::timeout, - ec.message()); + BEAST_EXPECTS(! ec, ec.message()); }); ioc1.run(); ioc1.restart();