From b46953f1bdc17a3dde7b951961cf75dde9a8c9b9 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sat, 5 Jan 2019 19:40:46 -0800 Subject: [PATCH] Use async_op_base: Composed operation implementations use async_op_base and stable_async_op_base, to eliminate redundant boilerplate. --- CHANGELOG.md | 1 + .../_experimental/core/impl/flat_stream.hpp | 165 +++---- .../_experimental/http/impl/icy_stream.hpp | 339 ++++++--------- .../boost/beast/core/buffered_read_stream.hpp | 20 +- .../boost/beast/core/detail/async_op_base.hpp | 13 + .../beast/core/impl/buffered_read_stream.hpp | 179 +++----- .../boost/beast/http/impl/file_body_win32.ipp | 217 ++++----- include/boost/beast/http/impl/read.ipp | 91 +--- include/boost/beast/websocket/impl/accept.ipp | 314 +++++-------- include/boost/beast/websocket/impl/close.ipp | 411 ++++++++---------- .../boost/beast/websocket/impl/handshake.ipp | 144 ++---- include/boost/beast/websocket/impl/ping.ipp | 163 +++---- include/boost/beast/websocket/impl/read.ipp | 196 +++------ .../boost/beast/websocket/impl/teardown.ipp | 184 +++----- include/boost/beast/websocket/impl/write.ipp | 72 +-- include/boost/beast/websocket/teardown.hpp | 1 + 16 files changed, 880 insertions(+), 1630 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dc2f493..b48a6745 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Version 202 * Update coverage badge images * Tidy up basic_stream_socket docs * Refactor async_op_base +* Use async_op_base -------------------------------------------------------------------------------- diff --git a/include/boost/beast/_experimental/core/impl/flat_stream.hpp b/include/boost/beast/_experimental/core/impl/flat_stream.hpp index 637433c4..90656f6c 100644 --- a/include/boost/beast/_experimental/core/impl/flat_stream.hpp +++ b/include/boost/beast/_experimental/core/impl/flat_stream.hpp @@ -11,14 +11,12 @@ #define BOOST_BEAST_CORE_IMPL_FLAT_STREAM_HPP #include +#include +#include #include -#include -#include #include #include -#include -#include -#include +#include namespace boost { namespace beast { @@ -26,16 +24,12 @@ namespace beast { template template class flat_stream::write_op - : public net::coroutine + : public detail::async_op_base> + + , public net::coroutine { - using alloc_type = typename -#if defined(BOOST_NO_CXX11_ALLOCATOR) - net::associated_allocator_t::template - rebind::other; -#else - std::allocator_traits> - ::template rebind_alloc; -#endif + using alloc_type = std::allocator; struct deleter { @@ -58,123 +52,62 @@ class flat_stream::write_op flat_stream& s_; ConstBufferSequence b_; std::unique_ptr p_; - Handler h_; public: - template + template write_op( flat_stream& s, ConstBufferSequence const& b, - DeducedHandler&& h) - : s_(s) + Handler_&& h) + : detail::async_op_base>( + s.get_executor(), + std::forward(h)) + , s_(s) , b_(b) - , p_(nullptr, deleter{ - (net::get_associated_allocator)(h)}) - , h_(std::forward(h)) + , p_(nullptr, deleter{alloc_type{}}) { + (*this)({}, 0); } void operator()( boost::system::error_code ec, - std::size_t bytes_transferred); - - // - - using allocator_type = - net::associated_allocator_t; - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval().get_executor())>; - - allocator_type - get_allocator() const noexcept + std::size_t bytes_transferred) { - return net::get_associated_allocator(h_); - } - - executor_type - get_executor() const noexcept - { - return net::get_associated_executor( - h_, s_.get_executor()); - } - - template - friend - void asio_handler_invoke(Function&& f, write_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); - } - - friend - void* asio_handler_allocate( - std::size_t size, write_op* op) - { - using net::asio_handler_allocate; - return asio_handler_allocate( - size, std::addressof(op->h_)); - } - - friend - void asio_handler_deallocate( - void* p, std::size_t size, write_op* op) - { - using net::asio_handler_deallocate; - asio_handler_deallocate( - p, size, std::addressof(op->h_)); - } - - friend - bool asio_handler_is_continuation(write_op* op) - { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->h_)); + BOOST_ASIO_CORO_REENTER(*this) + { + BOOST_ASIO_CORO_YIELD + { + auto const result = + coalesce(b_, coalesce_limit); + if(result.needs_coalescing) + { + p_.get_deleter().size = result.size; + p_.reset(p_.get_deleter().alloc.allocate( + p_.get_deleter().size)); + net::buffer_copy( + net::buffer( + p_.get(), p_.get_deleter().size), + b_, result.size); + s_.stream_.async_write_some( + net::buffer( + p_.get(), p_.get_deleter().size), + std::move(*this)); + } + else + { + s_.stream_.async_write_some( + boost::beast::buffers_prefix( + result.size, b_), std::move(*this)); + } + } + p_.reset(); + this->invoke(ec, bytes_transferred); + } } }; -template -template -void -flat_stream:: -write_op:: -operator()( - error_code ec, - std::size_t bytes_transferred) -{ - BOOST_ASIO_CORO_REENTER(*this) - { - BOOST_ASIO_CORO_YIELD - { - auto const result = coalesce(b_, coalesce_limit); - if(result.needs_coalescing) - { - p_.get_deleter().size = result.size; - p_.reset(p_.get_deleter().alloc.allocate( - p_.get_deleter().size)); - net::buffer_copy( - net::buffer( - p_.get(), p_.get_deleter().size), - b_, result.size); - s_.stream_.async_write_some( - net::buffer( - p_.get(), p_.get_deleter().size), - std::move(*this)); - } - else - { - s_.stream_.async_write_some( - boost::beast::buffers_prefix(result.size, b_), - std::move(*this)); - } - } - p_.reset(); - h_(ec, bytes_transferred); - } -} - //------------------------------------------------------------------------------ template @@ -298,7 +231,7 @@ async_write_some( WriteHandler, void(error_code, std::size_t)); write_op{ - *this, buffers, std::move(init.completion_handler)}({}, 0); + *this, buffers, std::move(init.completion_handler)}; return init.result.get(); } diff --git a/include/boost/beast/_experimental/http/impl/icy_stream.hpp b/include/boost/beast/_experimental/http/impl/icy_stream.hpp index d7eb6960..5d314684 100644 --- a/include/boost/beast/_experimental/http/impl/icy_stream.hpp +++ b/include/boost/beast/_experimental/http/impl/icy_stream.hpp @@ -15,10 +15,13 @@ #include #include #include +#include #include -#include +#include #include #include +#include +#include #include #include #include @@ -118,26 +121,21 @@ public: template template class icy_stream::read_op - : public net::coroutine + : public beast::detail::stable_async_op_base> + , public net::coroutine { - using alloc_type = typename -#if defined(BOOST_NO_CXX11_ALLOCATOR) - net::associated_allocator_t::template - rebind::other; -#else - std::allocator_traits>::template rebind_alloc; -#endif - + // VFALCO We need a stable reference to `b` + // to pass to asio's read functions. + // struct data { - icy_stream& s; + icy_stream& s; buffers_adaptor b; bool match = false; data( - Handler const&, - icy_stream& s_, + icy_stream& s_, MutableBufferSequence const& b_) : s(s_) , b(b_) @@ -145,219 +143,153 @@ class icy_stream::read_op } }; - handler_ptr d_; + data& d_; public: - read_op(read_op&&) = default; - read_op(read_op const&) = delete; - - template + template read_op( - DeducedHandler&& h, - icy_stream& s, + Handler_&& h, + icy_stream& s, MutableBufferSequence const& b) - : d_(std::forward(h), s, b) + : beast::detail::stable_async_op_base>( + s.get_executor(), std::forward(h)) + , d_(beast::detail::allocate_stable(*this, s, b)) { + (*this)({}, 0); } void operator()( boost::system::error_code ec, - std::size_t bytes_transferred); - - // - - using allocator_type = - net::associated_allocator_t; - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval().get_executor())>; - - allocator_type - get_allocator() const noexcept + std::size_t bytes_transferred) { - return net::get_associated_allocator(d_.handler()); - } - - executor_type - get_executor() const noexcept - { - return net::get_associated_executor( - d_.handler(), d_->s.get_executor()); - } - - template - friend - void asio_handler_invoke( - Function&& f, read_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke( - f, std::addressof(op->d_.handler())); - } - - friend - void* asio_handler_allocate( - std::size_t size, read_op* op) - { - using net::asio_handler_allocate; - return asio_handler_allocate( - size, std::addressof(op->d_.handler())); - } - - friend - void asio_handler_deallocate( - void* p, std::size_t size, read_op* op) - { - using net::asio_handler_deallocate; - asio_handler_deallocate( - p, size, std::addressof(op->d_.handler())); - } - - friend - bool asio_handler_is_continuation(read_op* op) - { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->d_.handler())); - } -}; - -template -template -void -icy_stream:: -read_op:: -operator()( - error_code ec, - std::size_t bytes_transferred) -{ - using iterator = net::buffers_iterator< - typename beast::detail::dynamic_buffer_ref< - buffers_adaptor>::const_buffers_type>; - auto& d = *d_; - BOOST_ASIO_CORO_REENTER(*this) - { - if(d.b.max_size() == 0) + using iterator = net::buffers_iterator< + typename beast::detail::dynamic_buffer_ref< + buffers_adaptor>::const_buffers_type>; + BOOST_ASIO_CORO_REENTER(*this) { - BOOST_ASIO_CORO_YIELD - net::post(d.s.get_executor(), - beast::bind_handler(std::move(*this), ec, 0)); - goto upcall; - } - if(! d.s.detect_) - { - if(d.s.copy_ > 0) - { - auto const n = net::buffer_copy( - d.b.prepare(std::min( - d.s.copy_, d.b.max_size())), - net::buffer(d.s.buf_)); - d.b.commit(n); - d.s.copy_ = static_cast( - d.s.copy_ - n); - if(d.s.copy_ > 0) - std::memmove( - d.s.buf_, - &d.s.buf_[n], - d.s.copy_); - } - if(d.b.size() < d.b.max_size()) + if(d_.b.max_size() == 0) { BOOST_ASIO_CORO_YIELD - d.s.next_layer().async_read_some( - d.b.prepare(d.b.max_size() - d.b.size()), - std::move(*this)); - d.b.commit(bytes_transferred); + net::post(d_.s.get_executor(), + beast::bind_handler(std::move(*this), ec, 0)); + goto upcall; + } + if(! d_.s.detect_) + { + if(d_.s.copy_ > 0) + { + auto const n = net::buffer_copy( + d_.b.prepare(std::min( + d_.s.copy_, d_.b.max_size())), + net::buffer(d_.s.buf_)); + d_.b.commit(n); + d_.s.copy_ = static_cast( + d_.s.copy_ - n); + if(d_.s.copy_ > 0) + std::memmove( + d_.s.buf_, + &d_.s.buf_[n], + d_.s.copy_); + } + if(d_.b.size() < d_.b.max_size()) + { + BOOST_ASIO_CORO_YIELD + d_.s.next_layer().async_read_some( + d_.b.prepare(d_.b.max_size() - d_.b.size()), + std::move(*this)); + d_.b.commit(bytes_transferred); + } + bytes_transferred = d_.b.size(); + goto upcall; + } + + d_.s.detect_ = false; + if(d_.b.max_size() < 8) + { + BOOST_ASIO_CORO_YIELD + net::async_read( + d_.s.next_layer(), + net::buffer(d_.s.buf_, 3), + std::move(*this)); + if(ec) + goto upcall; + auto n = bytes_transferred; + BOOST_ASSERT(n == 3); + if( + d_.s.buf_[0] != 'I' || + d_.s.buf_[1] != 'C' || + d_.s.buf_[2] != 'Y') + { + net::buffer_copy( + d_.b.value(), + net::buffer(d_.s.buf_, n)); + if(d_.b.max_size() < 3) + { + d_.s.copy_ = static_cast( + 3 - d_.b.max_size()); + std::memmove( + d_.s.buf_, + &d_.s.buf_[d_.b.max_size()], + d_.s.copy_); + + } + bytes_transferred = (std::min)( + n, d_.b.max_size()); + goto upcall; + } + d_.s.copy_ = static_cast( + buffer_copy( + net::buffer(d_.s.buf_), + icy_stream::version() + d_.b.max_size())); + bytes_transferred = buffer_copy( + d_.b.value(), + icy_stream::version()); + goto upcall; } - bytes_transferred = d.b.size(); - goto upcall; - } - d.s.detect_ = false; - if(d.b.max_size() < 8) - { BOOST_ASIO_CORO_YIELD - net::async_read( - d.s.next_layer(), - net::buffer(d.s.buf_, 3), + net::async_read_until( + d_.s.next_layer(), + beast::detail::ref(d_.b), + detail::match_icy(d_.match), std::move(*this)); if(ec) goto upcall; - auto n = bytes_transferred; - BOOST_ASSERT(n == 3); - if( - d.s.buf_[0] != 'I' || - d.s.buf_[1] != 'C' || - d.s.buf_[2] != 'Y') { - net::buffer_copy( - d.b.value(), - net::buffer(d.s.buf_, n)); - if(d.b.max_size() < 3) + auto n = bytes_transferred; + BOOST_ASSERT(n == d_.b.size()); + if(! d_.match) + goto upcall; + if(d_.b.size() + 5 > d_.b.max_size()) { - d.s.copy_ = static_cast( - 3 - d.b.max_size()); - std::memmove( - d.s.buf_, - &d.s.buf_[d.b.max_size()], - d.s.copy_); - + d_.s.copy_ = static_cast( + n + 5 - d_.b.max_size()); + std::copy( + net::buffers_begin(d_.b.value()) + n - d_.s.copy_, + net::buffers_begin(d_.b.value()) + n, + d_.s.buf_); + n = d_.b.max_size() - 5; + } + { + buffers_suffix> dest( + boost::in_place_init, d_.b.value()); + dest.consume(5); + detail::buffer_shift( + beast::buffers_prefix(n, dest), + beast::buffers_prefix(n, d_.b.value())); + net::buffer_copy(d_.b.value(), icy_stream::version()); + n += 5; + bytes_transferred = n; } - bytes_transferred = (std::min)( - n, d.b.max_size()); - goto upcall; } - d.s.copy_ = static_cast( - buffer_copy( - net::buffer(d.s.buf_), - icy_stream::version() + d.b.max_size())); - bytes_transferred = buffer_copy( - d.b.value(), - icy_stream::version()); - goto upcall; + upcall: + this->invoke(ec, bytes_transferred); } - - BOOST_ASIO_CORO_YIELD - net::async_read_until( - d.s.next_layer(), - beast::detail::ref(d.b), - detail::match_icy(d.match), - std::move(*this)); - if(ec) - goto upcall; - { - auto n = bytes_transferred; - BOOST_ASSERT(n == d.b.size()); - if(! d.match) - goto upcall; - if(d.b.size() + 5 > d.b.max_size()) - { - d.s.copy_ = static_cast( - n + 5 - d.b.max_size()); - std::copy( - net::buffers_begin(d.b.value()) + n - d.s.copy_, - net::buffers_begin(d.b.value()) + n, - d.s.buf_); - n = d.b.max_size() - 5; - } - { - buffers_suffix> dest( - boost::in_place_init, d.b.value()); - dest.consume(5); - detail::buffer_shift( - beast::buffers_prefix(n, dest), - beast::buffers_prefix(n, d.b.value())); - net::buffer_copy(d.b.value(), icy_stream::version()); - n += 5; - bytes_transferred = n; - } - } - upcall: - d_.invoke(ec, bytes_transferred); } -} +}; //------------------------------------------------------------------------------ @@ -524,9 +456,8 @@ async_read_some( read_op< MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE( - ReadHandler, void(error_code, std::size_t))>{ - std::move(init.completion_handler), *this, buffers}( - {}, 0); + ReadHandler, void(error_code, std::size_t))>( + std::move(init.completion_handler), *this, buffers); return init.result.get(); } diff --git a/include/boost/beast/core/buffered_read_stream.hpp b/include/boost/beast/core/buffered_read_stream.hpp index 3ad17f7e..c66237ec 100644 --- a/include/boost/beast/core/buffered_read_stream.hpp +++ b/include/boost/beast/core/buffered_read_stream.hpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -163,27 +164,18 @@ public: return next_layer_.lowest_layer(); } + using executor_type = + detail::get_executor_type; + /** Get the executor associated with the object. This function may be used to obtain the executor object that the stream uses to dispatch handlers for asynchronous operations. @return A copy of the executor that stream will use to dispatch handlers. - - @note This function participates in overload resolution only if - `NextLayer` has a member function named `get_executor`. */ -#if BOOST_BEAST_DOXYGEN - __implementation_defined__ -#else - template< - class T = next_layer_type, - class = typename std::enable_if< - has_get_executor::value>::type> - auto -#endif - get_executor() noexcept -> - decltype(std::declval().get_executor()) + executor_type + get_executor() noexcept { return next_layer_.get_executor(); } diff --git a/include/boost/beast/core/detail/async_op_base.hpp b/include/boost/beast/core/detail/async_op_base.hpp index 2d349077..1aeeda2a 100644 --- a/include/boost/beast/core/detail/async_op_base.hpp +++ b/include/boost/beast/core/detail/async_op_base.hpp @@ -186,6 +186,19 @@ public: return h_; } + /** Returns ownership of the handler associated with this object + + This function is used to transfer ownership of the handler to + the caller, by move-construction. After the move, the only + valid operations on the base object are move construction and + destruction. + */ + Handler + release_handler() + { + return std::move(h_); + } + protected: /** Constructor diff --git a/include/boost/beast/core/impl/buffered_read_stream.hpp b/include/boost/beast/core/impl/buffered_read_stream.hpp index a2de635e..7f9e3fb9 100644 --- a/include/boost/beast/core/impl/buffered_read_stream.hpp +++ b/include/boost/beast/core/impl/buffered_read_stream.hpp @@ -12,16 +12,10 @@ #include #include -#include #include #include -#include -#include -#include -#include -#include -#include -#include +#include +#include #include #include @@ -32,141 +26,79 @@ template template class buffered_read_stream< Stream, DynamicBuffer>::read_some_op + : public detail::async_op_base< + Handler, detail::get_executor_type> { buffered_read_stream& s_; - net::executor_work_guard().get_executor())> wg_; MutableBufferSequence b_; - Handler h_; int step_ = 0; public: read_some_op(read_some_op&&) = default; read_some_op(read_some_op const&) = delete; - template - read_some_op(DeducedHandler&& h, + template + read_some_op( + Handler_&& h, buffered_read_stream& s, - MutableBufferSequence const& b) - : s_(s) - , wg_(s_.get_executor()) + MutableBufferSequence const& b) + : detail::async_op_base< + Handler, detail::get_executor_type>( + s.get_executor(), std::forward(h)) + , s_(s) , b_(b) - , h_(std::forward(h)) { + (*this)({}, 0); } void - operator()(error_code ec, - std::size_t bytes_transferred); - - // - - using allocator_type = - net::associated_allocator_t; - - using executor_type = - net::associated_executor_t().get_executor())>; - - allocator_type - get_allocator() const noexcept + operator()( + error_code ec, + std::size_t bytes_transferred) { - return net::get_associated_allocator(h_); - } + switch(step_) + { + case 0: + if(s_.buffer_.size() == 0) + { + if(s_.capacity_ == 0) + { + // read (unbuffered) + step_ = 1; + return s_.next_layer_.async_read_some( + b_, std::move(*this)); + } + // read + step_ = 2; + return s_.next_layer_.async_read_some( + s_.buffer_.prepare(read_size( + s_.buffer_, s_.capacity_)), + std::move(*this)); + } + step_ = 3; + return net::post( + s_.get_executor(), + beast::bind_front_handler( + std::move(*this), ec, 0)); - executor_type - get_executor() const noexcept - { - return net::get_associated_executor( - h_, s_.get_executor()); - } + case 1: + // upcall + break; - template - friend - void asio_handler_invoke( - Function&& f, read_some_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); - } + case 2: + s_.buffer_.commit(bytes_transferred); + BOOST_FALLTHROUGH; - friend - void* asio_handler_allocate( - std::size_t size, read_some_op* op) - { - using net::asio_handler_allocate; - return asio_handler_allocate( - size, std::addressof(op->h_)); - } - - friend - void asio_handler_deallocate( - void* p, std::size_t size, read_some_op* op) - { - using net::asio_handler_deallocate; - asio_handler_deallocate( - p, size, std::addressof(op->h_)); - } - - friend - bool asio_handler_is_continuation(read_some_op* op) - { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->h_)); + case 3: + bytes_transferred = + net::buffer_copy(b_, s_.buffer_.data()); + s_.buffer_.consume(bytes_transferred); + break; + } + this->invoke(ec, bytes_transferred); } }; -template -template -void -buffered_read_stream:: -read_some_op:: -operator()( - error_code ec, std::size_t bytes_transferred) -{ - switch(step_) - { - case 0: - if(s_.buffer_.size() == 0) - { - if(s_.capacity_ == 0) - { - // read (unbuffered) - step_ = 1; - return s_.next_layer_.async_read_some( - b_, std::move(*this)); - } - // read - step_ = 2; - return s_.next_layer_.async_read_some( - s_.buffer_.prepare(read_size( - s_.buffer_, s_.capacity_)), - std::move(*this)); - } - step_ = 3; - return net::post( - s_.get_executor(), - beast::bind_front_handler( - std::move(*this), ec, 0)); - - case 1: - // upcall - break; - - case 2: - s_.buffer_.commit(bytes_transferred); - BOOST_FALLTHROUGH; - - case 3: - bytes_transferred = - net::buffer_copy(b_, s_.buffer_.data()); - s_.buffer_.consume(bytes_transferred); - break; - } - h_(ec, bytes_transferred); -} - //------------------------------------------------------------------------------ template @@ -271,9 +203,8 @@ async_read_some( BOOST_BEAST_HANDLER_INIT( ReadHandler, void(error_code, std::size_t)); read_some_op{ - std::move(init.completion_handler), *this, buffers}( - error_code{}, 0); + ReadHandler, void(error_code, std::size_t))>( + std::move(init.completion_handler), *this, buffers); return init.result.get(); } diff --git a/include/boost/beast/http/impl/file_body_win32.ipp b/include/boost/beast/http/impl/file_body_win32.ipp index 2d96ee9e..2f6c9325 100644 --- a/include/boost/beast/http/impl/file_body_win32.ipp +++ b/include/boost/beast/http/impl/file_body_win32.ipp @@ -15,15 +15,11 @@ #include #include #include +#include #include #include -#include -#include #include #include -#include -#include -#include #include #include #include @@ -337,163 +333,110 @@ template< class Protocol, class Handler, bool isRequest, class Fields> class write_some_win32_op + : public beast::detail::async_op_base< + Handler, typename net::basic_stream_socket< + Protocol>::executor_type> { net::basic_stream_socket& sock_; - net::executor_work_guard&>().get_executor())> wg_; serializer, Fields>& sr_; std::size_t bytes_transferred_ = 0; - Handler h_; bool header_ = false; public: - write_some_win32_op(write_some_win32_op&&) = default; - write_some_win32_op(write_some_win32_op const&) = delete; - - template + template write_some_win32_op( - DeducedHandler&& h, + Handler_&& h, net::basic_stream_socket& s, serializer,Fields>& sr) - : sock_(s) - , wg_(sock_.get_executor()) + : beast::detail::async_op_base< + Handler, typename net::basic_stream_socket< + Protocol>::executor_type>( + s.get_executor(), + std::forward(h)) + , sock_(s) , sr_(sr) - , h_(std::forward(h)) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(h_); - } - - using executor_type = - net::associated_executor_t&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - h_, sock_.get_executor()); - } - void - operator()(); + operator()() + { + if(! sr_.is_header_done()) + { + header_ = true; + sr_.split(true); + return detail::async_write_some_impl( + sock_, sr_, std::move(*this)); + } + if(sr_.get().chunked()) + { + return detail::async_write_some_impl( + sock_, sr_, std::move(*this)); + } + auto& w = sr_.writer_impl(); + boost::winapi::DWORD_ const nNumberOfBytesToWrite = + static_cast( + (std::min)( + (std::min)(w.body_.last_ - w.pos_, sr_.limit()), + (std::numeric_limits::max)())); + net::windows::overlapped_ptr overlapped{ + sock_.get_executor().context(), std::move(*this)}; + // Note that we have moved *this, so we cannot access + // the handler since it is now moved-from. We can still + // access simple things like references and built-in types. + auto& ov = *overlapped.get(); + ov.Offset = lowPart(w.pos_); + ov.OffsetHigh = highPart(w.pos_); + auto const bSuccess = ::TransmitFile( + sock_.native_handle(), + sr_.get().body().file_.native_handle(), + nNumberOfBytesToWrite, + 0, + overlapped.get(), + nullptr, + 0); + auto const dwError = boost::winapi::GetLastError(); + if(! bSuccess && dwError != + boost::winapi::ERROR_IO_PENDING_) + { + // VFALCO This needs review, is 0 the right number? + // completed immediately (with error?) + overlapped.complete(error_code{static_cast(dwError), + system_category()}, 0); + return; + } + overlapped.release(); + } void operator()( error_code ec, - std::size_t bytes_transferred = 0); - - friend - bool asio_handler_is_continuation(write_some_win32_op* op) + std::size_t bytes_transferred = 0) { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->h_)); - } - - template - friend - void asio_handler_invoke(Function&& f, write_some_win32_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); + bytes_transferred_ += bytes_transferred; + if(! ec) + { + if(header_) + { + header_ = false; + return (*this)(); + } + auto& w = sr_.writer_impl(); + w.pos_ += bytes_transferred; + BOOST_ASSERT(w.pos_ <= w.body_.last_); + if(w.pos_ >= w.body_.last_) + { + sr_.next(ec, null_lambda{}); + BOOST_ASSERT(! ec); + BOOST_ASSERT(sr_.is_done()); + } + } + this->invoke(ec, bytes_transferred_); } }; -template< - class Protocol, class Handler, - bool isRequest, class Fields> -void -write_some_win32_op< - Protocol, Handler, isRequest, Fields>:: -operator()() -{ - if(! sr_.is_header_done()) - { - header_ = true; - sr_.split(true); - return detail::async_write_some_impl( - sock_, sr_, std::move(*this)); - } - if(sr_.get().chunked()) - { - return detail::async_write_some_impl( - sock_, sr_, std::move(*this)); - } - auto& w = sr_.writer_impl(); - boost::winapi::DWORD_ const nNumberOfBytesToWrite = - static_cast( - (std::min)( - (std::min)(w.body_.last_ - w.pos_, sr_.limit()), - (std::numeric_limits::max)())); - net::windows::overlapped_ptr overlapped{ - sock_.get_executor().context(), std::move(*this)}; - // Note that we have moved *this, so we cannot access - // the handler since it is now moved-from. We can still - // access simple things like references and built-in types. - auto& ov = *overlapped.get(); - ov.Offset = lowPart(w.pos_); - ov.OffsetHigh = highPart(w.pos_); - auto const bSuccess = ::TransmitFile( - sock_.native_handle(), - sr_.get().body().file_.native_handle(), - nNumberOfBytesToWrite, - 0, - overlapped.get(), - nullptr, - 0); - auto const dwError = boost::winapi::GetLastError(); - if(! bSuccess && dwError != - boost::winapi::ERROR_IO_PENDING_) - { - // VFALCO This needs review, is 0 the right number? - // completed immediately (with error?) - overlapped.complete(error_code{static_cast(dwError), - system_category()}, 0); - return; - } - overlapped.release(); -} - -template< - class Protocol, class Handler, - bool isRequest, class Fields> -void -write_some_win32_op< - Protocol, Handler, isRequest, Fields>:: -operator()( - error_code ec, std::size_t bytes_transferred) -{ - bytes_transferred_ += bytes_transferred; - if(! ec) - { - if(header_) - { - header_ = false; - return (*this)(); - } - auto& w = sr_.writer_impl(); - w.pos_ += bytes_transferred; - BOOST_ASSERT(w.pos_ <= w.body_.last_); - if(w.pos_ >= w.body_.last_) - { - sr_.next(ec, null_lambda{}); - BOOST_ASSERT(! ec); - BOOST_ASSERT(sr_.is_done()); - } - } - h_(ec, bytes_transferred_); -} - #endif } // detail diff --git a/include/boost/beast/http/impl/read.ipp b/include/boost/beast/http/impl/read.ipp index 597a3b56..3103a84f 100644 --- a/include/boost/beast/http/impl/read.ipp +++ b/include/boost/beast/http/impl/read.ipp @@ -15,8 +15,9 @@ #include #include #include +#include +#include #include -#include #include namespace boost { @@ -149,7 +150,9 @@ template< bool isRequest, class Body, class Allocator, class Handler> class read_msg_op - : public net::coroutine + : public beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { using parser_type = parser; @@ -164,7 +167,6 @@ class read_msg_op parser_type p; data( - Handler const&, Stream& s_, message_type& m_) : s(s_) @@ -174,92 +176,35 @@ class read_msg_op } }; - handler_ptr d_; + data& d_; public: - read_msg_op(read_msg_op&&) = default; - read_msg_op(read_msg_op const&) = delete; - - template + template read_msg_op( Stream& s, DynamicBuffer& b, message_type& m, - DeducedHandler&& h) - : d_(std::forward(h), s, m) + Handler_&& h) + : beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type>( + s.get_executor(), std::forward(h)) + , d_(beast::detail::allocate_stable( + *this, s, m)) { - http::async_read(s, b, d_->p, std::move(*this)); + http::async_read(d_.s, b, d_.p, std::move(*this)); } void operator()( error_code ec, - std::size_t bytes_transferred); - - // - - using allocator_type = - net::associated_allocator_t; - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval().get_executor())>; - - allocator_type - get_allocator() const noexcept + std::size_t bytes_transferred) { - return net::get_associated_allocator(d_.handler()); - } - - executor_type - get_executor() const noexcept - { - return net::get_associated_executor( - d_.handler(), d_->s.get_executor()); - } - - friend - void* asio_handler_allocate( - std::size_t size, read_msg_op* op) - { - using net::asio_handler_allocate; - return asio_handler_allocate( - size, std::addressof(op->d_.handler())); - } - - friend - void asio_handler_deallocate( - void* p, std::size_t size, read_msg_op* op) - { - using net::asio_handler_deallocate; - asio_handler_deallocate( - p, size, std::addressof(op->d_.handler())); - } - - template - friend - void asio_handler_invoke(Function&& f, read_msg_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->d_.handler())); + if(! ec) + d_.m = d_.p.release(); + this->invoke(ec, bytes_transferred); } }; -template -void -read_msg_op:: -operator()( - error_code ec, - std::size_t bytes_transferred) -{ - auto& d = *d_; - if(! ec) - d.m = d.p.release(); - d_.invoke(ec, bytes_transferred); -} - } // detail //------------------------------------------------------------------------------ diff --git a/include/boost/beast/websocket/impl/accept.ipp b/include/boost/beast/websocket/impl/accept.ipp index 0e481a7a..257a2340 100644 --- a/include/boost/beast/websocket/impl/accept.ipp +++ b/include/boost/beast/websocket/impl/accept.ipp @@ -17,15 +17,11 @@ #include #include #include -#include +#include #include +#include #include #include -#include -#include -#include -#include -#include #include #include #include @@ -40,115 +36,70 @@ namespace websocket { template template class stream::response_op - : public net::coroutine + : public beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { struct data { stream& ws; - net::executor_work_guard&>().get_executor())> wg; error_code result; response_type res; template data( - Handler const&, stream& ws_, http::request> const& req, Decorator const& decorator) : ws(ws_) - , wg(ws.get_executor()) , res(ws_.build_response(req, decorator, result)) { } }; - handler_ptr d_; + data& d_; public: - response_op(response_op&&) = default; - response_op(response_op const&) = delete; - - template - response_op(DeducedHandler&& h, - stream& ws, Args&&... args) - : d_(std::forward(h), - ws, std::forward(args)...) + template< + class Handler_, + class... Args> + response_op( + Handler_&& h, + stream& ws, + Args&&... args) + : beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type>( + ws.get_executor(), std::forward(h)) + , d_(beast::detail::allocate_stable( + *this, ws, std::forward(args)...)) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(d_.handler()); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval< - stream&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - d_.handler(), d_->ws.get_executor()); - } - void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0); - - friend - bool asio_handler_is_continuation(response_op* op) + std::size_t bytes_transferred = 0) { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->d_.handler())); - } + boost::ignore_unused(bytes_transferred); - template - friend - void asio_handler_invoke(Function&& f, response_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->d_.handler())); + BOOST_ASIO_CORO_REENTER(*this) + { + // Send response + BOOST_ASIO_CORO_YIELD + http::async_write(d_.ws.next_layer(), + d_.res, std::move(*this)); + if(! ec) + ec = d_.result; + if(! ec) + { + d_.ws.do_pmd_config(d_.res); + d_.ws.open(role_type::server); + } + this->invoke(ec); + } } }; -template -template -void -stream:: -response_op:: -operator()( - error_code ec, - std::size_t) -{ - auto& d = *d_; - BOOST_ASIO_CORO_REENTER(*this) - { - // Send response - BOOST_ASIO_CORO_YIELD - http::async_write(d.ws.next_layer(), - d.res, std::move(*this)); - if(! ec) - ec = d.result; - if(! ec) - { - d.ws.do_pmd_config(d.res); - d.ws.open(role_type::server); - } - { - auto wg = std::move(d.wg); - d_.invoke(ec); - } - } -} - //------------------------------------------------------------------------------ // read and respond to an upgrade request @@ -156,158 +107,107 @@ operator()( template template class stream::accept_op - : public net::coroutine + : public beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { struct data { stream& ws; - net::executor_work_guard&>().get_executor())> wg; Decorator decorator; http::request_parser p; + data( - Handler const&, stream& ws_, Decorator const& decorator_) : ws(ws_) - , wg(ws.get_executor()) , decorator(decorator_) { } }; - handler_ptr d_; + data& d_; public: - accept_op(accept_op&&) = default; - accept_op(accept_op const&) = delete; - - template - accept_op(DeducedHandler&& h, - stream& ws, Args&&... args) - : d_(std::forward(h), - ws, std::forward(args)...) + template< + class Handler_, + class... Args> + accept_op( + Handler_&& h, + stream& ws, + Args&&... args) + : beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type>( + ws.get_executor(), std::forward(h)) + , d_(beast::detail::allocate_stable( + *this, ws, std::forward(args)...)) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(d_.handler()); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - d_.handler(), d_->ws.get_executor()); - } - template - void run(Buffers const& buffers); + void run(Buffers const& buffers) + { + using net::buffer_copy; + using net::buffer_size; + error_code ec; + auto const mb = beast::detail::dynamic_buffer_prepare( + d_.ws.rd_buf_, buffer_size(buffers), ec, + error::buffer_overflow); + if(ec) + return (*this)(ec); + d_.ws.rd_buf_.commit(buffer_copy(*mb, buffers)); + (*this)(ec); + } void operator()( error_code ec = {}, - std::size_t bytes_used = 0); - - friend - bool asio_handler_is_continuation(accept_op* op) + std::size_t bytes_used = 0) { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->d_.handler())); - } + boost::ignore_unused(bytes_used); - template - friend - void asio_handler_invoke(Function&& f, accept_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->d_.handler())); + BOOST_ASIO_CORO_REENTER(*this) + { + if(ec) + { + BOOST_ASIO_CORO_YIELD + net::post( + d_.ws.get_executor(), + beast::bind_front_handler(std::move(*this), ec)); + } + else + { + BOOST_ASIO_CORO_YIELD + http::async_read( + d_.ws.next_layer(), d_.ws.rd_buf_, + d_.p, std::move(*this)); + if(ec == http::error::end_of_stream) + ec = error::closed; + if(! ec) + { + // Arguments from our state must be + // moved to the stack before releasing + // the handler. + auto& ws = d_.ws; + auto const req = d_.p.release(); + auto const decorator = d_.decorator; + #if 1 + return response_op{ + this->release_handler(), + ws, req, decorator}(ec); + #else + // VFALCO This *should* work but breaks + // coroutine invariants in the unit test. + // Also it calls reset() when it shouldn't. + return ws.async_accept_ex( + req, decorator, this->release_handler()); + #endif + } + } + this->invoke(ec); + } } }; -template -template -template -void -stream:: -accept_op:: -run(Buffers const& buffers) -{ - using net::buffer_copy; - using net::buffer_size; - auto& d = *d_; - error_code ec; - auto const mb = beast::detail::dynamic_buffer_prepare( - d.ws.rd_buf_, buffer_size(buffers), ec, - error::buffer_overflow); - if(ec) - return (*this)(ec); - d.ws.rd_buf_.commit(buffer_copy(*mb, buffers)); - (*this)(ec); -} - -template -template -void -stream:: -accept_op:: -operator()(error_code ec, std::size_t) -{ - auto& d = *d_; - BOOST_ASIO_CORO_REENTER(*this) - { - if(ec) - { - BOOST_ASIO_CORO_YIELD - net::post( - d.ws.get_executor(), - beast::bind_front_handler(std::move(*this), ec)); - } - else - { - BOOST_ASIO_CORO_YIELD - http::async_read( - d.ws.next_layer(), d.ws.rd_buf_, - d.p, std::move(*this)); - if(ec == http::error::end_of_stream) - ec = error::closed; - if(! ec) - { - // Arguments from our state must be - // moved to the stack before releasing - // the handler. - auto& ws = d.ws; - auto const req = d.p.release(); - auto const decorator = d.decorator; - auto wg = std::move(d.wg); - #if 1 - return response_op{ - d_.release_handler(), - ws, req, decorator}(ec); - #else - // VFALCO This *should* work but breaks - // coroutine invariants in the unit test. - // Also it calls reset() when it shouldn't. - return ws.async_accept_ex( - req, decorator, d_.release_handler()); - #endif - } - } - { - auto wg = std::move(d.wg); - d_.invoke(ec); - } - } -} - //------------------------------------------------------------------------------ template diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index d1bb330f..59dcf421 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -11,16 +11,12 @@ #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP #include -#include #include #include +#include +#include #include -#include -#include #include -#include -#include -#include #include #include #include @@ -39,23 +35,21 @@ namespace websocket { template template class stream::close_op - : public net::coroutine + : public beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { struct state { stream& ws; - net::executor_work_guard&>().get_executor())> wg; detail::frame_buffer fb; error_code ev; bool cont = false; state( - Handler const&, stream& ws_, close_reason const& cr) : ws(ws_) - , wg(ws.get_executor()) { // Serialize the close frame ws.template write_close< @@ -63,255 +57,206 @@ class stream::close_op } }; - handler_ptr d_; + state& d_; public: static constexpr int id = 4; // for soft_mutex - close_op(close_op&&) = default; - close_op(close_op const&) = delete; - - template + template close_op( - DeducedHandler&& h, + Handler_&& h, stream& ws, close_reason const& cr) - : d_(std::forward(h), ws, cr) + : beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type>( + ws.get_executor(), std::forward(h)) + , d_(beast::detail::allocate_stable( + *this, ws, cr)) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(d_.handler()); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - d_.handler(), d_->ws.get_executor()); - } - void operator()( error_code ec = {}, std::size_t bytes_transferred = 0, - bool cont = true); - - friend - bool asio_handler_is_continuation(close_op* op) + bool cont = true) { - using net::asio_handler_is_continuation; - return op->d_->cont || asio_handler_is_continuation( - std::addressof(op->d_.handler())); - } - - template - friend - void asio_handler_invoke(Function&& f, close_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, - std::addressof(op->d_.handler())); - } -}; - -template -template -void -stream:: -close_op:: -operator()( - error_code ec, - std::size_t bytes_transferred, - bool cont) -{ - using beast::detail::clamp; - auto& d = *d_; - d.cont = cont; - BOOST_ASIO_CORO_REENTER(*this) - { - // Attempt to acquire write block - if(! d.ws.wr_block_.try_lock(this)) + using beast::detail::clamp; + d_.cont = cont; + BOOST_ASIO_CORO_REENTER(*this) { - // Suspend - BOOST_ASIO_CORO_YIELD - d.ws.paused_close_.emplace(std::move(*this)); + // Attempt to acquire write block + if(! d_.ws.wr_block_.try_lock(this)) + { + // Suspend + BOOST_ASIO_CORO_YIELD + d_.ws.paused_close_.emplace(std::move(*this)); - // Acquire the write block - d.ws.wr_block_.lock(this); + // Acquire the write block + d_.ws.wr_block_.lock(this); - // Resume - BOOST_ASIO_CORO_YIELD - net::post( - d.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); - } - - // Make sure the stream is open - if(! d.ws.check_open(ec)) - goto upcall; - - // Can't call close twice - BOOST_ASSERT(! d.ws.wr_close_); - - // Change status to closing - BOOST_ASSERT(d.ws.status_ == status::open); - d.ws.status_ = status::closing; - - // Send close frame - d.ws.wr_close_ = true; - BOOST_ASIO_CORO_YIELD - net::async_write(d.ws.stream_, - d.fb.data(), std::move(*this)); - if(! d.ws.check_ok(ec)) - goto upcall; - - if(d.ws.rd_close_) - { - // This happens when the read_op gets a close frame - // at the same time close_op is sending the close frame. - // The read_op will be suspended on the write block. - goto teardown; - } - - // Maybe suspend - if(! d.ws.rd_block_.try_lock(this)) - { - // Suspend - BOOST_ASIO_CORO_YIELD - d.ws.paused_r_close_.emplace(std::move(*this)); - - // Acquire the read block - d.ws.rd_block_.lock(this); - - // Resume - BOOST_ASIO_CORO_YIELD - net::post( - d.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d.ws.rd_block_.is_locked(this)); + // Resume + BOOST_ASIO_CORO_YIELD + net::post( + d_.ws.get_executor(), std::move(*this)); + BOOST_ASSERT(d_.ws.wr_block_.is_locked(this)); + } // Make sure the stream is open - BOOST_ASSERT(d.ws.status_ != status::open); - BOOST_ASSERT(d.ws.status_ != status::closed); - if( d.ws.status_ == status::failed) + if(! d_.ws.check_open(ec)) goto upcall; - BOOST_ASSERT(! d.ws.rd_close_); - } + // Can't call close twice + BOOST_ASSERT(! d_.ws.wr_close_); - // Drain - if(d.ws.rd_remain_ > 0) - goto read_payload; - for(;;) - { - // Read frame header - while(! d.ws.parse_fh( - d.ws.rd_fh_, d.ws.rd_buf_, d.ev)) - { - if(d.ev) - goto teardown; - BOOST_ASIO_CORO_YIELD - d.ws.stream_.async_read_some( - d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, - d.ws.rd_buf_.max_size())), - std::move(*this)); - if(! d.ws.check_ok(ec)) - goto upcall; - d.ws.rd_buf_.commit(bytes_transferred); - } - if(detail::is_control(d.ws.rd_fh_.op)) - { - // Process control frame - if(d.ws.rd_fh_.op == detail::opcode::close) - { - BOOST_ASSERT(! d.ws.rd_close_); - d.ws.rd_close_ = true; - auto const mb = buffers_prefix( - clamp(d.ws.rd_fh_.len), - d.ws.rd_buf_.data()); - if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask) - detail::mask_inplace(mb, d.ws.rd_key_); - detail::read_close(d.ws.cr_, mb, d.ev); - if(d.ev) - goto teardown; - d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len)); - goto teardown; - } - d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len)); - } - else - { - read_payload: - while(d.ws.rd_buf_.size() < d.ws.rd_remain_) - { - d.ws.rd_remain_ -= d.ws.rd_buf_.size(); - d.ws.rd_buf_.consume(d.ws.rd_buf_.size()); - BOOST_ASIO_CORO_YIELD - d.ws.stream_.async_read_some( - d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, - d.ws.rd_buf_.max_size())), - std::move(*this)); - if(! d.ws.check_ok(ec)) - goto upcall; - d.ws.rd_buf_.commit(bytes_transferred); - } - BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_); - d.ws.rd_buf_.consume(clamp(d.ws.rd_remain_)); - d.ws.rd_remain_ = 0; - } - } + // Change status to closing + BOOST_ASSERT(d_.ws.status_ == status::open); + d_.ws.status_ = status::closing; - teardown: - // Teardown - BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); - using beast::websocket::async_teardown; - BOOST_ASIO_CORO_YIELD - async_teardown(d.ws.role_, - d.ws.stream_, std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); - if(ec == net::error::eof) - { - // Rationale: - // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error - ec.assign(0, ec.category()); - } - if(! ec) - ec = d.ev; - if(ec) - d.ws.status_ = status::failed; - else - d.ws.status_ = status::closed; - d.ws.close(); - - upcall: - BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); - d.ws.wr_block_.unlock(this); - if(d.ws.rd_block_.try_unlock(this)) - d.ws.paused_r_rd_.maybe_invoke(); - d.ws.paused_rd_.maybe_invoke() || - d.ws.paused_ping_.maybe_invoke() || - d.ws.paused_wr_.maybe_invoke(); - if(! d.cont) - { + // Send close frame + d_.ws.wr_close_ = true; BOOST_ASIO_CORO_YIELD - net::post( - d.ws.get_executor(), - beast::bind_front_handler(std::move(*this), ec)); - } - { - auto wg = std::move(d.wg); - d_.invoke(ec); + net::async_write(d_.ws.stream_, + d_.fb.data(), std::move(*this)); + if(! d_.ws.check_ok(ec)) + goto upcall; + + if(d_.ws.rd_close_) + { + // This happens when the read_op gets a close frame + // at the same time close_op is sending the close frame. + // The read_op will be suspended on the write block. + goto teardown; + } + + // Maybe suspend + if(! d_.ws.rd_block_.try_lock(this)) + { + // Suspend + BOOST_ASIO_CORO_YIELD + d_.ws.paused_r_close_.emplace(std::move(*this)); + + // Acquire the read block + d_.ws.rd_block_.lock(this); + + // Resume + BOOST_ASIO_CORO_YIELD + net::post( + d_.ws.get_executor(), std::move(*this)); + BOOST_ASSERT(d_.ws.rd_block_.is_locked(this)); + + // Make sure the stream is open + BOOST_ASSERT(d_.ws.status_ != status::open); + BOOST_ASSERT(d_.ws.status_ != status::closed); + if( d_.ws.status_ == status::failed) + goto upcall; + + BOOST_ASSERT(! d_.ws.rd_close_); + } + + // Drain + if(d_.ws.rd_remain_ > 0) + goto read_payload; + for(;;) + { + // Read frame header + while(! d_.ws.parse_fh( + d_.ws.rd_fh_, d_.ws.rd_buf_, d_.ev)) + { + if(d_.ev) + goto teardown; + BOOST_ASIO_CORO_YIELD + d_.ws.stream_.async_read_some( + d_.ws.rd_buf_.prepare(read_size(d_.ws.rd_buf_, + d_.ws.rd_buf_.max_size())), + std::move(*this)); + if(! d_.ws.check_ok(ec)) + goto upcall; + d_.ws.rd_buf_.commit(bytes_transferred); + } + if(detail::is_control(d_.ws.rd_fh_.op)) + { + // Process control frame + if(d_.ws.rd_fh_.op == detail::opcode::close) + { + BOOST_ASSERT(! d_.ws.rd_close_); + d_.ws.rd_close_ = true; + auto const mb = buffers_prefix( + clamp(d_.ws.rd_fh_.len), + d_.ws.rd_buf_.data()); + if(d_.ws.rd_fh_.len > 0 && d_.ws.rd_fh_.mask) + detail::mask_inplace(mb, d_.ws.rd_key_); + detail::read_close(d_.ws.cr_, mb, d_.ev); + if(d_.ev) + goto teardown; + d_.ws.rd_buf_.consume(clamp(d_.ws.rd_fh_.len)); + goto teardown; + } + d_.ws.rd_buf_.consume(clamp(d_.ws.rd_fh_.len)); + } + else + { + read_payload: + while(d_.ws.rd_buf_.size() < d_.ws.rd_remain_) + { + d_.ws.rd_remain_ -= d_.ws.rd_buf_.size(); + d_.ws.rd_buf_.consume(d_.ws.rd_buf_.size()); + BOOST_ASIO_CORO_YIELD + d_.ws.stream_.async_read_some( + d_.ws.rd_buf_.prepare(read_size(d_.ws.rd_buf_, + d_.ws.rd_buf_.max_size())), + std::move(*this)); + if(! d_.ws.check_ok(ec)) + goto upcall; + d_.ws.rd_buf_.commit(bytes_transferred); + } + BOOST_ASSERT(d_.ws.rd_buf_.size() >= d_.ws.rd_remain_); + d_.ws.rd_buf_.consume(clamp(d_.ws.rd_remain_)); + d_.ws.rd_remain_ = 0; + } + } + + teardown: + // Teardown + BOOST_ASSERT(d_.ws.wr_block_.is_locked(this)); + using beast::websocket::async_teardown; + BOOST_ASIO_CORO_YIELD + async_teardown(d_.ws.role_, + d_.ws.stream_, std::move(*this)); + BOOST_ASSERT(d_.ws.wr_block_.is_locked(this)); + if(ec == net::error::eof) + { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec = {}; + } + if(! ec) + ec = d_.ev; + if(ec) + d_.ws.status_ = status::failed; + else + d_.ws.status_ = status::closed; + d_.ws.close(); + + upcall: + BOOST_ASSERT(d_.ws.wr_block_.is_locked(this)); + d_.ws.wr_block_.unlock(this); + if(d_.ws.rd_block_.try_unlock(this)) + d_.ws.paused_r_rd_.maybe_invoke(); + d_.ws.paused_rd_.maybe_invoke() || + d_.ws.paused_ping_.maybe_invoke() || + d_.ws.paused_wr_.maybe_invoke(); + if(! d_.cont) + { + BOOST_ASIO_CORO_YIELD + net::post( + d_.ws.get_executor(), + beast::bind_front_handler(std::move(*this), ec)); + } + this->invoke(ec); } } -} +}; //------------------------------------------------------------------------------ diff --git a/include/boost/beast/websocket/impl/handshake.ipp b/include/boost/beast/websocket/impl/handshake.ipp index 4c78b3f6..3b50c426 100644 --- a/include/boost/beast/websocket/impl/handshake.ipp +++ b/include/boost/beast/websocket/impl/handshake.ipp @@ -15,14 +15,10 @@ #include #include #include -#include #include -#include -#include +#include +#include #include -#include -#include -#include #include #include #include @@ -38,13 +34,13 @@ namespace websocket { template template class stream::handshake_op - : public net::coroutine + : public beast::detail::stable_async_op_base> + , public net::coroutine { struct data { stream& ws; - net::executor_work_guard&>().get_executor())> wg; response_type* res_p; detail::sec_ws_key_type key; http::request req; @@ -52,14 +48,12 @@ class stream::handshake_op template data( - Handler const&, - stream& ws_, + stream& ws_, response_type* res_p_, string_view host, string_view target, Decorator const& decorator) : ws(ws_) - , wg(ws.get_executor()) , res_p(res_p_) , req(ws.build_request(key, host, target, decorator)) @@ -68,103 +62,61 @@ class stream::handshake_op } }; - handler_ptr d_; + data& d_; public: - handshake_op(handshake_op&&) = default; - handshake_op(handshake_op const&) = delete; - - template - handshake_op(DeducedHandler&& h, - stream& ws, Args&&... args) - : d_(std::forward(h), - ws, std::forward(args)...) + template< + class Handler_, + class... Args> + handshake_op( + Handler_&& h, + stream& ws, Args&&... args) + : beast::detail::stable_async_op_base>( + ws.get_executor(), std::forward(h)) + , d_(beast::detail::allocate_stable( + *this, ws, std::forward(args)...)) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(d_.handler()); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - d_.handler(), d_->ws.get_executor()); - } - void operator()( error_code ec = {}, - std::size_t bytes_used = 0); - - friend - bool asio_handler_is_continuation(handshake_op* op) + std::size_t bytes_used = 0) { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->d_.handler())); - } + boost::ignore_unused(bytes_used); - template - friend - void asio_handler_invoke(Function&& f, handshake_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, - std::addressof(op->d_.handler())); - } -}; - -template -template -void -stream:: -handshake_op:: -operator()(error_code ec, std::size_t) -{ - auto& d = *d_; - BOOST_ASIO_CORO_REENTER(*this) - { - // Send HTTP Upgrade - d.ws.do_pmd_config(d.req); - BOOST_ASIO_CORO_YIELD - http::async_write(d.ws.stream_, - d.req, std::move(*this)); - if(ec) - goto upcall; - - // VFALCO We could pre-serialize the request to - // a single buffer, send that instead, - // and delete the buffer here. The buffer - // could be a variable block at the end - // of handler_ptr's allocation. - - // Read HTTP response - BOOST_ASIO_CORO_YIELD - http::async_read(d.ws.next_layer(), - d.ws.rd_buf_, d.res, - std::move(*this)); - if(ec) - goto upcall; - d.ws.on_response(d.res, d.key, ec); - if(d.res_p) - swap(d.res, *d.res_p); - upcall: + BOOST_ASIO_CORO_REENTER(*this) { - auto wg = std::move(d.wg); - d_.invoke(ec); + // Send HTTP Upgrade + d_.ws.do_pmd_config(d_.req); + BOOST_ASIO_CORO_YIELD + http::async_write(d_.ws.stream_, + d_.req, std::move(*this)); + if(ec) + goto upcall; + + // VFALCO We could pre-serialize the request to + // a single buffer, send that instead, + // and delete the buffer here. The buffer + // could be a variable block at the end + // of handler_ptr's allocation. + + // Read HTTP response + BOOST_ASIO_CORO_YIELD + http::async_read(d_.ws.next_layer(), + d_.ws.rd_buf_, d_.res, + std::move(*this)); + if(ec) + goto upcall; + d_.ws.on_response(d_.res, d_.key, ec); + if(d_.res_p) + swap(d_.res, *d_.res_p); + upcall: + this->invoke(ec); } } -} +}; template template diff --git a/include/boost/beast/websocket/impl/ping.ipp b/include/boost/beast/websocket/impl/ping.ipp index 20e85dcc..dfb2a6a4 100644 --- a/include/boost/beast/websocket/impl/ping.ipp +++ b/include/boost/beast/websocket/impl/ping.ipp @@ -13,14 +13,10 @@ #include #include #include -#include +#include +#include #include -#include -#include #include -#include -#include -#include #include #include #include @@ -37,22 +33,20 @@ namespace websocket { template template class stream::ping_op - : public net::coroutine + : public beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { struct state { stream& ws; - net::executor_work_guard&>().get_executor())> wg; detail::frame_buffer fb; state( - Handler const&, stream& ws_, detail::opcode op, ping_data const& payload) : ws(ws_) - , wg(ws.get_executor()) { // Serialize the control frame ws.template write_ping< @@ -61,127 +55,82 @@ class stream::ping_op } }; - handler_ptr d_; + state& d_; public: static constexpr int id = 3; // for soft_mutex - ping_op(ping_op&&) = default; - ping_op(ping_op const&) = delete; - - template + template ping_op( - DeducedHandler&& h, + Handler_&& h, stream& ws, detail::opcode op, ping_data const& payload) - : d_(std::forward(h), - ws, op, payload) + : beast::detail::stable_async_op_base< + Handler, beast::detail::get_executor_type>( + ws.get_executor(), std::forward(h)) + , d_(beast::detail::allocate_stable( + *this, ws, op, payload)) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(d_.handler()); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - d_.handler(), d_->ws.get_executor()); - } - void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0); - - friend - bool asio_handler_is_continuation(ping_op* op) + std::size_t bytes_transferred = 0) { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->d_.handler())); - } + boost::ignore_unused(bytes_transferred); - template - friend - void asio_handler_invoke(Function&& f, ping_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke( - f, std::addressof(op->d_.handler())); - } -}; - -template -template -void -stream:: -ping_op:: -operator()(error_code ec, std::size_t) -{ - auto& d = *d_; - BOOST_ASIO_CORO_REENTER(*this) - { - // Maybe suspend - if(d.ws.wr_block_.try_lock(this)) + BOOST_ASIO_CORO_REENTER(*this) { - // Make sure the stream is open - if(! d.ws.check_open(ec)) + // Maybe suspend + if(d_.ws.wr_block_.try_lock(this)) { + // Make sure the stream is open + if(! d_.ws.check_open(ec)) + { + BOOST_ASIO_CORO_YIELD + net::post( + d_.ws.get_executor(), + beast::bind_front_handler(std::move(*this), ec)); + goto upcall; + } + } + else + { + // Suspend + BOOST_ASIO_CORO_YIELD + d_.ws.paused_ping_.emplace(std::move(*this)); + + // Acquire the write block + d_.ws.wr_block_.lock(this); + + // Resume BOOST_ASIO_CORO_YIELD net::post( - d.ws.get_executor(), - beast::bind_front_handler(std::move(*this), ec)); - goto upcall; + d_.ws.get_executor(), std::move(*this)); + BOOST_ASSERT(d_.ws.wr_block_.is_locked(this)); + + // Make sure the stream is open + if(! d_.ws.check_open(ec)) + goto upcall; } - } - else - { - // Suspend + + // Send ping frame BOOST_ASIO_CORO_YIELD - d.ws.paused_ping_.emplace(std::move(*this)); - - // Acquire the write block - d.ws.wr_block_.lock(this); - - // Resume - BOOST_ASIO_CORO_YIELD - net::post( - d.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); - - // Make sure the stream is open - if(! d.ws.check_open(ec)) + net::async_write(d_.ws.stream_, + d_.fb.data(), std::move(*this)); + if(! d_.ws.check_ok(ec)) goto upcall; - } - // Send ping frame - BOOST_ASIO_CORO_YIELD - net::async_write(d.ws.stream_, - d.fb.data(), std::move(*this)); - if(! d.ws.check_ok(ec)) - goto upcall; - - upcall: - d.ws.wr_block_.unlock(this); - d.ws.paused_close_.maybe_invoke() || - d.ws.paused_rd_.maybe_invoke() || - d.ws.paused_wr_.maybe_invoke(); - { - auto wg = std::move(d.wg); - d_.invoke(ec); + upcall: + d_.ws.wr_block_.unlock(this); + d_.ws.paused_close_.maybe_invoke() || + d_.ws.paused_rd_.maybe_invoke() || + d_.ws.paused_wr_.maybe_invoke(); + this->invoke(ec); } } -} +}; //------------------------------------------------------------------------------ diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index 88dba0cc..bbb67a78 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -16,15 +16,12 @@ #include #include #include +#include #include #include #include -#include -#include +#include #include -#include -#include -#include #include #include #include @@ -80,12 +77,11 @@ template< class MutableBufferSequence, class Handler> class stream::read_some_op - : public net::coroutine + : public beast::detail::async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { - Handler h_; - stream& ws_; - net::executor_work_guard&>().get_executor())> wg_; + stream& ws_; MutableBufferSequence bs_; buffers_suffix cb_; std::size_t bytes_written_ = 0; @@ -97,68 +93,25 @@ class stream::read_some_op public: static constexpr int id = 1; // for soft_mutex - read_some_op(read_some_op&&) = default; - read_some_op(read_some_op const&) = delete; - - template + template read_some_op( - DeducedHandler&& h, + Handler_&& h, stream& ws, MutableBufferSequence const& bs) - : h_(std::forward(h)) + : beast::detail::async_op_base< + Handler, beast::detail::get_executor_type>( + ws.get_executor(), std::forward(h)) , ws_(ws) - , wg_(ws_.get_executor()) , bs_(bs) , cb_(bs) , code_(close_code::none) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(h_); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - h_, ws_.get_executor()); - } - - Handler& - handler() - { - return h_; - } - void operator()( error_code ec = {}, std::size_t bytes_transferred = 0, bool cont = true); - - friend - bool asio_handler_is_continuation(read_some_op* op) - { - using net::asio_handler_is_continuation; - return op->cont_ || asio_handler_is_continuation( - std::addressof(op->h_)); - } - - template - friend - void asio_handler_invoke(Function&& f, read_some_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); - } }; template @@ -708,7 +661,7 @@ operator()( beast::bind_front_handler(std::move(*this), ec, bytes_written_)); } - h_(ec, bytes_written_); + this->invoke(ec, bytes_written_); } } @@ -719,34 +672,28 @@ template< class DynamicBuffer, class Handler> class stream::read_op - : public net::coroutine + : public beast::detail::async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { - Handler h_; stream& ws_; - net::executor_work_guard&>().get_executor())> wg_; DynamicBuffer& b_; std::size_t limit_; std::size_t bytes_written_ = 0; bool some_; public: - using allocator_type = - net::associated_allocator_t; - - read_op(read_op&&) = default; - read_op(read_op const&) = delete; - - template + template read_op( - DeducedHandler&& h, + Handler_&& h, stream& ws, DynamicBuffer& b, std::size_t limit, bool some) - : h_(std::forward(h)) + : beast::detail::async_op_base< + Handler, beast::detail::get_executor_type>( + ws.get_executor(), std::forward(h)) , ws_(ws) - , wg_(ws_.get_executor()) , b_(b) , limit_(limit ? limit : ( std::numeric_limits::max)()) @@ -754,84 +701,43 @@ public: { } - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(h_); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - h_, ws_.get_executor()); - } - void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0); - - friend - bool asio_handler_is_continuation(read_op* op) + std::size_t bytes_transferred = 0) { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->h_)); - } - - template - friend - void asio_handler_invoke(Function&& f, read_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); + using beast::detail::clamp; + BOOST_ASIO_CORO_REENTER(*this) + { + do + { + BOOST_ASIO_CORO_YIELD + { + auto mb = beast::detail::dynamic_buffer_prepare(b_, + clamp(ws_.read_size_hint(b_), limit_), + ec, error::buffer_overflow); + if(ec) + net::post( + ws_.get_executor(), + beast::bind_front_handler( + std::move(*this), ec, 0)); + else + read_some_op(std::move(*this), ws_, *mb)( + {}, 0, false); + } + if(ec) + goto upcall; + b_.commit(bytes_transferred); + bytes_written_ += bytes_transferred; + } + while(! some_ && ! ws_.is_message_done()); + upcall: + this->invoke(ec, bytes_written_); + } } }; -template -template -void -stream:: -read_op:: -operator()( - error_code ec, - std::size_t bytes_transferred) -{ - using beast::detail::clamp; - BOOST_ASIO_CORO_REENTER(*this) - { - do - { - BOOST_ASIO_CORO_YIELD - { - auto mb = beast::detail::dynamic_buffer_prepare(b_, - clamp(ws_.read_size_hint(b_), limit_), - ec, error::buffer_overflow); - if(ec) - net::post( - ws_.get_executor(), - beast::bind_front_handler( - std::move(*this), ec, 0)); - else - read_some_op(std::move(*this), ws_, *mb)( - {}, 0, false); - return; - } - if(ec) - break; - b_.commit(bytes_transferred); - bytes_written_ += bytes_transferred; - } - while(! some_ && ! ws_.is_message_done()); - h_(ec, bytes_written_); - } -} - //------------------------------------------------------------------------------ template diff --git a/include/boost/beast/websocket/impl/teardown.ipp b/include/boost/beast/websocket/impl/teardown.ipp index ed03672e..8422aed3 100644 --- a/include/boost/beast/websocket/impl/teardown.ipp +++ b/include/boost/beast/websocket/impl/teardown.ipp @@ -12,12 +12,9 @@ #include #include -#include -#include +#include +#include #include -#include -#include -#include #include #include @@ -28,146 +25,105 @@ namespace websocket { namespace detail { template -class teardown_tcp_op : public net::coroutine +class teardown_tcp_op + : public beast::detail::async_op_base< + Handler, beast::detail::get_executor_type< + net::ip::tcp::socket>> + , public net::coroutine { - using socket_type = - net::ip::tcp::socket; + using socket_type = net::ip::tcp::socket; - Handler h_; socket_type& s_; - net::executor_work_guard().get_executor())> wg_; role_type role_; bool nb_; public: - teardown_tcp_op(teardown_tcp_op&& other) = default; - teardown_tcp_op(teardown_tcp_op const& other) = default; - - template + template teardown_tcp_op( - DeducedHandler&& h, + Handler_&& h, socket_type& s, role_type role) - : h_(std::forward(h)) + : beast::detail::async_op_base< + Handler, beast::detail::get_executor_type< + net::ip::tcp::socket>>(s.get_executor(), + std::forward(h)) , s_(s) - , wg_(s_.get_executor()) , role_(role) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(h_); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - h_, s_.get_executor()); - } - void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0); - - friend - bool asio_handler_is_continuation(teardown_tcp_op* op) + std::size_t bytes_transferred = 0) { - using net::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->h_)); - } - - template - friend - void asio_handler_invoke(Function&& f, teardown_tcp_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); - } -}; - -template -void -teardown_tcp_op:: -operator()(error_code ec, std::size_t bytes_transferred) -{ - using net::buffer; - using tcp = net::ip::tcp; - BOOST_ASIO_CORO_REENTER(*this) - { - nb_ = s_.non_blocking(); - s_.non_blocking(true, ec); - if(! ec) - { - if(role_ == role_type::server) - s_.shutdown(tcp::socket::shutdown_send, ec); - } - if(ec) - { - BOOST_ASIO_CORO_YIELD - net::post( - s_.get_executor(), - beast::bind_front_handler(std::move(*this), ec, 0)); - goto upcall; - } - for(;;) + using net::buffer; + using tcp = net::ip::tcp; + BOOST_ASIO_CORO_REENTER(*this) { + nb_ = s_.non_blocking(); + s_.non_blocking(true, ec); + if(! ec) { - char buf[2048]; - s_.read_some( - net::buffer(buf), ec); - } - if(ec == net::error::would_block) - { - BOOST_ASIO_CORO_YIELD - s_.async_wait( - net::ip::tcp::socket::wait_read, - std::move(*this)); - continue; + if(role_ == role_type::server) + s_.shutdown(tcp::socket::shutdown_send, ec); } if(ec) { - if(ec != net::error::eof) - goto upcall; - ec = {}; - break; + BOOST_ASIO_CORO_YIELD + net::post( + s_.get_executor(), + beast::bind_front_handler(std::move(*this), ec, 0)); + goto upcall; } - if(bytes_transferred == 0) + for(;;) { - // happens sometimes - break; + { + char buf[2048]; + s_.read_some( + net::buffer(buf), ec); + } + if(ec == net::error::would_block) + { + BOOST_ASIO_CORO_YIELD + s_.async_wait( + net::ip::tcp::socket::wait_read, + std::move(*this)); + continue; + } + if(ec) + { + if(ec != net::error::eof) + goto upcall; + ec = {}; + break; + } + if(bytes_transferred == 0) + { + // happens sometimes + // https://github.com/boostorg/beast/issues/1373 + break; + } } + if(role_ == role_type::client) + s_.shutdown(tcp::socket::shutdown_send, ec); + if(ec) + goto upcall; + s_.close(ec); + upcall: + { + error_code ignored; + s_.non_blocking(nb_, ignored); + } + this->invoke(ec); } - if(role_ == role_type::client) - s_.shutdown(tcp::socket::shutdown_send, ec); - if(ec) - goto upcall; - s_.close(ec); - upcall: - { - error_code ignored; - s_.non_blocking(nb_, ignored); - } - h_(ec); } -} +}; } // detail //------------------------------------------------------------------------------ -inline void teardown( role_type role, @@ -195,6 +151,7 @@ teardown( if(bytes_transferred == 0) { // happens sometimes + // https://github.com/boostorg/beast/issues/1373 break; } } @@ -207,7 +164,6 @@ teardown( } template -inline void async_teardown( role_type role, @@ -218,9 +174,9 @@ async_teardown( TeardownHandler, void(error_code)>::value, "TeardownHandler requirements not met"); detail::teardown_tcp_op::type>{std::forward< + TeardownHandler>::type>(std::forward< TeardownHandler>(handler), socket, - role}(); + role)(); } } // websocket diff --git a/include/boost/beast/websocket/impl/write.ipp b/include/boost/beast/websocket/impl/write.ipp index 08dce136..67a85353 100644 --- a/include/boost/beast/websocket/impl/write.ipp +++ b/include/boost/beast/websocket/impl/write.ipp @@ -17,15 +17,12 @@ #include #include #include +#include #include #include +#include #include -#include -#include #include -#include -#include -#include #include #include #include @@ -139,12 +136,11 @@ do_context_takeover_write(role_type role) template template class stream::write_some_op - : public net::coroutine + : public beast::detail::async_op_base< + Handler, beast::detail::get_executor_type> + , public net::coroutine { - Handler h_; - stream& ws_; - net::executor_work_guard&>().get_executor())> wg_; + stream& ws_; buffers_suffix cb_; detail::frame_header fh_; detail::prepared_key key_; @@ -159,69 +155,25 @@ class stream::write_some_op public: static constexpr int id = 2; // for soft_mutex - write_some_op(write_some_op&&) = default; - write_some_op(write_some_op const&) = delete; - - template + template write_some_op( - DeducedHandler&& h, + Handler_&& h, stream& ws, bool fin, Buffers const& bs) - : h_(std::forward(h)) + : beast::detail::async_op_base< + Handler, beast::detail::get_executor_type>( + ws.get_executor(), std::forward(h)) , ws_(ws) - , wg_(ws_.get_executor()) , cb_(bs) , fin_(fin) { } - using allocator_type = - net::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return (net::get_associated_allocator)(h_); - } - - using executor_type = net::associated_executor_t< - Handler, decltype(std::declval&>().get_executor())>; - - executor_type - get_executor() const noexcept - { - return (net::get_associated_executor)( - h_, ws_.get_executor()); - } - - Handler& - handler() - { - return h_; - } - void operator()( error_code ec = {}, std::size_t bytes_transferred = 0, bool cont = true); - - friend - bool asio_handler_is_continuation(write_some_op* op) - { - using net::asio_handler_is_continuation; - return op->cont_ || asio_handler_is_continuation( - std::addressof(op->h_)); - } - - template - friend - void asio_handler_invoke(Function&& f, write_some_op* op) - { - using net::asio_handler_invoke; - asio_handler_invoke( - f, std::addressof(op->h_)); - } }; template @@ -582,7 +534,7 @@ operator()( std::move(*this), ec, bytes_transferred_)); } - h_(ec, bytes_transferred_); + this->invoke(ec, bytes_transferred_); } } diff --git a/include/boost/beast/websocket/teardown.hpp b/include/boost/beast/websocket/teardown.hpp index 1c373df5..19a6f487 100644 --- a/include/boost/beast/websocket/teardown.hpp +++ b/include/boost/beast/websocket/teardown.hpp @@ -125,6 +125,7 @@ namespace websocket { @param ec Set to the error if any occurred. */ +BOOST_BEAST_DECL void teardown( role_type role,