Use async_initiate in basic_stream

This commit is contained in:
Vinnie Falco
2019-02-19 09:01:45 -08:00
parent 23a7bcc67e
commit 0ad7390e94
5 changed files with 226 additions and 173 deletions

View File

@@ -292,8 +292,12 @@ private:
template<class, class, class> template<class, class, class>
friend class basic_stream; friend class basic_stream;
struct run_async_op;
struct timeout_handler; struct timeout_handler;
struct ops;
#if ! BOOST_BEAST_DOXYGEN #if ! BOOST_BEAST_DOXYGEN
// boost::asio::ssl::stream needs these // boost::asio::ssl::stream needs these
// DEPRECATED // DEPRECATED

View File

@@ -135,7 +135,7 @@ struct is_contiguous_container<T, E, void_t<
#define BOOST_BEAST_HANDLER_INIT(type, sig) \ #define BOOST_BEAST_HANDLER_INIT(type, sig) \
static_assert(::boost::beast::detail::is_invocable< \ static_assert(::boost::beast::detail::is_invocable< \
BOOST_ASIO_HANDLER_TYPE(type, sig), sig>::value, \ BOOST_ASIO_HANDLER_TYPE(type, sig), sig>::value, \
"CompletionHandler signature requirements not met"); \ "CompletionHandler type requirements not met"); \
::boost::beast::net::async_completion<type, sig> init{handler} ::boost::beast::net::async_completion<type, sig> init{handler}
} // detail } // detail

View File

