From 0ad7390e9421984b4432ef08e225b0810555c6bb Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 19 Feb 2019 09:01:45 -0800 Subject: [PATCH] Use async_initiate in basic_stream --- include/boost/beast/core/basic_stream.hpp | 4 + .../boost/beast/core/detail/type_traits.hpp | 2 +- .../boost/beast/core/impl/basic_stream.hpp | 389 ++++++++++-------- test/beast/core/basic_stream.cpp | 2 +- test/beast/core/error.cpp | 2 +- 5 files changed, 226 insertions(+), 173 deletions(-) diff --git a/include/boost/beast/core/basic_stream.hpp b/include/boost/beast/core/basic_stream.hpp index 271bc2af..2417b48b 100644 --- a/include/boost/beast/core/basic_stream.hpp +++ b/include/boost/beast/core/basic_stream.hpp @@ -292,8 +292,12 @@ private: template friend class basic_stream; + struct run_async_op; + struct timeout_handler; + struct ops; + #if ! BOOST_BEAST_DOXYGEN // boost::asio::ssl::stream needs these // DEPRECATED diff --git a/include/boost/beast/core/detail/type_traits.hpp b/include/boost/beast/core/detail/type_traits.hpp index e88f541f..f57458d7 100644 --- a/include/boost/beast/core/detail/type_traits.hpp +++ b/include/boost/beast/core/detail/type_traits.hpp @@ -135,7 +135,7 @@ struct is_contiguous_container::value, \ - "CompletionHandler signature requirements not met"); \ + "CompletionHandler type requirements not met"); \ ::boost::beast::net::async_completion init{handler} } // detail diff --git a/include/boost/beast/core/impl/basic_stream.hpp b/include/boost/beast/core/impl/basic_stream.hpp index 9befc076..c544ddf3 100644 --- a/include/boost/beast/core/impl/basic_stream.hpp +++ b/include/boost/beast/core/impl/basic_stream.hpp @@ -179,17 +179,157 @@ struct basic_stream:: //------------------------------------------------------------------------------ -/* - The algorithm for implementing the timeout depends - on the executor providing ordered execution guarantee. - `net::strand` automatically provides this, and we assume - that an implicit strand (one thread calling io_context::run) - also provides this. -*/ +namespace detail { + +template< + class Protocol, class Executor, class RatePolicy, + class Handler> +class basic_stream_connect_op + : public async_op_base +{ + using stream_type = beast::basic_stream< + Protocol, Executor, RatePolicy>; + + using timeout_handler = + typename stream_type::timeout_handler; + + boost::shared_ptr impl_; + typename stream_type::pending_guard pg0_; + typename stream_type::pending_guard pg1_; + + typename stream_type::op_state& + state() noexcept + { + return impl_->write; + } + +public: + template + basic_stream_connect_op( + Handler_&& h, + stream_type& s, + typename stream_type::endpoint_type ep) + : async_op_base( + std::forward(h), s.get_executor()) + , impl_(s.impl_) + , pg0_(impl_->read.pending) + , pg1_(impl_->write.pending) + { + if(state().timer.expiry() != stream_base::never()) + impl_->write.timer.async_wait( + net::bind_executor( + this->get_executor(), + timeout_handler{ + state(), + impl_, + state().tick})); + + impl_->socket.async_connect( + ep, std::move(*this)); + // *this is now moved-from + } + + template< + class Endpoints, class Condition, + class Handler_> + basic_stream_connect_op( + Handler_&& h, + stream_type& s, + Endpoints const& eps, + Condition const& cond) + : async_op_base( + std::forward(h), s.get_executor()) + , impl_(s.impl_) + , pg0_(impl_->read.pending) + , pg1_(impl_->write.pending) + { + if(state().timer.expiry() != stream_base::never()) + impl_->write.timer.async_wait( + net::bind_executor( + this->get_executor(), + timeout_handler{ + state(), + impl_, + state().tick})); + + net::async_connect(impl_->socket, + eps, cond, std::move(*this)); + // *this is now moved-from + } + + template< + class Iterator, class Condition, + class Handler_> + basic_stream_connect_op( + Handler_&& h, + stream_type& s, + Iterator begin, Iterator end, + Condition const& cond) + : async_op_base( + std::forward(h), s.get_executor()) + , impl_(s.impl_) + , pg0_(impl_->read.pending) + , pg1_(impl_->write.pending) + { + if(state().timer.expiry() != stream_base::never()) + impl_->write.timer.async_wait( + net::bind_executor( + this->get_executor(), + timeout_handler{ + state(), + impl_, + state().tick})); + + net::async_connect(impl_->socket, + begin, end, cond, std::move(*this)); + // *this is now moved-from + } + + template + void + operator()(error_code ec, Args&&... args) + { + if(state().timer.expiry() != stream_base::never()) + { + ++state().tick; + + // try cancelling timer + auto const n = + impl_->write.timer.cancel(); + if(n == 0) + { + // timeout handler invoked? + if(state().timeout) + { + // yes, socket already closed + ec = beast::error::timeout; + state().timeout = false; + } + } + else + { + BOOST_ASSERT(n == 1); + BOOST_ASSERT(! state().timeout); + } + } + + pg0_.reset(); + pg1_.reset(); + this->invoke_now(ec, std::forward(args)...); + } +}; + +} // detail + +//------------------------------------------------------------------------------ template +struct basic_stream::ops +{ + template -class basic_stream::async_op +class async_op : public async_op_base , public boost::asio::coroutine { @@ -279,9 +419,9 @@ class basic_stream::async_op public: template async_op( + Handler_&& h, basic_stream& s, - Buffers const& b, - Handler_&& h) + Buffers const& b) : async_op_base( std::forward(h), s.get_executor()) , impl_(s.impl_) @@ -390,150 +530,56 @@ public: } }; -//------------------------------------------------------------------------------ - -namespace detail { - -template< - class Protocol, class Executor, class RatePolicy, - class Handler> -class basic_stream_connect_op - : public async_op_base +struct run_read_op { - using stream_type = beast::basic_stream< - Protocol, Executor, RatePolicy>; - - using timeout_handler = - typename stream_type::timeout_handler; - - boost::shared_ptr impl_; - typename stream_type::pending_guard pg0_; - typename stream_type::pending_guard pg1_; - - typename stream_type::op_state& - state() noexcept - { - return impl_->write; - } - -public: - template - basic_stream_connect_op( - stream_type& s, - typename stream_type::endpoint_type ep, - Handler_&& h) - : async_op_base( - std::forward(h), s.get_executor()) - , impl_(s.impl_) - , pg0_(impl_->read.pending) - , pg1_(impl_->write.pending) - { - if(state().timer.expiry() != stream_base::never()) - impl_->write.timer.async_wait( - net::bind_executor( - this->get_executor(), - timeout_handler{ - state(), - impl_, - state().tick})); - - impl_->socket.async_connect( - ep, std::move(*this)); - // *this is now moved-from - } - - template< - class Endpoints, class Condition, - class Handler_> - basic_stream_connect_op( - stream_type& s, - Endpoints const& eps, - Condition const& cond, - Handler_&& h) - : async_op_base( - std::forward(h), s.get_executor()) - , impl_(s.impl_) - , pg0_(impl_->read.pending) - , pg1_(impl_->write.pending) - { - if(state().timer.expiry() != stream_base::never()) - impl_->write.timer.async_wait( - net::bind_executor( - this->get_executor(), - timeout_handler{ - state(), - impl_, - state().tick})); - - net::async_connect(impl_->socket, - eps, cond, std::move(*this)); - // *this is now moved-from - } - - template< - class Iterator, class Condition, - class Handler_> - basic_stream_connect_op( - stream_type& s, - Iterator begin, Iterator end, - Condition const& cond, - Handler_&& h) - : async_op_base( - std::forward(h), s.get_executor()) - , impl_(s.impl_) - , pg0_(impl_->read.pending) - , pg1_(impl_->write.pending) - { - if(state().timer.expiry() != stream_base::never()) - impl_->write.timer.async_wait( - net::bind_executor( - this->get_executor(), - timeout_handler{ - state(), - impl_, - state().tick})); - - net::async_connect(impl_->socket, - begin, end, cond, std::move(*this)); - // *this is now moved-from - } - - template + template void - operator()(error_code ec, Args&&... args) + operator()( + ReadHandler&& h, + basic_stream& s, + Buffers const& b + ) { - if(state().timer.expiry() != stream_base::never()) - { - ++state().tick; - - // try cancelling timer - auto const n = - impl_->write.timer.cancel(); - if(n == 0) - { - // timeout handler invoked? - if(state().timeout) - { - // yes, socket already closed - ec = beast::error::timeout; - state().timeout = false; - } - } - else - { - BOOST_ASSERT(n == 1); - BOOST_ASSERT(! state().timeout); - } - } - - pg0_.reset(); - pg1_.reset(); - this->invoke_now(ec, std::forward(args)...); + // If you get an error on the following line it means + // that your handler does not meet the documented type + // requirements for a ReadHandler. + static_assert( + detail::is_invocable::value, + "ReadHandler type requirements not met"); + async_op< + true, + Buffers, + typename std::decay::type>( + std::forward(h), s, b); } }; -} // detail +struct run_write_op +{ + template + void + operator()( + WriteHandler&& h, + basic_stream& s, + Buffers const& b) + { + // If you get an error on the following line it means + // that your handler does not meet the documented type + // requirements for a WriteHandler. + static_assert( + detail::is_invocable::value, + "WriteHandler type requirements not met"); + async_op< + false, + Buffers, + typename std::decay::type>( + std::forward(h), s, b); + } +}; + +}; //------------------------------------------------------------------------------ @@ -668,8 +714,9 @@ async_connect( detail::basic_stream_connect_op< Protocol, Executor, RatePolicy, BOOST_ASIO_HANDLER_TYPE( - ConnectHandler, void(error_code))>(*this, - ep, std::forward(handler)); + ConnectHandler, void(error_code))>( + std::forward(handler), + *this, ep); return init.result.get(); } @@ -685,12 +732,13 @@ async_read_some( static_assert(net::is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence requirements not met"); - BOOST_BEAST_HANDLER_INIT( - ReadHandler, void(error_code, std::size_t)); - async_op( - *this, buffers, std::move(init.completion_handler)); - return init.result.get(); + return net::async_initiate< + ReadHandler, + void(error_code, std::size_t)>( + ops::run_read_op{}, + handler, + *this, + buffers); } template @@ -705,12 +753,13 @@ async_write_some( static_assert(net::is_const_buffer_sequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); - BOOST_BEAST_HANDLER_INIT( - WriteHandler, void(error_code, std::size_t)); - async_op( - *this, buffers, std::move(init.completion_handler)); - return init.result.get(); + return net::async_initiate< + WriteHandler, + void(error_code, std::size_t)>( + ops::run_write_op{}, + handler, + *this, + buffers); } //------------------------------------------------------------------------------ @@ -733,8 +782,8 @@ async_connect( Protocol, Executor, RatePolicy, BOOST_ASIO_HANDLER_TYPE(RangeConnectHandler, void(error_code, typename Protocol::endpoint))>( - stream, endpoints, detail::any_endpoint{}, - std::move(init.completion_handler)); + std::move(init.completion_handler), stream, + endpoints, detail::any_endpoint{}); return init.result.get(); } @@ -758,8 +807,8 @@ async_connect( Protocol, Executor, RatePolicy, BOOST_ASIO_HANDLER_TYPE(RangeConnectHandler, void(error_code, typename Protocol::endpoint))>( - stream, endpoints, connect_condition, - std::move(init.completion_handler)); + std::move(init.completion_handler), stream, + endpoints, connect_condition); return init.result.get(); } @@ -780,8 +829,8 @@ async_connect( Protocol, Executor, RatePolicy, BOOST_ASIO_HANDLER_TYPE(IteratorConnectHandler, void(error_code, Iterator))>( - stream, begin, end, detail::any_endpoint{}, - std::move(init.completion_handler)); + std::move(init.completion_handler), stream, + begin, end, detail::any_endpoint{}); return init.result.get(); } @@ -804,8 +853,8 @@ async_connect( Protocol, Executor, RatePolicy, BOOST_ASIO_HANDLER_TYPE(IteratorConnectHandler, void(error_code, Iterator))>( - stream, begin, end, connect_condition, - std::move(init.completion_handler)); + std::move(init.completion_handler), stream, + begin, end, connect_condition); return init.result.get(); } diff --git a/test/beast/core/basic_stream.cpp b/test/beast/core/basic_stream.cpp index a3534b06..4410ba0e 100644 --- a/test/beast/core/basic_stream.cpp +++ b/test/beast/core/basic_stream.cpp @@ -157,7 +157,7 @@ struct test_acceptor a.set_option( net::socket_base::reuse_address(true)); a.bind(ep); - a.listen(0); + a.listen(1); ep = a.local_endpoint(); a.async_accept( [](error_code, net::ip::tcp::socket) diff --git a/test/beast/core/error.cpp b/test/beast/core/error.cpp index 915096e5..5d5ecf1c 100644 --- a/test/beast/core/error.cpp +++ b/test/beast/core/error.cpp @@ -49,7 +49,7 @@ public: } }; -BEAST_DEFINE_TESTSUITE(beast,websocket,error); +BEAST_DEFINE_TESTSUITE(beast,core,error); } // beast } // boost