@@ -179,17 +179,157 @@ struct basic_stream<Protocol, Executor, RatePolicy>::
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/* namespace detail {
The algorithm for implementing the timeout depends
on the executor providing ordered execution guarantee. template<
`net::strand` automatically provides this, and we assume class Protocol, class Executor, class RatePolicy,
that an implicit strand (one thread calling io_context::run) class Handler>
also provides this. class basic_stream_connect_op
*/ : public async_op_base<Handler, Executor>
{
using stream_type = beast::basic_stream<
Protocol, Executor, RatePolicy>;
using timeout_handler =
typename stream_type::timeout_handler;
boost::shared_ptr<typename
stream_type::impl_type> impl_;
typename stream_type::pending_guard pg0_;
typename stream_type::pending_guard pg1_;
typename stream_type::op_state&
state() noexcept
{
return impl_->write;
}
public:
template<class Handler_>
basic_stream_connect_op(
Handler_&& h,
stream_type& s,
typename stream_type::endpoint_type ep)
: async_op_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_)
, pg0_(impl_->read.pending)
, pg1_(impl_->write.pending)
{
if(state().timer.expiry() != stream_base::never())
impl_->write.timer.async_wait(
net::bind_executor(
this->get_executor(),
timeout_handler{
state(),
impl_,
state().tick}));
impl_->socket.async_connect(
ep, std::move(*this));
// *this is now moved-from
}
template<
class Endpoints, class Condition,
class Handler_>
basic_stream_connect_op(
Handler_&& h,
stream_type& s,
Endpoints const& eps,
Condition const& cond)
: async_op_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_)
, pg0_(impl_->read.pending)
, pg1_(impl_->write.pending)
{
if(state().timer.expiry() != stream_base::never())
impl_->write.timer.async_wait(
net::bind_executor(
this->get_executor(),
timeout_handler{
state(),
impl_,
state().tick}));
net::async_connect(impl_->socket,
eps, cond, std::move(*this));
// *this is now moved-from
}
template<
class Iterator, class Condition,
class Handler_>
basic_stream_connect_op(
Handler_&& h,
stream_type& s,
Iterator begin, Iterator end,
Condition const& cond)
: async_op_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_)
, pg0_(impl_->read.pending)
, pg1_(impl_->write.pending)
{
if(state().timer.expiry() != stream_base::never())
impl_->write.timer.async_wait(
net::bind_executor(
this->get_executor(),
timeout_handler{
state(),
impl_,
state().tick}));
net::async_connect(impl_->socket,
begin, end, cond, std::move(*this));
// *this is now moved-from
}
template<class... Args>
void
operator()(error_code ec, Args&&... args)
{
if(state().timer.expiry() != stream_base::never())
{
++state().tick;
// try cancelling timer
auto const n =
impl_->write.timer.cancel();
if(n == 0)
{
// timeout handler invoked?
if(state().timeout)
{
// yes, socket already closed
ec = beast::error::timeout;
state().timeout = false;
}
}
else
{
BOOST_ASSERT(n == 1);
BOOST_ASSERT(! state().timeout);
}
}
pg0_.reset();
pg1_.reset();
this->invoke_now(ec, std::forward<Args>(args)...);
}
};
} // detail
//------------------------------------------------------------------------------
template<class Protocol, class Executor, class RatePolicy> template<class Protocol, class Executor, class RatePolicy>
struct basic_stream<Protocol, Executor, RatePolicy>::ops
{
template<bool isRead, class Buffers, class Handler> template<bool isRead, class Buffers, class Handler>
class basic_stream<Protocol, Executor, RatePolicy>::async_op class async_op
: public async_op_base<Handler, Executor> : public async_op_base<Handler, Executor>
, public boost::asio::coroutine , public boost::asio::coroutine
{ {
@@ -279,9 +419,9 @@ class basic_stream<Protocol, Executor, RatePolicy>::async_op
public: public:
template<class Handler_> template<class Handler_>
async_op( async_op(
Handler_&& h,
basic_stream& s, basic_stream& s,
Buffers const& b, Buffers const& b)
Handler_&& h)
: async_op_base<Handler, Executor>( : async_op_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor()) std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_) , impl_(s.impl_)
@@ -390,150 +530,56 @@ public:
} }
}; };
//------------------------------------------------------------------------------ struct run_read_op
namespace detail {
template<
class Protocol, class Executor, class RatePolicy,
class Handler>
class basic_stream_connect_op
: public async_op_base<Handler, Executor>
{ {
using stream_type = beast::basic_stream< template<class ReadHandler, class Buffers>
Protocol, Executor, RatePolicy>;
using timeout_handler =
typename stream_type::timeout_handler;
boost::shared_ptr<typename
stream_type::impl_type> impl_;
typename stream_type::pending_guard pg0_;
typename stream_type::pending_guard pg1_;
typename stream_type::op_state&
state() noexcept
{
return impl_->write;
}
public:
template<class Handler_>
basic_stream_connect_op(
stream_type& s,
typename stream_type::endpoint_type ep,
Handler_&& h)
: async_op_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_)
, pg0_(impl_->read.pending)
, pg1_(impl_->write.pending)
{
if(state().timer.expiry() != stream_base::never())
impl_->write.timer.async_wait(
net::bind_executor(
this->get_executor(),
timeout_handler{
state(),
impl_,
state().tick}));
impl_->socket.async_connect(
ep, std::move(*this));
// *this is now moved-from
}
template<
class Endpoints, class Condition,
class Handler_>
basic_stream_connect_op(
stream_type& s,
Endpoints const& eps,
Condition const& cond,
Handler_&& h)
: async_op_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_)
, pg0_(impl_->read.pending)
, pg1_(impl_->write.pending)
{
if(state().timer.expiry() != stream_base::never())
impl_->write.timer.async_wait(
net::bind_executor(
this->get_executor(),
timeout_handler{
state(),
impl_,
state().tick}));
net::async_connect(impl_->socket,
eps, cond, std::move(*this));
// *this is now moved-from
}
template<
class Iterator, class Condition,
class Handler_>
basic_stream_connect_op(
stream_type& s,
Iterator begin, Iterator end,
Condition const& cond,
Handler_&& h)
: async_op_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_)
, pg0_(impl_->read.pending)
, pg1_(impl_->write.pending)
{
if(state().timer.expiry() != stream_base::never())
impl_->write.timer.async_wait(
net::bind_executor(
this->get_executor(),
timeout_handler{
state(),
impl_,
state().tick}));
net::async_connect(impl_->socket,
begin, end, cond, std::move(*this));
// *this is now moved-from
}
template<class... Args>
void void
operator()(error_code ec, Args&&... args) operator()(
ReadHandler&& h,
basic_stream& s,
Buffers const& b
)
{ {
if(state().timer.expiry() != stream_base::never()) // If you get an error on the following line it means
{ // that your handler does not meet the documented type
++state().tick; // requirements for a ReadHandler.
static_assert(
// try cancelling timer detail::is_invocable<ReadHandler,
auto const n = void(error_code, std::size_t)>::value,
impl_->write.timer.cancel(); "ReadHandler type requirements not met");
if(n == 0) async_op<
{ true,
// timeout handler invoked? Buffers,
if(state().timeout) typename std::decay<ReadHandler>::type>(
{ std::forward<ReadHandler>(h), s, b);
// yes, socket already closed
ec = beast::error::timeout;
state().timeout = false;
}
}
else
{
BOOST_ASSERT(n == 1);
BOOST_ASSERT(! state().timeout);
}
}
pg0_.reset();
pg1_.reset();
this->invoke_now(ec, std::forward<Args>(args)...);
} }
}; };
} // detail struct run_write_op
{
template<class WriteHandler, class Buffers>
void
operator()(
WriteHandler&& h,
basic_stream& s,
Buffers const& b)
{
// If you get an error on the following line it means
// that your handler does not meet the documented type
// requirements for a WriteHandler.
static_assert(
detail::is_invocable<WriteHandler,
void(error_code, std::size_t)>::value,
"WriteHandler type requirements not met");
async_op<
false,
Buffers,
typename std::decay<WriteHandler>::type>(
std::forward<WriteHandler>(h), s, b);
}
};
};
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -668,8 +714,9 @@ async_connect(
detail::basic_stream_connect_op< detail::basic_stream_connect_op<
Protocol, Executor, RatePolicy, Protocol, Executor, RatePolicy,
BOOST_ASIO_HANDLER_TYPE( BOOST_ASIO_HANDLER_TYPE(
ConnectHandler, void(error_code))>(*this, ConnectHandler, void(error_code))>(
ep, std::forward<ConnectHandler>(handler)); std::forward<ConnectHandler>(handler),
*this, ep);
return init.result.get(); return init.result.get();
} }
@@ -685,12 +732,13 @@ async_read_some(
static_assert(net::is_mutable_buffer_sequence< static_assert(net::is_mutable_buffer_sequence<
MutableBufferSequence>::value, MutableBufferSequence>::value,
"MutableBufferSequence requirements not met"); "MutableBufferSequence requirements not met");
BOOST_BEAST_HANDLER_INIT( return net::async_initiate<
ReadHandler, void(error_code, std::size_t)); ReadHandler,
async_op<true, MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE( void(error_code, std::size_t)>(
ReadHandler, void(error_code, std::size_t))>( ops::run_read_op{},
*this, buffers, std::move(init.completion_handler)); handler,
return init.result.get(); *this,
buffers);
} }
template<class Protocol, class Executor, class RatePolicy> template<class Protocol, class Executor, class RatePolicy>
@@ -705,12 +753,13 @@ async_write_some(
static_assert(net::is_const_buffer_sequence< static_assert(net::is_const_buffer_sequence<
ConstBufferSequence>::value, ConstBufferSequence>::value,
"ConstBufferSequence requirements not met"); "ConstBufferSequence requirements not met");
BOOST_BEAST_HANDLER_INIT( return net::async_initiate<
WriteHandler, void(error_code, std::size_t)); WriteHandler,
async_op<false, ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE( void(error_code, std::size_t)>(
WriteHandler, void(error_code, std::size_t))>( ops::run_write_op{},
*this, buffers, std::move(init.completion_handler)); handler,
return init.result.get(); *this,
buffers);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -733,8 +782,8 @@ async_connect(
Protocol, Executor, RatePolicy, Protocol, Executor, RatePolicy,
BOOST_ASIO_HANDLER_TYPE(RangeConnectHandler, BOOST_ASIO_HANDLER_TYPE(RangeConnectHandler,
void(error_code, typename Protocol::endpoint))>( void(error_code, typename Protocol::endpoint))>(
stream, endpoints, detail::any_endpoint{}, std::move(init.completion_handler), stream,
std::move(init.completion_handler)); endpoints, detail::any_endpoint{});
return init.result.get(); return init.result.get();
} }
@@ -758,8 +807,8 @@ async_connect(
Protocol, Executor, RatePolicy, Protocol, Executor, RatePolicy,
BOOST_ASIO_HANDLER_TYPE(RangeConnectHandler, BOOST_ASIO_HANDLER_TYPE(RangeConnectHandler,
void(error_code, typename Protocol::endpoint))>( void(error_code, typename Protocol::endpoint))>(
stream, endpoints, connect_condition, std::move(init.completion_handler), stream,
std::move(init.completion_handler)); endpoints, connect_condition);
return init.result.get(); return init.result.get();
} }
@@ -780,8 +829,8 @@ async_connect(
Protocol, Executor, RatePolicy, Protocol, Executor, RatePolicy,
BOOST_ASIO_HANDLER_TYPE(IteratorConnectHandler, BOOST_ASIO_HANDLER_TYPE(IteratorConnectHandler,
void(error_code, Iterator))>( void(error_code, Iterator))>(
stream, begin, end, detail::any_endpoint{}, std::move(init.completion_handler), stream,
std::move(init.completion_handler)); begin, end, detail::any_endpoint{});
return init.result.get(); return init.result.get();
} }
@@ -804,8 +853,8 @@ async_connect(
Protocol, Executor, RatePolicy, Protocol, Executor, RatePolicy,
BOOST_ASIO_HANDLER_TYPE(IteratorConnectHandler, BOOST_ASIO_HANDLER_TYPE(IteratorConnectHandler,
void(error_code, Iterator))>( void(error_code, Iterator))>(
stream, begin, end, connect_condition, std::move(init.completion_handler), stream,
std::move(init.completion_handler)); begin, end, connect_condition);
return init.result.get(); return init.result.get();
} }

View File

@@ -157,7 +157,7 @@ struct test_acceptor
a.set_option( a.set_option(
net::socket_base::reuse_address(true)); net::socket_base::reuse_address(true));
a.bind(ep); a.bind(ep);
a.listen(0); a.listen(1);
ep = a.local_endpoint(); ep = a.local_endpoint();
a.async_accept( a.async_accept(
[](error_code, net::ip::tcp::socket) [](error_code, net::ip::tcp::socket)

View File

@@ -49,7 +49,7 @@ public:
} }
}; };
BEAST_DEFINE_TESTSUITE(beast,websocket,error); BEAST_DEFINE_TESTSUITE(beast,core,error);
} // beast } // beast
} // boost } // boost