Use async_op_base:

Composed operation implementations now use async_op_base and
stable_async_op_base to eliminate redundant boilerplate.
Vinnie Falco
2019-01-05 19:40:46 -08:00
parent 9e44ae7be5
commit b46953f1bd
16 changed files with 880 additions and 1630 deletions
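The pattern this commit applies looks roughly like the sketch below. It is modeled on the flat_stream write_op in the first hunk; the stream and operation names (AsyncStream, echo_op) are illustrative, and detail::async_op_base, detail::get_executor_type and invoke() are the facilities added by this commit, so the snippet is only meaningful against this revision of Beast.

#include <boost/beast/core/error.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <cstddef>
#include <utility>

namespace boost {
namespace beast {

// Illustrative composed operation: the base class stores the handler,
// forwards its associated allocator and executor, and implements the
// asio hook free functions that each operation previously spelled out
// by hand.
template<class AsyncStream, class Handler>
class echo_op
    : public detail::async_op_base<Handler,
        detail::get_executor_type<AsyncStream>>
{
    AsyncStream& stream_;

public:
    template<class Handler_>
    echo_op(Handler_&& h, AsyncStream& stream)
        : detail::async_op_base<Handler,
            detail::get_executor_type<AsyncStream>>(
                stream.get_executor(), std::forward<Handler_>(h))
        , stream_(stream)
    {
    }

    void
    operator()(error_code ec, std::size_t bytes_transferred)
    {
        // Intermediate asynchronous steps pass std::move(*this) as
        // their completion handler; when the operation is done, the
        // base class delivers the result to the wrapped handler.
        this->invoke(ec, bytes_transferred);
    }
};

} // beast
} // boost

The hunks that follow are the mechanical application of this shape to each operation, deleting the hand-written get_allocator, get_executor and asio_handler_* hooks.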

View File

@@ -4,6 +4,7 @@ Version 202
* Update coverage badge images
* Tidy up basic_stream_socket docs
* Refactor async_op_base
* Use async_op_base
--------------------------------------------------------------------------------

View File

@@ -11,14 +11,12 @@
#define BOOST_BEAST_CORE_IMPL_FLAT_STREAM_HPP #define BOOST_BEAST_CORE_IMPL_FLAT_STREAM_HPP
#include <boost/beast/core/buffers_prefix.hpp> #include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/beast/websocket/teardown.hpp> #include <boost/beast/websocket/teardown.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/handler_alloc_hook.hpp> #include <memory>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
namespace boost { namespace boost {
namespace beast { namespace beast {
@@ -26,16 +24,12 @@ namespace beast {
template<class NextLayer> template<class NextLayer>
template<class ConstBufferSequence, class Handler> template<class ConstBufferSequence, class Handler>
class flat_stream<NextLayer>::write_op class flat_stream<NextLayer>::write_op
: public net::coroutine : public detail::async_op_base<Handler,
detail::get_executor_type<flat_stream>>
, public net::coroutine
{ {
using alloc_type = typename using alloc_type = std::allocator<char>;
#if defined(BOOST_NO_CXX11_ALLOCATOR)
net::associated_allocator_t<Handler>::template
rebind<char>::other;
#else
std::allocator_traits<net::associated_allocator_t<Handler>>
::template rebind_alloc<char>;
#endif
struct deleter struct deleter
{ {
@@ -58,123 +52,62 @@ class flat_stream<NextLayer>::write_op
flat_stream<NextLayer>& s_; flat_stream<NextLayer>& s_;
ConstBufferSequence b_; ConstBufferSequence b_;
std::unique_ptr<char, deleter> p_; std::unique_ptr<char, deleter> p_;
Handler h_;
public: public:
template<class DeducedHandler> template<class Handler_>
write_op( write_op(
flat_stream<NextLayer>& s, flat_stream<NextLayer>& s,
ConstBufferSequence const& b, ConstBufferSequence const& b,
DeducedHandler&& h) Handler_&& h)
: s_(s) : detail::async_op_base<Handler,
detail::get_executor_type<flat_stream>>(
s.get_executor(),
std::forward<Handler_>(h))
, s_(s)
, b_(b) , b_(b)
, p_(nullptr, deleter{ , p_(nullptr, deleter{alloc_type{}})
(net::get_associated_allocator)(h)})
, h_(std::forward<DeducedHandler>(h))
{ {
(*this)({}, 0);
} }
void void
operator()( operator()(
boost::system::error_code ec, boost::system::error_code ec,
std::size_t bytes_transferred); std::size_t bytes_transferred)
//
using allocator_type =
net::associated_allocator_t<Handler>;
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<NextLayer&>().get_executor())>;
allocator_type
get_allocator() const noexcept
{ {
return net::get_associated_allocator(h_); BOOST_ASIO_CORO_REENTER(*this)
} {
BOOST_ASIO_CORO_YIELD
executor_type {
get_executor() const noexcept auto const result =
{ coalesce(b_, coalesce_limit);
return net::get_associated_executor( if(result.needs_coalescing)
h_, s_.get_executor()); {
} p_.get_deleter().size = result.size;
p_.reset(p_.get_deleter().alloc.allocate(
template<class Function> p_.get_deleter().size));
friend net::buffer_copy(
void asio_handler_invoke(Function&& f, write_op* op) net::buffer(
{ p_.get(), p_.get_deleter().size),
using net::asio_handler_invoke; b_, result.size);
asio_handler_invoke(f, std::addressof(op->h_)); s_.stream_.async_write_some(
} net::buffer(
p_.get(), p_.get_deleter().size),
friend std::move(*this));
void* asio_handler_allocate( }
std::size_t size, write_op* op) else
{ {
using net::asio_handler_allocate; s_.stream_.async_write_some(
return asio_handler_allocate( boost::beast::buffers_prefix(
size, std::addressof(op->h_)); result.size, b_), std::move(*this));
} }
}
friend p_.reset();
void asio_handler_deallocate( this->invoke(ec, bytes_transferred);
void* p, std::size_t size, write_op* op) }
{
using net::asio_handler_deallocate;
asio_handler_deallocate(
p, size, std::addressof(op->h_));
}
friend
bool asio_handler_is_continuation(write_op* op)
{
using net::asio_handler_is_continuation;
return asio_handler_is_continuation(
std::addressof(op->h_));
} }
}; };
template<class NextLayer>
template<class ConstBufferSequence, class Handler>
void
flat_stream<NextLayer>::
write_op<ConstBufferSequence, Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred)
{
BOOST_ASIO_CORO_REENTER(*this)
{
BOOST_ASIO_CORO_YIELD
{
auto const result = coalesce(b_, coalesce_limit);
if(result.needs_coalescing)
{
p_.get_deleter().size = result.size;
p_.reset(p_.get_deleter().alloc.allocate(
p_.get_deleter().size));
net::buffer_copy(
net::buffer(
p_.get(), p_.get_deleter().size),
b_, result.size);
s_.stream_.async_write_some(
net::buffer(
p_.get(), p_.get_deleter().size),
std::move(*this));
}
else
{
s_.stream_.async_write_some(
boost::beast::buffers_prefix(result.size, b_),
std::move(*this));
}
}
p_.reset();
h_(ec, bytes_transferred);
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template<class NextLayer> template<class NextLayer>
@@ -298,7 +231,7 @@ async_write_some(
WriteHandler, void(error_code, std::size_t)); WriteHandler, void(error_code, std::size_t));
write_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE( write_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(error_code, std::size_t))>{ WriteHandler, void(error_code, std::size_t))>{
*this, buffers, std::move(init.completion_handler)}({}, 0); *this, buffers, std::move(init.completion_handler)};
return init.result.get(); return init.result.get();
} }

View File

@@ -15,10 +15,13 @@
#include <boost/beast/core/buffers_adaptor.hpp> #include <boost/beast/core/buffers_adaptor.hpp>
#include <boost/beast/core/buffers_prefix.hpp> #include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/buffers_suffix.hpp> #include <boost/beast/core/buffers_suffix.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/buffers_ref.hpp> #include <boost/beast/core/detail/buffers_ref.hpp>
#include <boost/beast/core/detail/stream_algorithm.hpp> #include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/beast/core/handler_ptr.hpp> #include <boost/beast/core/handler_ptr.hpp>
#include <boost/asio/buffers_iterator.hpp> #include <boost/asio/buffers_iterator.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/read.hpp> #include <boost/asio/read.hpp>
#include <boost/asio/read_until.hpp> #include <boost/asio/read_until.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
@@ -118,26 +121,21 @@ public:
template<class NextLayer> template<class NextLayer>
template<class MutableBufferSequence, class Handler> template<class MutableBufferSequence, class Handler>
class icy_stream<NextLayer>::read_op class icy_stream<NextLayer>::read_op
: public net::coroutine : public beast::detail::stable_async_op_base<Handler,
beast::detail::get_executor_type<icy_stream>>
, public net::coroutine
{ {
using alloc_type = typename // VFALCO We need a stable reference to `b`
#if defined(BOOST_NO_CXX11_ALLOCATOR) // to pass to asio's read functions.
net::associated_allocator_t<Handler>::template //
rebind<char>::other;
#else
std::allocator_traits<net::associated_allocator_t<
Handler>>::template rebind_alloc<char>;
#endif
struct data struct data
{ {
icy_stream<NextLayer>& s; icy_stream& s;
buffers_adaptor<MutableBufferSequence> b; buffers_adaptor<MutableBufferSequence> b;
bool match = false; bool match = false;
data( data(
Handler const&, icy_stream& s_,
icy_stream<NextLayer>& s_,
MutableBufferSequence const& b_) MutableBufferSequence const& b_)
: s(s_) : s(s_)
, b(b_) , b(b_)
@@ -145,219 +143,153 @@ class icy_stream<NextLayer>::read_op
} }
}; };
handler_ptr<data, Handler> d_; data& d_;
public: public:
read_op(read_op&&) = default; template<class Handler_>
read_op(read_op const&) = delete;
template<class DeducedHandler, class... Args>
read_op( read_op(
DeducedHandler&& h, Handler_&& h,
icy_stream<NextLayer>& s, icy_stream& s,
MutableBufferSequence const& b) MutableBufferSequence const& b)
: d_(std::forward<DeducedHandler>(h), s, b) : beast::detail::stable_async_op_base<Handler,
beast::detail::get_executor_type<icy_stream>>(
s.get_executor(), std::forward<Handler_>(h))
, d_(beast::detail::allocate_stable<data>(*this, s, b))
{ {
(*this)({}, 0);
} }
void void
operator()( operator()(
boost::system::error_code ec, boost::system::error_code ec,
std::size_t bytes_transferred); std::size_t bytes_transferred)
//
using allocator_type =
net::associated_allocator_t<Handler>;
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<NextLayer&>().get_executor())>;
allocator_type
get_allocator() const noexcept
{ {
return net::get_associated_allocator(d_.handler()); using iterator = net::buffers_iterator<
} typename beast::detail::dynamic_buffer_ref<
buffers_adaptor<MutableBufferSequence>>::const_buffers_type>;
executor_type BOOST_ASIO_CORO_REENTER(*this)
get_executor() const noexcept
{
return net::get_associated_executor(
d_.handler(), d_->s.get_executor());
}
template<class Function>
friend
void asio_handler_invoke(
Function&& f, read_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(
f, std::addressof(op->d_.handler()));
}
friend
void* asio_handler_allocate(
std::size_t size, read_op* op)
{
using net::asio_handler_allocate;
return asio_handler_allocate(
size, std::addressof(op->d_.handler()));
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, read_op* op)
{
using net::asio_handler_deallocate;
asio_handler_deallocate(
p, size, std::addressof(op->d_.handler()));
}
friend
bool asio_handler_is_continuation(read_op* op)
{
using net::asio_handler_is_continuation;
return asio_handler_is_continuation(
std::addressof(op->d_.handler()));
}
};
template<class NextLayer>
template<class MutableBufferSequence, class Handler>
void
icy_stream<NextLayer>::
read_op<MutableBufferSequence, Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred)
{
using iterator = net::buffers_iterator<
typename beast::detail::dynamic_buffer_ref<
buffers_adaptor<MutableBufferSequence>>::const_buffers_type>;
auto& d = *d_;
BOOST_ASIO_CORO_REENTER(*this)
{
if(d.b.max_size() == 0)
{ {
BOOST_ASIO_CORO_YIELD if(d_.b.max_size() == 0)
net::post(d.s.get_executor(),
beast::bind_handler(std::move(*this), ec, 0));
goto upcall;
}
if(! d.s.detect_)
{
if(d.s.copy_ > 0)
{
auto const n = net::buffer_copy(
d.b.prepare(std::min<std::size_t>(
d.s.copy_, d.b.max_size())),
net::buffer(d.s.buf_));
d.b.commit(n);
d.s.copy_ = static_cast<unsigned char>(
d.s.copy_ - n);
if(d.s.copy_ > 0)
std::memmove(
d.s.buf_,
&d.s.buf_[n],
d.s.copy_);
}
if(d.b.size() < d.b.max_size())
{ {
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
d.s.next_layer().async_read_some( net::post(d_.s.get_executor(),
d.b.prepare(d.b.max_size() - d.b.size()), beast::bind_handler(std::move(*this), ec, 0));
std::move(*this)); goto upcall;
d.b.commit(bytes_transferred); }
if(! d_.s.detect_)
{
if(d_.s.copy_ > 0)
{
auto const n = net::buffer_copy(
d_.b.prepare(std::min<std::size_t>(
d_.s.copy_, d_.b.max_size())),
net::buffer(d_.s.buf_));
d_.b.commit(n);
d_.s.copy_ = static_cast<unsigned char>(
d_.s.copy_ - n);
if(d_.s.copy_ > 0)
std::memmove(
d_.s.buf_,
&d_.s.buf_[n],
d_.s.copy_);
}
if(d_.b.size() < d_.b.max_size())
{
BOOST_ASIO_CORO_YIELD
d_.s.next_layer().async_read_some(
d_.b.prepare(d_.b.max_size() - d_.b.size()),
std::move(*this));
d_.b.commit(bytes_transferred);
}
bytes_transferred = d_.b.size();
goto upcall;
}
d_.s.detect_ = false;
if(d_.b.max_size() < 8)
{
BOOST_ASIO_CORO_YIELD
net::async_read(
d_.s.next_layer(),
net::buffer(d_.s.buf_, 3),
std::move(*this));
if(ec)
goto upcall;
auto n = bytes_transferred;
BOOST_ASSERT(n == 3);
if(
d_.s.buf_[0] != 'I' ||
d_.s.buf_[1] != 'C' ||
d_.s.buf_[2] != 'Y')
{
net::buffer_copy(
d_.b.value(),
net::buffer(d_.s.buf_, n));
if(d_.b.max_size() < 3)
{
d_.s.copy_ = static_cast<unsigned char>(
3 - d_.b.max_size());
std::memmove(
d_.s.buf_,
&d_.s.buf_[d_.b.max_size()],
d_.s.copy_);
}
bytes_transferred = (std::min)(
n, d_.b.max_size());
goto upcall;
}
d_.s.copy_ = static_cast<unsigned char>(
buffer_copy(
net::buffer(d_.s.buf_),
icy_stream::version() + d_.b.max_size()));
bytes_transferred = buffer_copy(
d_.b.value(),
icy_stream::version());
goto upcall;
} }
bytes_transferred = d.b.size();
goto upcall;
}
d.s.detect_ = false;
if(d.b.max_size() < 8)
{
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_read( net::async_read_until(
d.s.next_layer(), d_.s.next_layer(),
net::buffer(d.s.buf_, 3), beast::detail::ref(d_.b),
detail::match_icy<iterator>(d_.match),
std::move(*this)); std::move(*this));
if(ec) if(ec)
goto upcall; goto upcall;
auto n = bytes_transferred;
BOOST_ASSERT(n == 3);
if(
d.s.buf_[0] != 'I' ||
d.s.buf_[1] != 'C' ||
d.s.buf_[2] != 'Y')
{ {
net::buffer_copy( auto n = bytes_transferred;
d.b.value(), BOOST_ASSERT(n == d_.b.size());
net::buffer(d.s.buf_, n)); if(! d_.match)
if(d.b.max_size() < 3) goto upcall;
if(d_.b.size() + 5 > d_.b.max_size())
{ {
d.s.copy_ = static_cast<unsigned char>( d_.s.copy_ = static_cast<unsigned char>(
3 - d.b.max_size()); n + 5 - d_.b.max_size());
std::memmove( std::copy(
d.s.buf_, net::buffers_begin(d_.b.value()) + n - d_.s.copy_,
&d.s.buf_[d.b.max_size()], net::buffers_begin(d_.b.value()) + n,
d.s.copy_); d_.s.buf_);
n = d_.b.max_size() - 5;
}
{
buffers_suffix<beast::detail::buffers_ref<
MutableBufferSequence>> dest(
boost::in_place_init, d_.b.value());
dest.consume(5);
detail::buffer_shift(
beast::buffers_prefix(n, dest),
beast::buffers_prefix(n, d_.b.value()));
net::buffer_copy(d_.b.value(), icy_stream::version());
n += 5;
bytes_transferred = n;
} }
bytes_transferred = (std::min)(
n, d.b.max_size());
goto upcall;
} }
d.s.copy_ = static_cast<unsigned char>( upcall:
buffer_copy( this->invoke(ec, bytes_transferred);
net::buffer(d.s.buf_),
icy_stream::version() + d.b.max_size()));
bytes_transferred = buffer_copy(
d.b.value(),
icy_stream::version());
goto upcall;
} }
BOOST_ASIO_CORO_YIELD
net::async_read_until(
d.s.next_layer(),
beast::detail::ref(d.b),
detail::match_icy<iterator>(d.match),
std::move(*this));
if(ec)
goto upcall;
{
auto n = bytes_transferred;
BOOST_ASSERT(n == d.b.size());
if(! d.match)
goto upcall;
if(d.b.size() + 5 > d.b.max_size())
{
d.s.copy_ = static_cast<unsigned char>(
n + 5 - d.b.max_size());
std::copy(
net::buffers_begin(d.b.value()) + n - d.s.copy_,
net::buffers_begin(d.b.value()) + n,
d.s.buf_);
n = d.b.max_size() - 5;
}
{
buffers_suffix<beast::detail::buffers_ref<
MutableBufferSequence>> dest(
boost::in_place_init, d.b.value());
dest.consume(5);
detail::buffer_shift(
beast::buffers_prefix(n, dest),
beast::buffers_prefix(n, d.b.value()));
net::buffer_copy(d.b.value(), icy_stream::version());
n += 5;
bytes_transferred = n;
}
}
upcall:
d_.invoke(ec, bytes_transferred);
} }
} };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -524,9 +456,8 @@ async_read_some(
read_op< read_op<
MutableBufferSequence, MutableBufferSequence,
BOOST_ASIO_HANDLER_TYPE( BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{ ReadHandler, void(error_code, std::size_t))>(
std::move(init.completion_handler), *this, buffers}( std::move(init.completion_handler), *this, buffers);
{}, 0);
return init.result.get(); return init.result.get();
} }
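The read_op above is the stable variant: asio's read functions keep a reference to the buffers_adaptor, so the operation's state is placed in storage obtained from allocate_stable rather than in the operation object itself, and only a reference (d_) travels with the operation. A minimal sketch of that shape, with illustrative names (stable_op, state) and the same assumption that the detail headers from this commit are available:

#include <boost/beast/core/error.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <cstddef>
#include <utility>

namespace boost {
namespace beast {

// Illustrative stable composed operation. allocate_stable<T> constructs
// T in storage owned by the operation's base, so d_ stays valid even
// after *this has been moved into a pending asynchronous call.
template<class AsyncStream, class Handler>
class stable_op
    : public detail::stable_async_op_base<Handler,
        detail::get_executor_type<AsyncStream>>
{
    struct state
    {
        AsyncStream& s;
        // ...buffers, parsers, etc. that asio holds references to...

        explicit state(AsyncStream& s_)
            : s(s_)
        {
        }
    };

    state& d_;

public:
    template<class Handler_>
    stable_op(Handler_&& h, AsyncStream& s)
        : detail::stable_async_op_base<Handler,
            detail::get_executor_type<AsyncStream>>(
                s.get_executor(), std::forward<Handler_>(h))
        , d_(detail::allocate_stable<state>(*this, s))
    {
    }

    void
    operator()(error_code ec, std::size_t bytes_transferred)
    {
        // The base class manages the lifetime of the stable storage
        // for the duration of the operation (an assumption based on
        // how the diff uses it; see async_op_base.hpp).
        this->invoke(ec, bytes_transferred);
    }
};

} // beast
} // boost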

View File

@@ -14,6 +14,7 @@
#include <boost/beast/core/error.hpp> #include <boost/beast/core/error.hpp>
#include <boost/beast/core/multi_buffer.hpp> #include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/asio/async_result.hpp> #include <boost/asio/async_result.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
@@ -163,27 +164,18 @@ public:
return next_layer_.lowest_layer(); return next_layer_.lowest_layer();
} }
using executor_type =
detail::get_executor_type<next_layer_type>;
/** Get the executor associated with the object. /** Get the executor associated with the object.
This function may be used to obtain the executor object that the stream This function may be used to obtain the executor object that the stream
uses to dispatch handlers for asynchronous operations. uses to dispatch handlers for asynchronous operations.
@return A copy of the executor that stream will use to dispatch handlers. @return A copy of the executor that stream will use to dispatch handlers.
@note This function participates in overload resolution only if
`NextLayer` has a member function named `get_executor`.
*/ */
#if BOOST_BEAST_DOXYGEN executor_type
__implementation_defined__ get_executor() noexcept
#else
template<
class T = next_layer_type,
class = typename std::enable_if<
has_get_executor<next_layer_type>::value>::type>
auto
#endif
get_executor() noexcept ->
decltype(std::declval<T&>().get_executor())
{ {
return next_layer_.get_executor(); return next_layer_.get_executor();
} }

View File

@@ -186,6 +186,19 @@ public:
        return h_;
    }

    /** Returns ownership of the handler associated with this object

        This function is used to transfer ownership of the handler to
        the caller, by move-construction. After the move, the only
        valid operations on the base object are move construction and
        destruction.
    */
    Handler
    release_handler()
    {
        return std::move(h_);
    }

protected:

    /** Constructor
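release_handler() is what lets one composed operation hand the wrapped completion handler to another; the websocket accept_op later in this commit uses it to start a response_op once the upgrade request has been read. A rough illustration of the hand-off, where relay_op, next_op and the io_context executor are assumptions made for the sketch rather than anything in this commit:

#include <boost/beast/core/error.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/asio/io_context.hpp>
#include <utility>

namespace boost {
namespace beast {

// Trivial follow-on operation which simply completes.
template<class Handler>
class next_op
    : public detail::async_op_base<
        Handler, net::io_context::executor_type>
{
public:
    template<class Handler_>
    next_op(Handler_&& h, net::io_context& ioc)
        : detail::async_op_base<
            Handler, net::io_context::executor_type>(
                ioc.get_executor(), std::forward<Handler_>(h))
    {
    }

    void
    operator()(error_code ec)
    {
        this->invoke(ec);
    }
};

// On success, complete by constructing the follow-on operation from
// the released handler instead of invoking the handler here.
template<class Handler>
class relay_op
    : public detail::async_op_base<
        Handler, net::io_context::executor_type>
{
    net::io_context& ioc_;

public:
    template<class Handler_>
    relay_op(Handler_&& h, net::io_context& ioc)
        : detail::async_op_base<
            Handler, net::io_context::executor_type>(
                ioc.get_executor(), std::forward<Handler_>(h))
        , ioc_(ioc)
    {
    }

    void
    operator()(error_code ec)
    {
        if(! ec)
        {
            // After release_handler() the only valid operations on
            // this base are move construction and destruction.
            return next_op<Handler>{
                this->release_handler(), ioc_}(ec);
        }
        this->invoke(ec);
    }
};

} // beast
} // boost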

View File

@@ -12,16 +12,10 @@
#include <boost/beast/core/bind_handler.hpp> #include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/error.hpp> #include <boost/beast/core/error.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/read_size.hpp> #include <boost/beast/core/read_size.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/asio/associated_allocator.hpp> #include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_alloc_hook.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
@@ -32,141 +26,79 @@ template<class Stream, class DynamicBuffer>
template<class MutableBufferSequence, class Handler> template<class MutableBufferSequence, class Handler>
class buffered_read_stream< class buffered_read_stream<
Stream, DynamicBuffer>::read_some_op Stream, DynamicBuffer>::read_some_op
: public detail::async_op_base<
Handler, detail::get_executor_type<buffered_read_stream>>
{ {
buffered_read_stream& s_; buffered_read_stream& s_;
net::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
MutableBufferSequence b_; MutableBufferSequence b_;
Handler h_;
int step_ = 0; int step_ = 0;
public: public:
read_some_op(read_some_op&&) = default; read_some_op(read_some_op&&) = default;
read_some_op(read_some_op const&) = delete; read_some_op(read_some_op const&) = delete;
template<class DeducedHandler, class... Args> template<class Handler_>
read_some_op(DeducedHandler&& h, read_some_op(
Handler_&& h,
buffered_read_stream& s, buffered_read_stream& s,
MutableBufferSequence const& b) MutableBufferSequence const& b)
: s_(s) : detail::async_op_base<
, wg_(s_.get_executor()) Handler, detail::get_executor_type<buffered_read_stream>>(
s.get_executor(), std::forward<Handler_>(h))
, s_(s)
, b_(b) , b_(b)
, h_(std::forward<DeducedHandler>(h))
{ {
(*this)({}, 0);
} }
void void
operator()(error_code ec, operator()(
std::size_t bytes_transferred); error_code ec,
std::size_t bytes_transferred)
//
using allocator_type =
net::associated_allocator_t<Handler>;
using executor_type =
net::associated_executor_t<Handler, decltype(
std::declval<buffered_read_stream&>().get_executor())>;
allocator_type
get_allocator() const noexcept
{ {
return net::get_associated_allocator(h_); switch(step_)
} {
case 0:
if(s_.buffer_.size() == 0)
{
if(s_.capacity_ == 0)
{
// read (unbuffered)
step_ = 1;
return s_.next_layer_.async_read_some(
b_, std::move(*this));
}
// read
step_ = 2;
return s_.next_layer_.async_read_some(
s_.buffer_.prepare(read_size(
s_.buffer_, s_.capacity_)),
std::move(*this));
}
step_ = 3;
return net::post(
s_.get_executor(),
beast::bind_front_handler(
std::move(*this), ec, 0));
executor_type case 1:
get_executor() const noexcept // upcall
{ break;
return net::get_associated_executor(
h_, s_.get_executor());
}
template<class Function> case 2:
friend s_.buffer_.commit(bytes_transferred);
void asio_handler_invoke( BOOST_FALLTHROUGH;
Function&& f, read_some_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(f, std::addressof(op->h_));
}
friend case 3:
void* asio_handler_allocate( bytes_transferred =
std::size_t size, read_some_op* op) net::buffer_copy(b_, s_.buffer_.data());
{ s_.buffer_.consume(bytes_transferred);
using net::asio_handler_allocate; break;
return asio_handler_allocate( }
size, std::addressof(op->h_)); this->invoke(ec, bytes_transferred);
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, read_some_op* op)
{
using net::asio_handler_deallocate;
asio_handler_deallocate(
p, size, std::addressof(op->h_));
}
friend
bool asio_handler_is_continuation(read_some_op* op)
{
using net::asio_handler_is_continuation;
return asio_handler_is_continuation(
std::addressof(op->h_));
} }
}; };
template<class Stream, class DynamicBuffer>
template<class MutableBufferSequence, class Handler>
void
buffered_read_stream<Stream, DynamicBuffer>::
read_some_op<MutableBufferSequence, Handler>::
operator()(
error_code ec, std::size_t bytes_transferred)
{
switch(step_)
{
case 0:
if(s_.buffer_.size() == 0)
{
if(s_.capacity_ == 0)
{
// read (unbuffered)
step_ = 1;
return s_.next_layer_.async_read_some(
b_, std::move(*this));
}
// read
step_ = 2;
return s_.next_layer_.async_read_some(
s_.buffer_.prepare(read_size(
s_.buffer_, s_.capacity_)),
std::move(*this));
}
step_ = 3;
return net::post(
s_.get_executor(),
beast::bind_front_handler(
std::move(*this), ec, 0));
case 1:
// upcall
break;
case 2:
s_.buffer_.commit(bytes_transferred);
BOOST_FALLTHROUGH;
case 3:
bytes_transferred =
net::buffer_copy(b_, s_.buffer_.data());
s_.buffer_.consume(bytes_transferred);
break;
}
h_(ec, bytes_transferred);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template<class Stream, class DynamicBuffer> template<class Stream, class DynamicBuffer>
@@ -271,9 +203,8 @@ async_read_some(
BOOST_BEAST_HANDLER_INIT( BOOST_BEAST_HANDLER_INIT(
ReadHandler, void(error_code, std::size_t)); ReadHandler, void(error_code, std::size_t));
read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE( read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{ ReadHandler, void(error_code, std::size_t))>(
std::move(init.completion_handler), *this, buffers}( std::move(init.completion_handler), *this, buffers);
error_code{}, 0);
return init.result.get(); return init.result.get();
} }

View File

@@ -15,15 +15,11 @@
#include <boost/beast/core/bind_handler.hpp> #include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/buffers_range.hpp> #include <boost/beast/core/buffers_range.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/clamp.hpp> #include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/http/serializer.hpp> #include <boost/beast/http/serializer.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/async_result.hpp> #include <boost/asio/async_result.hpp>
#include <boost/asio/basic_stream_socket.hpp> #include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/windows/overlapped_ptr.hpp> #include <boost/asio/windows/overlapped_ptr.hpp>
#include <boost/make_unique.hpp> #include <boost/make_unique.hpp>
#include <boost/smart_ptr/make_shared_array.hpp> #include <boost/smart_ptr/make_shared_array.hpp>
@@ -337,163 +333,110 @@ template<
class Protocol, class Handler, class Protocol, class Handler,
bool isRequest, class Fields> bool isRequest, class Fields>
class write_some_win32_op class write_some_win32_op
: public beast::detail::async_op_base<
Handler, typename net::basic_stream_socket<
Protocol>::executor_type>
{ {
net::basic_stream_socket<Protocol>& sock_; net::basic_stream_socket<Protocol>& sock_;
net::executor_work_guard<decltype(std::declval<
net::basic_stream_socket<Protocol>&>().get_executor())> wg_;
serializer<isRequest, serializer<isRequest,
basic_file_body<file_win32>, Fields>& sr_; basic_file_body<file_win32>, Fields>& sr_;
std::size_t bytes_transferred_ = 0; std::size_t bytes_transferred_ = 0;
Handler h_;
bool header_ = false; bool header_ = false;
public: public:
write_some_win32_op(write_some_win32_op&&) = default; template<class Handler_>
write_some_win32_op(write_some_win32_op const&) = delete;
template<class DeducedHandler>
write_some_win32_op( write_some_win32_op(
DeducedHandler&& h, Handler_&& h,
net::basic_stream_socket<Protocol>& s, net::basic_stream_socket<Protocol>& s,
serializer<isRequest, serializer<isRequest,
basic_file_body<file_win32>,Fields>& sr) basic_file_body<file_win32>,Fields>& sr)
: sock_(s) : beast::detail::async_op_base<
, wg_(sock_.get_executor()) Handler, typename net::basic_stream_socket<
Protocol>::executor_type>(
s.get_executor(),
std::forward<Handler_>(h))
, sock_(s)
, sr_(sr) , sr_(sr)
, h_(std::forward<DeducedHandler>(h))
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(h_);
}
using executor_type =
net::associated_executor_t<Handler, decltype(std::declval<
net::basic_stream_socket<Protocol>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
h_, sock_.get_executor());
}
void void
operator()(); operator()()
{
if(! sr_.is_header_done())
{
header_ = true;
sr_.split(true);
return detail::async_write_some_impl(
sock_, sr_, std::move(*this));
}
if(sr_.get().chunked())
{
return detail::async_write_some_impl(
sock_, sr_, std::move(*this));
}
auto& w = sr_.writer_impl();
boost::winapi::DWORD_ const nNumberOfBytesToWrite =
static_cast<boost::winapi::DWORD_>(
(std::min<std::uint64_t>)(
(std::min<std::uint64_t>)(w.body_.last_ - w.pos_, sr_.limit()),
(std::numeric_limits<boost::winapi::DWORD_>::max)()));
net::windows::overlapped_ptr overlapped{
sock_.get_executor().context(), std::move(*this)};
// Note that we have moved *this, so we cannot access
// the handler since it is now moved-from. We can still
// access simple things like references and built-in types.
auto& ov = *overlapped.get();
ov.Offset = lowPart(w.pos_);
ov.OffsetHigh = highPart(w.pos_);
auto const bSuccess = ::TransmitFile(
sock_.native_handle(),
sr_.get().body().file_.native_handle(),
nNumberOfBytesToWrite,
0,
overlapped.get(),
nullptr,
0);
auto const dwError = boost::winapi::GetLastError();
if(! bSuccess && dwError !=
boost::winapi::ERROR_IO_PENDING_)
{
// VFALCO This needs review, is 0 the right number?
// completed immediately (with error?)
overlapped.complete(error_code{static_cast<int>(dwError),
system_category()}, 0);
return;
}
overlapped.release();
}
void void
operator()( operator()(
error_code ec, error_code ec,
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0)
friend
bool asio_handler_is_continuation(write_some_win32_op* op)
{ {
using net::asio_handler_is_continuation; bytes_transferred_ += bytes_transferred;
return asio_handler_is_continuation( if(! ec)
std::addressof(op->h_)); {
} if(header_)
{
template<class Function> header_ = false;
friend return (*this)();
void asio_handler_invoke(Function&& f, write_some_win32_op* op) }
{ auto& w = sr_.writer_impl();
using net::asio_handler_invoke; w.pos_ += bytes_transferred;
asio_handler_invoke(f, std::addressof(op->h_)); BOOST_ASSERT(w.pos_ <= w.body_.last_);
if(w.pos_ >= w.body_.last_)
{
sr_.next(ec, null_lambda{});
BOOST_ASSERT(! ec);
BOOST_ASSERT(sr_.is_done());
}
}
this->invoke(ec, bytes_transferred_);
} }
}; };
template<
class Protocol, class Handler,
bool isRequest, class Fields>
void
write_some_win32_op<
Protocol, Handler, isRequest, Fields>::
operator()()
{
if(! sr_.is_header_done())
{
header_ = true;
sr_.split(true);
return detail::async_write_some_impl(
sock_, sr_, std::move(*this));
}
if(sr_.get().chunked())
{
return detail::async_write_some_impl(
sock_, sr_, std::move(*this));
}
auto& w = sr_.writer_impl();
boost::winapi::DWORD_ const nNumberOfBytesToWrite =
static_cast<boost::winapi::DWORD_>(
(std::min<std::uint64_t>)(
(std::min<std::uint64_t>)(w.body_.last_ - w.pos_, sr_.limit()),
(std::numeric_limits<boost::winapi::DWORD_>::max)()));
net::windows::overlapped_ptr overlapped{
sock_.get_executor().context(), std::move(*this)};
// Note that we have moved *this, so we cannot access
// the handler since it is now moved-from. We can still
// access simple things like references and built-in types.
auto& ov = *overlapped.get();
ov.Offset = lowPart(w.pos_);
ov.OffsetHigh = highPart(w.pos_);
auto const bSuccess = ::TransmitFile(
sock_.native_handle(),
sr_.get().body().file_.native_handle(),
nNumberOfBytesToWrite,
0,
overlapped.get(),
nullptr,
0);
auto const dwError = boost::winapi::GetLastError();
if(! bSuccess && dwError !=
boost::winapi::ERROR_IO_PENDING_)
{
// VFALCO This needs review, is 0 the right number?
// completed immediately (with error?)
overlapped.complete(error_code{static_cast<int>(dwError),
system_category()}, 0);
return;
}
overlapped.release();
}
template<
class Protocol, class Handler,
bool isRequest, class Fields>
void
write_some_win32_op<
Protocol, Handler, isRequest, Fields>::
operator()(
error_code ec, std::size_t bytes_transferred)
{
bytes_transferred_ += bytes_transferred;
if(! ec)
{
if(header_)
{
header_ = false;
return (*this)();
}
auto& w = sr_.writer_impl();
w.pos_ += bytes_transferred;
BOOST_ASSERT(w.pos_ <= w.body_.last_);
if(w.pos_ >= w.body_.last_)
{
sr_.next(ec, null_lambda{});
BOOST_ASSERT(! ec);
BOOST_ASSERT(sr_.is_done());
}
}
h_(ec, bytes_transferred_);
}
#endif #endif
} // detail } // detail

View File

@@ -15,8 +15,9 @@
#include <boost/beast/http/parser.hpp> #include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp> #include <boost/beast/http/read.hpp>
#include <boost/beast/core/handler_ptr.hpp> #include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/beast/core/detail/read.hpp> #include <boost/beast/core/detail/read.hpp>
#include <boost/beast/core/detail/stream_algorithm.hpp>
#include <boost/asio/error.hpp> #include <boost/asio/error.hpp>
namespace boost { namespace boost {
@@ -149,7 +150,9 @@ template<
bool isRequest, class Body, class Allocator, bool isRequest, class Body, class Allocator,
class Handler> class Handler>
class read_msg_op class read_msg_op
: public net::coroutine : public beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<Stream>>
, public net::coroutine
{ {
using parser_type = using parser_type =
parser<isRequest, Body, Allocator>; parser<isRequest, Body, Allocator>;
@@ -164,7 +167,6 @@ class read_msg_op
parser_type p; parser_type p;
data( data(
Handler const&,
Stream& s_, Stream& s_,
message_type& m_) message_type& m_)
: s(s_) : s(s_)
@@ -174,92 +176,35 @@ class read_msg_op
} }
}; };
handler_ptr<data, Handler> d_; data& d_;
public: public:
read_msg_op(read_msg_op&&) = default; template<class Handler_>
read_msg_op(read_msg_op const&) = delete;
template<class DeducedHandler>
read_msg_op( read_msg_op(
Stream& s, Stream& s,
DynamicBuffer& b, DynamicBuffer& b,
message_type& m, message_type& m,
DeducedHandler&& h) Handler_&& h)
: d_(std::forward<DeducedHandler>(h), s, m) : beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<Stream>>(
s.get_executor(), std::forward<Handler_>(h))
, d_(beast::detail::allocate_stable<data>(
*this, s, m))
{ {
http::async_read(s, b, d_->p, std::move(*this)); http::async_read(d_.s, b, d_.p, std::move(*this));
} }
void void
operator()( operator()(
error_code ec, error_code ec,
std::size_t bytes_transferred); std::size_t bytes_transferred)
//
using allocator_type =
net::associated_allocator_t<Handler>;
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<Stream&>().get_executor())>;
allocator_type
get_allocator() const noexcept
{ {
return net::get_associated_allocator(d_.handler()); if(! ec)
} d_.m = d_.p.release();
this->invoke(ec, bytes_transferred);
executor_type
get_executor() const noexcept
{
return net::get_associated_executor(
d_.handler(), d_->s.get_executor());
}
friend
void* asio_handler_allocate(
std::size_t size, read_msg_op* op)
{
using net::asio_handler_allocate;
return asio_handler_allocate(
size, std::addressof(op->d_.handler()));
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, read_msg_op* op)
{
using net::asio_handler_deallocate;
asio_handler_deallocate(
p, size, std::addressof(op->d_.handler()));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, read_msg_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(f, std::addressof(op->d_.handler()));
} }
}; };
template<class Stream, class DynamicBuffer,
bool isRequest, class Body, class Allocator,
class Handler>
void
read_msg_op<Stream, DynamicBuffer,
isRequest, Body, Allocator, Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred)
{
auto& d = *d_;
if(! ec)
d.m = d.p.release();
d_.invoke(ec, bytes_transferred);
}
} // detail } // detail
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -17,15 +17,11 @@
#include <boost/beast/http/string_body.hpp> #include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/write.hpp> #include <boost/beast/http/write.hpp>
#include <boost/beast/core/buffers_prefix.hpp> #include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/handler_ptr.hpp> #include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/buffer.hpp> #include <boost/beast/core/detail/buffer.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/beast/core/detail/type_traits.hpp> #include <boost/beast/core/detail/type_traits.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
@@ -40,115 +36,70 @@ namespace websocket {
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
template<class Handler> template<class Handler>
class stream<NextLayer, deflateSupported>::response_op class stream<NextLayer, deflateSupported>::response_op
: public net::coroutine : public beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
struct data struct data
{ {
stream<NextLayer, deflateSupported>& ws; stream<NextLayer, deflateSupported>& ws;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
error_code result; error_code result;
response_type res; response_type res;
template<class Body, class Allocator, class Decorator> template<class Body, class Allocator, class Decorator>
data( data(
Handler const&,
stream<NextLayer, deflateSupported>& ws_, stream<NextLayer, deflateSupported>& ws_,
http::request<Body, http::request<Body,
http::basic_fields<Allocator>> const& req, http::basic_fields<Allocator>> const& req,
Decorator const& decorator) Decorator const& decorator)
: ws(ws_) : ws(ws_)
, wg(ws.get_executor())
, res(ws_.build_response(req, decorator, result)) , res(ws_.build_response(req, decorator, result))
{ {
} }
}; };
handler_ptr<data, Handler> d_; data& d_;
public: public:
response_op(response_op&&) = default; template<
response_op(response_op const&) = delete; class Handler_,
class... Args>
template<class DeducedHandler, class... Args> response_op(
response_op(DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, Args&&... args) stream<NextLayer, deflateSupported>& ws,
: d_(std::forward<DeducedHandler>(h), Args&&... args)
ws, std::forward<Args>(args)...) : beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, d_(beast::detail::allocate_stable<data>(
*this, ws, std::forward<Args>(args)...))
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(d_.handler());
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
d_.handler(), d_->ws.get_executor());
}
void operator()( void operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0)
friend
bool asio_handler_is_continuation(response_op* op)
{ {
using net::asio_handler_is_continuation; boost::ignore_unused(bytes_transferred);
return asio_handler_is_continuation(
std::addressof(op->d_.handler()));
}
template<class Function> BOOST_ASIO_CORO_REENTER(*this)
friend {
void asio_handler_invoke(Function&& f, response_op* op) // Send response
{ BOOST_ASIO_CORO_YIELD
using net::asio_handler_invoke; http::async_write(d_.ws.next_layer(),
asio_handler_invoke(f, std::addressof(op->d_.handler())); d_.res, std::move(*this));
if(! ec)
ec = d_.result;
if(! ec)
{
d_.ws.do_pmd_config(d_.res);
d_.ws.open(role_type::server);
}
this->invoke(ec);
}
} }
}; };
template<class NextLayer, bool deflateSupported>
template<class Handler>
void
stream<NextLayer, deflateSupported>::
response_op<Handler>::
operator()(
error_code ec,
std::size_t)
{
auto& d = *d_;
BOOST_ASIO_CORO_REENTER(*this)
{
// Send response
BOOST_ASIO_CORO_YIELD
http::async_write(d.ws.next_layer(),
d.res, std::move(*this));
if(! ec)
ec = d.result;
if(! ec)
{
d.ws.do_pmd_config(d.res);
d.ws.open(role_type::server);
}
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// read and respond to an upgrade request // read and respond to an upgrade request
@@ -156,158 +107,107 @@ operator()(
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
template<class Decorator, class Handler> template<class Decorator, class Handler>
class stream<NextLayer, deflateSupported>::accept_op class stream<NextLayer, deflateSupported>::accept_op
: public net::coroutine : public beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
struct data struct data
{ {
stream<NextLayer, deflateSupported>& ws; stream<NextLayer, deflateSupported>& ws;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
Decorator decorator; Decorator decorator;
http::request_parser<http::empty_body> p; http::request_parser<http::empty_body> p;
data( data(
Handler const&,
stream<NextLayer, deflateSupported>& ws_, stream<NextLayer, deflateSupported>& ws_,
Decorator const& decorator_) Decorator const& decorator_)
: ws(ws_) : ws(ws_)
, wg(ws.get_executor())
, decorator(decorator_) , decorator(decorator_)
{ {
} }
}; };
handler_ptr<data, Handler> d_; data& d_;
public: public:
accept_op(accept_op&&) = default; template<
accept_op(accept_op const&) = delete; class Handler_,
class... Args>
template<class DeducedHandler, class... Args> accept_op(
accept_op(DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, Args&&... args) stream<NextLayer, deflateSupported>& ws,
: d_(std::forward<DeducedHandler>(h), Args&&... args)
ws, std::forward<Args>(args)...) : beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, d_(beast::detail::allocate_stable<data>(
*this, ws, std::forward<Args>(args)...))
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(d_.handler());
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
d_.handler(), d_->ws.get_executor());
}
template<class Buffers> template<class Buffers>
void run(Buffers const& buffers); void run(Buffers const& buffers)
{
using net::buffer_copy;
using net::buffer_size;
error_code ec;
auto const mb = beast::detail::dynamic_buffer_prepare(
d_.ws.rd_buf_, buffer_size(buffers), ec,
error::buffer_overflow);
if(ec)
return (*this)(ec);
d_.ws.rd_buf_.commit(buffer_copy(*mb, buffers));
(*this)(ec);
}
void operator()( void operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_used = 0); std::size_t bytes_used = 0)
friend
bool asio_handler_is_continuation(accept_op* op)
{ {
using net::asio_handler_is_continuation; boost::ignore_unused(bytes_used);
return asio_handler_is_continuation(
std::addressof(op->d_.handler()));
}
template<class Function> BOOST_ASIO_CORO_REENTER(*this)
friend {
void asio_handler_invoke(Function&& f, accept_op* op) if(ec)
{ {
using net::asio_handler_invoke; BOOST_ASIO_CORO_YIELD
asio_handler_invoke(f, std::addressof(op->d_.handler())); net::post(
d_.ws.get_executor(),
beast::bind_front_handler(std::move(*this), ec));
}
else
{
BOOST_ASIO_CORO_YIELD
http::async_read(
d_.ws.next_layer(), d_.ws.rd_buf_,
d_.p, std::move(*this));
if(ec == http::error::end_of_stream)
ec = error::closed;
if(! ec)
{
// Arguments from our state must be
// moved to the stack before releasing
// the handler.
auto& ws = d_.ws;
auto const req = d_.p.release();
auto const decorator = d_.decorator;
#if 1
return response_op<Handler>{
this->release_handler(),
ws, req, decorator}(ec);
#else
// VFALCO This *should* work but breaks
// coroutine invariants in the unit test.
// Also it calls reset() when it shouldn't.
return ws.async_accept_ex(
req, decorator, this->release_handler());
#endif
}
}
this->invoke(ec);
}
} }
}; };
template<class NextLayer, bool deflateSupported>
template<class Decorator, class Handler>
template<class Buffers>
void
stream<NextLayer, deflateSupported>::
accept_op<Decorator, Handler>::
run(Buffers const& buffers)
{
using net::buffer_copy;
using net::buffer_size;
auto& d = *d_;
error_code ec;
auto const mb = beast::detail::dynamic_buffer_prepare(
d.ws.rd_buf_, buffer_size(buffers), ec,
error::buffer_overflow);
if(ec)
return (*this)(ec);
d.ws.rd_buf_.commit(buffer_copy(*mb, buffers));
(*this)(ec);
}
template<class NextLayer, bool deflateSupported>
template<class Decorator, class Handler>
void
stream<NextLayer, deflateSupported>::
accept_op<Decorator, Handler>::
operator()(error_code ec, std::size_t)
{
auto& d = *d_;
BOOST_ASIO_CORO_REENTER(*this)
{
if(ec)
{
BOOST_ASIO_CORO_YIELD
net::post(
d.ws.get_executor(),
beast::bind_front_handler(std::move(*this), ec));
}
else
{
BOOST_ASIO_CORO_YIELD
http::async_read(
d.ws.next_layer(), d.ws.rd_buf_,
d.p, std::move(*this));
if(ec == http::error::end_of_stream)
ec = error::closed;
if(! ec)
{
// Arguments from our state must be
// moved to the stack before releasing
// the handler.
auto& ws = d.ws;
auto const req = d.p.release();
auto const decorator = d.decorator;
auto wg = std::move(d.wg);
#if 1
return response_op<Handler>{
d_.release_handler(),
ws, req, decorator}(ec);
#else
// VFALCO This *should* work but breaks
// coroutine invariants in the unit test.
// Also it calls reset() when it shouldn't.
return ws.async_accept_ex(
req, decorator, d_.release_handler());
#endif
}
}
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>

View File

@@ -11,16 +11,12 @@
#define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
#include <boost/beast/websocket/teardown.hpp> #include <boost/beast/websocket/teardown.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/flat_static_buffer.hpp> #include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
#include <memory> #include <memory>
@@ -39,23 +35,21 @@ namespace websocket {
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
template<class Handler> template<class Handler>
class stream<NextLayer, deflateSupported>::close_op class stream<NextLayer, deflateSupported>::close_op
: public net::coroutine : public beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
struct state struct state
{ {
stream<NextLayer, deflateSupported>& ws; stream<NextLayer, deflateSupported>& ws;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
detail::frame_buffer fb; detail::frame_buffer fb;
error_code ev; error_code ev;
bool cont = false; bool cont = false;
state( state(
Handler const&,
stream<NextLayer, deflateSupported>& ws_, stream<NextLayer, deflateSupported>& ws_,
close_reason const& cr) close_reason const& cr)
: ws(ws_) : ws(ws_)
, wg(ws.get_executor())
{ {
// Serialize the close frame // Serialize the close frame
ws.template write_close< ws.template write_close<
@@ -63,255 +57,206 @@ class stream<NextLayer, deflateSupported>::close_op
} }
}; };
handler_ptr<state, Handler> d_; state& d_;
public: public:
static constexpr int id = 4; // for soft_mutex static constexpr int id = 4; // for soft_mutex
close_op(close_op&&) = default; template<class Handler_>
close_op(close_op const&) = delete;
template<class DeducedHandler>
close_op( close_op(
DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, stream<NextLayer, deflateSupported>& ws,
close_reason const& cr) close_reason const& cr)
: d_(std::forward<DeducedHandler>(h), ws, cr) : beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, d_(beast::detail::allocate_stable<state>(
*this, ws, cr))
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(d_.handler());
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
d_.handler(), d_->ws.get_executor());
}
void void
operator()( operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0, std::size_t bytes_transferred = 0,
bool cont = true); bool cont = true)
friend
bool asio_handler_is_continuation(close_op* op)
{ {
using net::asio_handler_is_continuation; using beast::detail::clamp;
return op->d_->cont || asio_handler_is_continuation( d_.cont = cont;
std::addressof(op->d_.handler())); BOOST_ASIO_CORO_REENTER(*this)
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, close_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(f,
std::addressof(op->d_.handler()));
}
};
template<class NextLayer, bool deflateSupported>
template<class Handler>
void
stream<NextLayer, deflateSupported>::
close_op<Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred,
bool cont)
{
using beast::detail::clamp;
auto& d = *d_;
d.cont = cont;
BOOST_ASIO_CORO_REENTER(*this)
{
// Attempt to acquire write block
if(! d.ws.wr_block_.try_lock(this))
{ {
// Suspend // Attempt to acquire write block
BOOST_ASIO_CORO_YIELD if(! d_.ws.wr_block_.try_lock(this))
d.ws.paused_close_.emplace(std::move(*this)); {
// Suspend
BOOST_ASIO_CORO_YIELD
d_.ws.paused_close_.emplace(std::move(*this));
// Acquire the write block // Acquire the write block
d.ws.wr_block_.lock(this); d_.ws.wr_block_.lock(this);
// Resume // Resume
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::post(
d.ws.get_executor(), std::move(*this)); d_.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
} }
// Make sure the stream is open
if(! d.ws.check_open(ec))
goto upcall;
// Can't call close twice
BOOST_ASSERT(! d.ws.wr_close_);
// Change status to closing
BOOST_ASSERT(d.ws.status_ == status::open);
d.ws.status_ = status::closing;
// Send close frame
d.ws.wr_close_ = true;
BOOST_ASIO_CORO_YIELD
net::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
if(! d.ws.check_ok(ec))
goto upcall;
if(d.ws.rd_close_)
{
// This happens when the read_op gets a close frame
// at the same time close_op is sending the close frame.
// The read_op will be suspended on the write block.
goto teardown;
}
// Maybe suspend
if(! d.ws.rd_block_.try_lock(this))
{
// Suspend
BOOST_ASIO_CORO_YIELD
d.ws.paused_r_close_.emplace(std::move(*this));
// Acquire the read block
d.ws.rd_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
d.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d.ws.rd_block_.is_locked(this));
// Make sure the stream is open // Make sure the stream is open
BOOST_ASSERT(d.ws.status_ != status::open); if(! d_.ws.check_open(ec))
BOOST_ASSERT(d.ws.status_ != status::closed);
if( d.ws.status_ == status::failed)
goto upcall; goto upcall;
BOOST_ASSERT(! d.ws.rd_close_); // Can't call close twice
} BOOST_ASSERT(! d_.ws.wr_close_);
// Drain // Change status to closing
if(d.ws.rd_remain_ > 0) BOOST_ASSERT(d_.ws.status_ == status::open);
goto read_payload; d_.ws.status_ = status::closing;
for(;;)
{
// Read frame header
while(! d.ws.parse_fh(
d.ws.rd_fh_, d.ws.rd_buf_, d.ev))
{
if(d.ev)
goto teardown;
BOOST_ASIO_CORO_YIELD
d.ws.stream_.async_read_some(
d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
d.ws.rd_buf_.max_size())),
std::move(*this));
if(! d.ws.check_ok(ec))
goto upcall;
d.ws.rd_buf_.commit(bytes_transferred);
}
if(detail::is_control(d.ws.rd_fh_.op))
{
// Process control frame
if(d.ws.rd_fh_.op == detail::opcode::close)
{
BOOST_ASSERT(! d.ws.rd_close_);
d.ws.rd_close_ = true;
auto const mb = buffers_prefix(
clamp(d.ws.rd_fh_.len),
d.ws.rd_buf_.data());
if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask)
detail::mask_inplace(mb, d.ws.rd_key_);
detail::read_close(d.ws.cr_, mb, d.ev);
if(d.ev)
goto teardown;
d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
goto teardown;
}
d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
}
else
{
read_payload:
while(d.ws.rd_buf_.size() < d.ws.rd_remain_)
{
d.ws.rd_remain_ -= d.ws.rd_buf_.size();
d.ws.rd_buf_.consume(d.ws.rd_buf_.size());
BOOST_ASIO_CORO_YIELD
d.ws.stream_.async_read_some(
d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
d.ws.rd_buf_.max_size())),
std::move(*this));
if(! d.ws.check_ok(ec))
goto upcall;
d.ws.rd_buf_.commit(bytes_transferred);
}
BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_);
d.ws.rd_buf_.consume(clamp(d.ws.rd_remain_));
d.ws.rd_remain_ = 0;
}
}
teardown: // Send close frame
// Teardown d_.ws.wr_close_ = true;
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
using beast::websocket::async_teardown;
BOOST_ASIO_CORO_YIELD
async_teardown(d.ws.role_,
d.ws.stream_, std::move(*this));
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
if(ec == net::error::eof)
{
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
if(! ec)
ec = d.ev;
if(ec)
d.ws.status_ = status::failed;
else
d.ws.status_ = status::closed;
d.ws.close();
upcall:
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
d.ws.wr_block_.unlock(this);
if(d.ws.rd_block_.try_unlock(this))
d.ws.paused_r_rd_.maybe_invoke();
d.ws.paused_rd_.maybe_invoke() ||
d.ws.paused_ping_.maybe_invoke() ||
d.ws.paused_wr_.maybe_invoke();
if(! d.cont)
{
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::async_write(d_.ws.stream_,
d.ws.get_executor(), d_.fb.data(), std::move(*this));
beast::bind_front_handler(std::move(*this), ec)); if(! d_.ws.check_ok(ec))
} goto upcall;
{
auto wg = std::move(d.wg); if(d_.ws.rd_close_)
d_.invoke(ec); {
// This happens when the read_op gets a close frame
// at the same time close_op is sending the close frame.
// The read_op will be suspended on the write block.
goto teardown;
}
// Maybe suspend
if(! d_.ws.rd_block_.try_lock(this))
{
// Suspend
BOOST_ASIO_CORO_YIELD
d_.ws.paused_r_close_.emplace(std::move(*this));
// Acquire the read block
d_.ws.rd_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d_.ws.rd_block_.is_locked(this));
// Make sure the stream is open
BOOST_ASSERT(d_.ws.status_ != status::open);
BOOST_ASSERT(d_.ws.status_ != status::closed);
if( d_.ws.status_ == status::failed)
goto upcall;
BOOST_ASSERT(! d_.ws.rd_close_);
}
// Drain
if(d_.ws.rd_remain_ > 0)
goto read_payload;
for(;;)
{
// Read frame header
while(! d_.ws.parse_fh(
d_.ws.rd_fh_, d_.ws.rd_buf_, d_.ev))
{
if(d_.ev)
goto teardown;
BOOST_ASIO_CORO_YIELD
d_.ws.stream_.async_read_some(
d_.ws.rd_buf_.prepare(read_size(d_.ws.rd_buf_,
d_.ws.rd_buf_.max_size())),
std::move(*this));
if(! d_.ws.check_ok(ec))
goto upcall;
d_.ws.rd_buf_.commit(bytes_transferred);
}
if(detail::is_control(d_.ws.rd_fh_.op))
{
// Process control frame
if(d_.ws.rd_fh_.op == detail::opcode::close)
{
BOOST_ASSERT(! d_.ws.rd_close_);
d_.ws.rd_close_ = true;
auto const mb = buffers_prefix(
clamp(d_.ws.rd_fh_.len),
d_.ws.rd_buf_.data());
if(d_.ws.rd_fh_.len > 0 && d_.ws.rd_fh_.mask)
detail::mask_inplace(mb, d_.ws.rd_key_);
detail::read_close(d_.ws.cr_, mb, d_.ev);
if(d_.ev)
goto teardown;
d_.ws.rd_buf_.consume(clamp(d_.ws.rd_fh_.len));
goto teardown;
}
d_.ws.rd_buf_.consume(clamp(d_.ws.rd_fh_.len));
}
else
{
read_payload:
while(d_.ws.rd_buf_.size() < d_.ws.rd_remain_)
{
d_.ws.rd_remain_ -= d_.ws.rd_buf_.size();
d_.ws.rd_buf_.consume(d_.ws.rd_buf_.size());
BOOST_ASIO_CORO_YIELD
d_.ws.stream_.async_read_some(
d_.ws.rd_buf_.prepare(read_size(d_.ws.rd_buf_,
d_.ws.rd_buf_.max_size())),
std::move(*this));
if(! d_.ws.check_ok(ec))
goto upcall;
d_.ws.rd_buf_.commit(bytes_transferred);
}
BOOST_ASSERT(d_.ws.rd_buf_.size() >= d_.ws.rd_remain_);
d_.ws.rd_buf_.consume(clamp(d_.ws.rd_remain_));
d_.ws.rd_remain_ = 0;
}
}
teardown:
// Teardown
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
using beast::websocket::async_teardown;
BOOST_ASIO_CORO_YIELD
async_teardown(d_.ws.role_,
d_.ws.stream_, std::move(*this));
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
if(ec == net::error::eof)
{
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec = {};
}
if(! ec)
ec = d_.ev;
if(ec)
d_.ws.status_ = status::failed;
else
d_.ws.status_ = status::closed;
d_.ws.close();
upcall:
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
d_.ws.wr_block_.unlock(this);
if(d_.ws.rd_block_.try_unlock(this))
d_.ws.paused_r_rd_.maybe_invoke();
d_.ws.paused_rd_.maybe_invoke() ||
d_.ws.paused_ping_.maybe_invoke() ||
d_.ws.paused_wr_.maybe_invoke();
if(! d_.cont)
{
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(),
beast::bind_front_handler(std::move(*this), ec));
}
this->invoke(ec);
}
}
};
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
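The change shown above is the same one applied to every composed operation in this commit: the operation derives from detail::async_op_base (or stable_async_op_base), which owns the final handler, holds a work guard on the stream's executor, and forwards the handler's associated allocator and executor, so the per-operation get_allocator/get_executor members and the asio_handler_* hook friends can be deleted and the operation simply ends with this->invoke(ec). A rough, simplified sketch of what such a base class centralizes is shown below; the name sketch_op_base and its layout are illustrative assumptions, not the actual Beast implementation.

    // Hypothetical sketch of the boilerplate a base class like async_op_base
    // factors out: store the handler, keep the I/O executor alive, and expose
    // the handler's associated allocator and executor.
    #include <boost/asio/associated_allocator.hpp>
    #include <boost/asio/associated_executor.hpp>
    #include <boost/asio/executor_work_guard.hpp>
    #include <utility>

    namespace net = boost::asio;

    template<class Handler, class Executor1>
    class sketch_op_base
    {
        Handler h_;
        net::executor_work_guard<Executor1> wg_;

    public:
        sketch_op_base(Executor1 const& ex1, Handler&& h)
            : h_(std::move(h))
            , wg_(ex1)
        {
        }

        using allocator_type = net::associated_allocator_t<Handler>;

        allocator_type
        get_allocator() const noexcept
        {
            return net::get_associated_allocator(h_);
        }

        using executor_type = net::associated_executor_t<Handler, Executor1>;

        executor_type
        get_executor() const noexcept
        {
            return net::get_associated_executor(h_, wg_.get_executor());
        }

        // Deliver the final completion, releasing the work guard first.
        template<class... Args>
        void
        invoke(Args&&... args)
        {
            wg_.reset();
            h_(std::forward<Args>(args)...);
        }
    };

With a base like this a derived operation only forwards its constructor arguments and finishes with a single this->invoke(...) call, which is exactly the shape close_op takes above.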
View File
@@ -15,14 +15,10 @@
#include <boost/beast/http/message.hpp> #include <boost/beast/http/message.hpp>
#include <boost/beast/http/read.hpp> #include <boost/beast/http/read.hpp>
#include <boost/beast/http/write.hpp> #include <boost/beast/http/write.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/asio/associated_allocator.hpp> #include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/asio/associated_executor.hpp> #include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
#include <memory> #include <memory>
@@ -38,13 +34,13 @@ namespace websocket {
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
template<class Handler> template<class Handler>
class stream<NextLayer, deflateSupported>::handshake_op class stream<NextLayer, deflateSupported>::handshake_op
: public net::coroutine : public beast::detail::stable_async_op_base<Handler,
beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
struct data struct data
{ {
stream<NextLayer, deflateSupported>& ws; stream<NextLayer, deflateSupported>& ws;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
response_type* res_p; response_type* res_p;
detail::sec_ws_key_type key; detail::sec_ws_key_type key;
http::request<http::empty_body> req; http::request<http::empty_body> req;
@@ -52,14 +48,12 @@ class stream<NextLayer, deflateSupported>::handshake_op
template<class Decorator> template<class Decorator>
data( data(
Handler const&, stream& ws_,
stream<NextLayer, deflateSupported>& ws_,
response_type* res_p_, response_type* res_p_,
string_view host, string_view host,
string_view target, string_view target,
Decorator const& decorator) Decorator const& decorator)
: ws(ws_) : ws(ws_)
, wg(ws.get_executor())
, res_p(res_p_) , res_p(res_p_)
, req(ws.build_request(key, , req(ws.build_request(key,
host, target, decorator)) host, target, decorator))
@@ -68,103 +62,61 @@ class stream<NextLayer, deflateSupported>::handshake_op
} }
}; };
handler_ptr<data, Handler> d_; data& d_;
public: public:
handshake_op(handshake_op&&) = default; template<
handshake_op(handshake_op const&) = delete; class Handler_,
class... Args>
template<class DeducedHandler, class... Args> handshake_op(
handshake_op(DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, Args&&... args) stream& ws, Args&&... args)
: d_(std::forward<DeducedHandler>(h), : beast::detail::stable_async_op_base<Handler,
ws, std::forward<Args>(args)...) beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, d_(beast::detail::allocate_stable<data>(
*this, ws, std::forward<Args>(args)...))
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(d_.handler());
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
d_.handler(), d_->ws.get_executor());
}
void void
operator()( operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_used = 0); std::size_t bytes_used = 0)
friend
bool asio_handler_is_continuation(handshake_op* op)
{ {
using net::asio_handler_is_continuation; boost::ignore_unused(bytes_used);
return asio_handler_is_continuation(
std::addressof(op->d_.handler()));
}
template<class Function> BOOST_ASIO_CORO_REENTER(*this)
friend
void asio_handler_invoke(Function&& f, handshake_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(f,
std::addressof(op->d_.handler()));
}
};
template<class NextLayer, bool deflateSupported>
template<class Handler>
void
stream<NextLayer, deflateSupported>::
handshake_op<Handler>::
operator()(error_code ec, std::size_t)
{
auto& d = *d_;
BOOST_ASIO_CORO_REENTER(*this)
{
// Send HTTP Upgrade
d.ws.do_pmd_config(d.req);
BOOST_ASIO_CORO_YIELD
http::async_write(d.ws.stream_,
d.req, std::move(*this));
if(ec)
goto upcall;
// VFALCO We could pre-serialize the request to
// a single buffer, send that instead,
// and delete the buffer here. The buffer
// could be a variable block at the end
// of handler_ptr's allocation.
// Read HTTP response
BOOST_ASIO_CORO_YIELD
http::async_read(d.ws.next_layer(),
d.ws.rd_buf_, d.res,
std::move(*this));
if(ec)
goto upcall;
d.ws.on_response(d.res, d.key, ec);
if(d.res_p)
swap(d.res, *d.res_p);
upcall:
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}
{
// Send HTTP Upgrade
d_.ws.do_pmd_config(d_.req);
BOOST_ASIO_CORO_YIELD
http::async_write(d_.ws.stream_,
d_.req, std::move(*this));
if(ec)
goto upcall;
// VFALCO We could pre-serialize the request to
// a single buffer, send that instead,
// and delete the buffer here. The buffer
// could be a variable block at the end
// of handler_ptr's allocation.
// Read HTTP response
BOOST_ASIO_CORO_YIELD
http::async_read(d_.ws.next_layer(),
d_.ws.rd_buf_, d_.res,
std::move(*this));
if(ec)
goto upcall;
d_.ws.on_response(d_.res, d_.key, ec);
if(d_.res_p)
swap(d_.res, *d_.res_p);
upcall:
this->invoke(ec);
}
}
};
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
template<class HandshakeHandler> template<class HandshakeHandler>
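handshake_op keeps its temporary state in a data block obtained from beast::detail::allocate_stable, so the state keeps one fixed address for the whole operation even though the operation object itself is moved between intermediate completions, and the block is released before the final handler runs. The toy sketch below shows only the stable-address idea; it uses std::unique_ptr with the default allocator purely to stay self-contained, which is an assumption and not how allocate_stable is implemented.

    // Hypothetical illustration: state that must not change address lives in
    // separately owned storage, and the operation holds a reference to it.
    #include <memory>
    #include <string>
    #include <utility>

    struct op_state
    {
        std::string request; // e.g. a serialized upgrade request

        explicit op_state(std::string r)
            : request(std::move(r))
        {
        }
    };

    class sketch_handshake_op
    {
        std::unique_ptr<op_state> sp_; // stable storage
        op_state& d_;                  // stays valid when the op is moved

    public:
        explicit sketch_handshake_op(std::string req)
            : sp_(std::make_unique<op_state>(std::move(req)))
            , d_(*sp_)
        {
        }

        sketch_handshake_op(sketch_handshake_op&&) = default;
    };

Moving the operation moves only the owning pointer; the op_state it refers to never changes address, which is what lets buffers inside it be handed to an asynchronous write safely.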
View File
@@ -13,14 +13,10 @@
#include <boost/beast/core/bind_handler.hpp> #include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/handler_ptr.hpp> #include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/beast/websocket/detail/frame.hpp> #include <boost/beast/websocket/detail/frame.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
#include <memory> #include <memory>
@@ -37,22 +33,20 @@ namespace websocket {
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
template<class Handler> template<class Handler>
class stream<NextLayer, deflateSupported>::ping_op class stream<NextLayer, deflateSupported>::ping_op
: public net::coroutine : public beast::detail::stable_async_op_base<
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
struct state struct state
{ {
stream<NextLayer, deflateSupported>& ws; stream<NextLayer, deflateSupported>& ws;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
detail::frame_buffer fb; detail::frame_buffer fb;
state( state(
Handler const&,
stream<NextLayer, deflateSupported>& ws_, stream<NextLayer, deflateSupported>& ws_,
detail::opcode op, detail::opcode op,
ping_data const& payload) ping_data const& payload)
: ws(ws_) : ws(ws_)
, wg(ws.get_executor())
{ {
// Serialize the control frame // Serialize the control frame
ws.template write_ping< ws.template write_ping<
@@ -61,127 +55,82 @@ class stream<NextLayer, deflateSupported>::ping_op
} }
}; };
handler_ptr<state, Handler> d_; state& d_;
public: public:
static constexpr int id = 3; // for soft_mutex static constexpr int id = 3; // for soft_mutex
ping_op(ping_op&&) = default; template<class Handler_>
ping_op(ping_op const&) = delete;
template<class DeducedHandler>
ping_op( ping_op(
DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, stream<NextLayer, deflateSupported>& ws,
detail::opcode op, detail::opcode op,
ping_data const& payload) ping_data const& payload)
: d_(std::forward<DeducedHandler>(h), : beast::detail::stable_async_op_base<
ws, op, payload) Handler, beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, d_(beast::detail::allocate_stable<state>(
*this, ws, op, payload))
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(d_.handler());
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
d_.handler(), d_->ws.get_executor());
}
void operator()( void operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0)
friend
bool asio_handler_is_continuation(ping_op* op)
{ {
using net::asio_handler_is_continuation; boost::ignore_unused(bytes_transferred);
return asio_handler_is_continuation(
std::addressof(op->d_.handler()));
}
template<class Function> BOOST_ASIO_CORO_REENTER(*this)
friend
void asio_handler_invoke(Function&& f, ping_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(
f, std::addressof(op->d_.handler()));
}
};
template<class NextLayer, bool deflateSupported>
template<class Handler>
void
stream<NextLayer, deflateSupported>::
ping_op<Handler>::
operator()(error_code ec, std::size_t)
{
auto& d = *d_;
BOOST_ASIO_CORO_REENTER(*this)
{
// Maybe suspend
if(d.ws.wr_block_.try_lock(this))
{
// Make sure the stream is open
if(! d.ws.check_open(ec))
{
BOOST_ASIO_CORO_YIELD
net::post(
d.ws.get_executor(),
beast::bind_front_handler(std::move(*this), ec));
goto upcall;
}
}
else
{
// Suspend
BOOST_ASIO_CORO_YIELD
d.ws.paused_ping_.emplace(std::move(*this));
// Acquire the write block
d.ws.wr_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
d.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
// Make sure the stream is open
if(! d.ws.check_open(ec))
goto upcall;
}
// Send ping frame
BOOST_ASIO_CORO_YIELD
net::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
if(! d.ws.check_ok(ec))
goto upcall;
upcall:
d.ws.wr_block_.unlock(this);
d.ws.paused_close_.maybe_invoke() ||
d.ws.paused_rd_.maybe_invoke() ||
d.ws.paused_wr_.maybe_invoke();
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}
{
// Maybe suspend
if(d_.ws.wr_block_.try_lock(this))
{
// Make sure the stream is open
if(! d_.ws.check_open(ec))
{
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(),
beast::bind_front_handler(std::move(*this), ec));
goto upcall;
}
}
else
{
// Suspend
BOOST_ASIO_CORO_YIELD
d_.ws.paused_ping_.emplace(std::move(*this));
// Acquire the write block
d_.ws.wr_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
d_.ws.get_executor(), std::move(*this));
BOOST_ASSERT(d_.ws.wr_block_.is_locked(this));
// Make sure the stream is open
if(! d_.ws.check_open(ec))
goto upcall;
}
// Send ping frame
BOOST_ASIO_CORO_YIELD
net::async_write(d_.ws.stream_,
d_.fb.data(), std::move(*this));
if(! d_.ws.check_ok(ec))
goto upcall;
upcall:
d_.ws.wr_block_.unlock(this);
d_.ws.paused_close_.maybe_invoke() ||
d_.ws.paused_rd_.maybe_invoke() ||
d_.ws.paused_wr_.maybe_invoke();
this->invoke(ec);
}
}
};
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
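Two details of ping_op carry over from the old version: when the write block is unavailable it parks itself in paused_ping_ and is re-posted through the stream's executor once resumed, and when it can finish without doing any I/O it still completes through net::post rather than calling the handler inline, so an initiating function never invokes its completion handler reentrantly. The stand-alone snippet below shows that posting idiom with beast::bind_front_handler; the function name async_do_nothing is invented for the example.

    // Completing "immediately" still goes through the executor.
    #include <boost/asio/io_context.hpp>
    #include <boost/asio/post.hpp>
    #include <boost/beast/core/bind_handler.hpp>
    #include <boost/system/error_code.hpp>
    #include <iostream>
    #include <utility>

    namespace net = boost::asio;

    template<class Handler>
    void
    async_do_nothing(net::io_context& ioc, Handler&& handler)
    {
        boost::system::error_code ec; // success
        net::post(ioc.get_executor(),
            boost::beast::bind_front_handler(
                std::forward<Handler>(handler), ec));
    }

    int main()
    {
        net::io_context ioc;
        async_do_nothing(ioc,
            [](boost::system::error_code ec)
            {
                std::cout << "completed: " << ec.message() << "\n";
            });
        ioc.run(); // the handler runs here, not inside async_do_nothing
    }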
View File
@@ -16,15 +16,12 @@
#include <boost/beast/core/buffers_suffix.hpp> #include <boost/beast/core/buffers_suffix.hpp>
#include <boost/beast/core/flat_static_buffer.hpp> #include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/buffer.hpp> #include <boost/beast/core/detail/buffer.hpp>
#include <boost/beast/core/detail/clamp.hpp> #include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
#include <boost/asio/associated_allocator.hpp> #include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
#include <boost/config.hpp> #include <boost/config.hpp>
@@ -80,12 +77,11 @@ template<
class MutableBufferSequence, class MutableBufferSequence,
class Handler> class Handler>
class stream<NextLayer, deflateSupported>::read_some_op class stream<NextLayer, deflateSupported>::read_some_op
: public net::coroutine : public beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
Handler h_; stream& ws_;
stream<NextLayer, deflateSupported>& ws_;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
MutableBufferSequence bs_; MutableBufferSequence bs_;
buffers_suffix<MutableBufferSequence> cb_; buffers_suffix<MutableBufferSequence> cb_;
std::size_t bytes_written_ = 0; std::size_t bytes_written_ = 0;
@@ -97,68 +93,25 @@ class stream<NextLayer, deflateSupported>::read_some_op
public: public:
static constexpr int id = 1; // for soft_mutex static constexpr int id = 1; // for soft_mutex
read_some_op(read_some_op&&) = default; template<class Handler_>
read_some_op(read_some_op const&) = delete;
template<class DeducedHandler>
read_some_op( read_some_op(
DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, stream<NextLayer, deflateSupported>& ws,
MutableBufferSequence const& bs) MutableBufferSequence const& bs)
: h_(std::forward<DeducedHandler>(h)) : beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, ws_(ws) , ws_(ws)
, wg_(ws_.get_executor())
, bs_(bs) , bs_(bs)
, cb_(bs) , cb_(bs)
, code_(close_code::none) , code_(close_code::none)
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(h_);
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
h_, ws_.get_executor());
}
Handler&
handler()
{
return h_;
}
void operator()( void operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0, std::size_t bytes_transferred = 0,
bool cont = true); bool cont = true);
friend
bool asio_handler_is_continuation(read_some_op* op)
{
using net::asio_handler_is_continuation;
return op->cont_ || asio_handler_is_continuation(
std::addressof(op->h_));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, read_some_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(f, std::addressof(op->h_));
}
}; };
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
@@ -708,7 +661,7 @@ operator()(
beast::bind_front_handler(std::move(*this), beast::bind_front_handler(std::move(*this),
ec, bytes_written_)); ec, bytes_written_));
} }
h_(ec, bytes_written_); this->invoke(ec, bytes_written_);
} }
} }
@@ -719,34 +672,28 @@ template<
class DynamicBuffer, class DynamicBuffer,
class Handler> class Handler>
class stream<NextLayer, deflateSupported>::read_op class stream<NextLayer, deflateSupported>::read_op
: public net::coroutine : public beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
Handler h_;
stream<NextLayer, deflateSupported>& ws_; stream<NextLayer, deflateSupported>& ws_;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
DynamicBuffer& b_; DynamicBuffer& b_;
std::size_t limit_; std::size_t limit_;
std::size_t bytes_written_ = 0; std::size_t bytes_written_ = 0;
bool some_; bool some_;
public: public:
using allocator_type = template<class Handler_>
net::associated_allocator_t<Handler>;
read_op(read_op&&) = default;
read_op(read_op const&) = delete;
template<class DeducedHandler>
read_op( read_op(
DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, stream<NextLayer, deflateSupported>& ws,
DynamicBuffer& b, DynamicBuffer& b,
std::size_t limit, std::size_t limit,
bool some) bool some)
: h_(std::forward<DeducedHandler>(h)) : beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, ws_(ws) , ws_(ws)
, wg_(ws_.get_executor())
, b_(b) , b_(b)
, limit_(limit ? limit : ( , limit_(limit ? limit : (
std::numeric_limits<std::size_t>::max)()) std::numeric_limits<std::size_t>::max)())
@@ -754,84 +701,43 @@ public:
{ {
} }
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(h_);
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
h_, ws_.get_executor());
}
void operator()( void operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0)
friend
bool asio_handler_is_continuation(read_op* op)
{
using net::asio_handler_is_continuation;
return asio_handler_is_continuation(
std::addressof(op->h_));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, read_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(f, std::addressof(op->h_));
}
};
{
using beast::detail::clamp;
BOOST_ASIO_CORO_REENTER(*this)
{
do
{
BOOST_ASIO_CORO_YIELD
{
auto mb = beast::detail::dynamic_buffer_prepare(b_,
clamp(ws_.read_size_hint(b_), limit_),
ec, error::buffer_overflow);
if(ec)
net::post(
ws_.get_executor(),
beast::bind_front_handler(
std::move(*this), ec, 0));
else
read_some_op<typename
DynamicBuffer::mutable_buffers_type,
read_op>(std::move(*this), ws_, *mb)(
{}, 0, false);
}
if(ec)
goto upcall;
b_.commit(bytes_transferred);
bytes_written_ += bytes_transferred;
}
while(! some_ && ! ws_.is_message_done());
upcall:
this->invoke(ec, bytes_written_);
}
}
};
template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer, class Handler>
void
stream<NextLayer, deflateSupported>::
read_op<DynamicBuffer, Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred)
{
using beast::detail::clamp;
BOOST_ASIO_CORO_REENTER(*this)
{
do
{
BOOST_ASIO_CORO_YIELD
{
auto mb = beast::detail::dynamic_buffer_prepare(b_,
clamp(ws_.read_size_hint(b_), limit_),
ec, error::buffer_overflow);
if(ec)
net::post(
ws_.get_executor(),
beast::bind_front_handler(
std::move(*this), ec, 0));
else
read_some_op<typename
DynamicBuffer::mutable_buffers_type,
read_op>(std::move(*this), ws_, *mb)(
{}, 0, false);
return;
}
if(ec)
break;
b_.commit(bytes_transferred);
bytes_written_ += bytes_transferred;
}
while(! some_ && ! ws_.is_message_done());
h_(ec, bytes_written_);
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
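read_op reserves writable space in the caller's DynamicBuffer with beast::detail::dynamic_buffer_prepare, lets read_some_op fill it, then commits however many bytes arrived; the drain loops above use the same prepare/commit/consume protocol on rd_buf_. The snippet below exercises that protocol on its own with flat_buffer, using buffer_copy as a stand-in for the socket read; it is only a protocol demo, not part of this change.

    #include <boost/asio/buffer.hpp>
    #include <boost/beast/core/buffers_to_string.hpp>
    #include <boost/beast/core/flat_buffer.hpp>
    #include <iostream>
    #include <string>

    int main()
    {
        boost::beast::flat_buffer b;

        std::string incoming = "hello, ";
        auto mb = b.prepare(incoming.size());      // writable region
        auto n = boost::asio::buffer_copy(
            mb, boost::asio::buffer(incoming));    // "I/O" writes into it
        b.commit(n);                               // make the bytes readable

        incoming = "world";
        b.commit(boost::asio::buffer_copy(
            b.prepare(incoming.size()),
            boost::asio::buffer(incoming)));

        std::cout << boost::beast::buffers_to_string(b.data()) << "\n";
        b.consume(b.size());                       // discard what was read
    }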
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
View File
@@ -12,12 +12,9 @@
#include <boost/beast/core/bind_handler.hpp> #include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/asio/associated_allocator.hpp> #include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/asio/associated_executor.hpp> #include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <memory> #include <memory>
@@ -28,146 +25,105 @@ namespace websocket {
namespace detail { namespace detail {
template<class Handler> template<class Handler>
class teardown_tcp_op : public net::coroutine class teardown_tcp_op
: public beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<
net::ip::tcp::socket>>
, public net::coroutine
{ {
using socket_type = using socket_type = net::ip::tcp::socket;
net::ip::tcp::socket;
Handler h_;
socket_type& s_; socket_type& s_;
net::executor_work_guard<decltype(std::declval<
socket_type&>().get_executor())> wg_;
role_type role_; role_type role_;
bool nb_; bool nb_;
public: public:
teardown_tcp_op(teardown_tcp_op&& other) = default; template<class Handler_>
teardown_tcp_op(teardown_tcp_op const& other) = default;
template<class DeducedHandler>
teardown_tcp_op( teardown_tcp_op(
DeducedHandler&& h, Handler_&& h,
socket_type& s, socket_type& s,
role_type role) role_type role)
: h_(std::forward<DeducedHandler>(h)) : beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<
net::ip::tcp::socket>>(s.get_executor(),
std::forward<Handler_>(h))
, s_(s) , s_(s)
, wg_(s_.get_executor())
, role_(role) , role_(role)
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(h_);
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<socket_type&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
h_, s_.get_executor());
}
void void
operator()( operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0)
friend
bool asio_handler_is_continuation(teardown_tcp_op* op)
{
using net::asio_handler_is_continuation;
return asio_handler_is_continuation(
std::addressof(op->h_));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, teardown_tcp_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(f, std::addressof(op->h_));
}
};
template<class Handler>
void
teardown_tcp_op<Handler>::
operator()(error_code ec, std::size_t bytes_transferred)
{
using net::buffer;
using tcp = net::ip::tcp;
BOOST_ASIO_CORO_REENTER(*this)
{
nb_ = s_.non_blocking();
s_.non_blocking(true, ec);
if(! ec)
{
if(role_ == role_type::server)
s_.shutdown(tcp::socket::shutdown_send, ec);
}
if(ec)
{
BOOST_ASIO_CORO_YIELD
net::post(
s_.get_executor(),
beast::bind_front_handler(std::move(*this), ec, 0));
goto upcall;
}
for(;;)
{
{
char buf[2048];
s_.read_some(
net::buffer(buf), ec);
}
if(ec == net::error::would_block)
{
BOOST_ASIO_CORO_YIELD
s_.async_wait(
net::ip::tcp::socket::wait_read,
std::move(*this));
continue;
}
if(ec)
{
if(ec != net::error::eof)
goto upcall;
ec = {};
break;
}
if(bytes_transferred == 0)
{
// happens sometimes
break;
}
}
if(role_ == role_type::client)
s_.shutdown(tcp::socket::shutdown_send, ec);
if(ec)
goto upcall;
s_.close(ec);
upcall:
{
error_code ignored;
s_.non_blocking(nb_, ignored);
}
h_(ec);
}
}
{
using net::buffer;
using tcp = net::ip::tcp;
BOOST_ASIO_CORO_REENTER(*this)
{
nb_ = s_.non_blocking();
s_.non_blocking(true, ec);
if(! ec)
{
if(role_ == role_type::server)
s_.shutdown(tcp::socket::shutdown_send, ec);
}
if(ec)
{
BOOST_ASIO_CORO_YIELD
net::post(
s_.get_executor(),
beast::bind_front_handler(std::move(*this), ec, 0));
goto upcall;
}
for(;;)
{
{
char buf[2048];
s_.read_some(
net::buffer(buf), ec);
}
if(ec == net::error::would_block)
{
BOOST_ASIO_CORO_YIELD
s_.async_wait(
net::ip::tcp::socket::wait_read,
std::move(*this));
continue;
}
if(ec)
{
if(ec != net::error::eof)
goto upcall;
ec = {};
break;
}
if(bytes_transferred == 0)
{
// happens sometimes
// https://github.com/boostorg/beast/issues/1373
break;
}
}
if(role_ == role_type::client)
s_.shutdown(tcp::socket::shutdown_send, ec);
if(ec)
goto upcall;
s_.close(ec);
upcall:
{
error_code ignored;
s_.non_blocking(nb_, ignored);
}
this->invoke(ec);
}
}
};
} // detail } // detail
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
inline
void void
teardown( teardown(
role_type role, role_type role,
@@ -195,6 +151,7 @@ teardown(
if(bytes_transferred == 0) if(bytes_transferred == 0)
{ {
// happens sometimes // happens sometimes
// https://github.com/boostorg/beast/issues/1373
break; break;
} }
} }
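Both the synchronous teardown above and teardown_tcp_op earlier follow the same sequence: stop sending, drain whatever the peer still has in flight until EOF (or a zero-byte read, per the linked issue), then close the socket. A reduced synchronous sketch with plain Boost.Asio calls, leaving out the non-blocking toggling and the client/server ordering of the real code:

    #include <boost/asio/buffer.hpp>
    #include <boost/asio/error.hpp>
    #include <boost/asio/ip/tcp.hpp>
    #include <boost/system/error_code.hpp>

    namespace net = boost::asio;

    void
    drain_and_close(net::ip::tcp::socket& s, boost::system::error_code& ec)
    {
        // Tell the peer we are done sending.
        s.shutdown(net::ip::tcp::socket::shutdown_send, ec);
        if(ec)
            return;

        // Discard incoming data until the peer closes its end.
        for(;;)
        {
            char buf[2048];
            auto const n = s.read_some(net::buffer(buf), ec);
            if(ec == net::error::eof)
            {
                ec = {};
                break;
            }
            if(ec)
                return;
            if(n == 0)
                break; // can happen; see boostorg/beast issue 1373
        }
        s.close(ec);
    }

In the real code the role only changes whether the shutdown happens before the drain (server) or after it (client).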
@@ -207,7 +164,6 @@ teardown(
} }
template<class TeardownHandler> template<class TeardownHandler>
inline
void void
async_teardown( async_teardown(
role_type role, role_type role,
@@ -218,9 +174,9 @@ async_teardown(
TeardownHandler, void(error_code)>::value, TeardownHandler, void(error_code)>::value,
"TeardownHandler requirements not met"); "TeardownHandler requirements not met");
detail::teardown_tcp_op<typename std::decay< detail::teardown_tcp_op<typename std::decay<
TeardownHandler>::type>{std::forward< TeardownHandler>::type>(std::forward<
TeardownHandler>(handler), socket, TeardownHandler>(handler), socket,
role}(); role)();
} }
} // websocket } // websocket
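teardown and async_teardown are found by argument-dependent lookup on the next layer's type, which is how the websocket close path above tears down a plain TCP socket without naming it directly; a user-defined transport can participate by providing the same two overloads next to its own type. A hedged sketch, where loopback_stream and the myio namespace are invented for the example:

    #include <boost/asio/io_context.hpp>
    #include <boost/asio/post.hpp>
    #include <boost/beast/core/bind_handler.hpp>
    #include <boost/beast/websocket/teardown.hpp>
    #include <boost/system/error_code.hpp>
    #include <utility>

    namespace myio {

    struct loopback_stream
    {
        boost::asio::io_context& ioc;

        boost::asio::io_context::executor_type
        get_executor() noexcept
        {
            return ioc.get_executor();
        }
    };

    // Synchronous teardown: nothing to do for this toy stream.
    inline void
    teardown(
        boost::beast::role_type,
        loopback_stream&,
        boost::system::error_code& ec)
    {
        ec = {};
    }

    // Asynchronous teardown: complete through the executor, never inline.
    template<class TeardownHandler>
    void
    async_teardown(
        boost::beast::role_type,
        loopback_stream& s,
        TeardownHandler&& handler)
    {
        boost::asio::post(
            s.get_executor(),
            boost::beast::bind_front_handler(
                std::forward<TeardownHandler>(handler),
                boost::system::error_code{}));
    }

    } // myio

Beast supplies the tcp::socket overloads in this file; other stream types provide their own.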
View File
@@ -17,15 +17,12 @@
#include <boost/beast/core/buffers_suffix.hpp> #include <boost/beast/core/buffers_suffix.hpp>
#include <boost/beast/core/flat_static_buffer.hpp> #include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/async_op_base.hpp>
#include <boost/beast/core/detail/clamp.hpp> #include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
#include <boost/beast/core/detail/get_executor_type.hpp>
#include <boost/beast/websocket/detail/frame.hpp> #include <boost/beast/websocket/detail/frame.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
#include <boost/config.hpp> #include <boost/config.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
@@ -139,12 +136,11 @@ do_context_takeover_write(role_type role)
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
template<class Buffers, class Handler> template<class Buffers, class Handler>
class stream<NextLayer, deflateSupported>::write_some_op class stream<NextLayer, deflateSupported>::write_some_op
: public net::coroutine : public beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<stream>>
, public net::coroutine
{ {
Handler h_; stream& ws_;
stream<NextLayer, deflateSupported>& ws_;
net::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
buffers_suffix<Buffers> cb_; buffers_suffix<Buffers> cb_;
detail::frame_header fh_; detail::frame_header fh_;
detail::prepared_key key_; detail::prepared_key key_;
@@ -159,69 +155,25 @@ class stream<NextLayer, deflateSupported>::write_some_op
public: public:
static constexpr int id = 2; // for soft_mutex static constexpr int id = 2; // for soft_mutex
write_some_op(write_some_op&&) = default; template<class Handler_>
write_some_op(write_some_op const&) = delete;
template<class DeducedHandler>
write_some_op( write_some_op(
DeducedHandler&& h, Handler_&& h,
stream<NextLayer, deflateSupported>& ws, stream<NextLayer, deflateSupported>& ws,
bool fin, bool fin,
Buffers const& bs) Buffers const& bs)
: h_(std::forward<DeducedHandler>(h)) : beast::detail::async_op_base<
Handler, beast::detail::get_executor_type<stream>>(
ws.get_executor(), std::forward<Handler_>(h))
, ws_(ws) , ws_(ws)
, wg_(ws_.get_executor())
, cb_(bs) , cb_(bs)
, fin_(fin) , fin_(fin)
{ {
} }
using allocator_type =
net::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (net::get_associated_allocator)(h_);
}
using executor_type = net::associated_executor_t<
Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (net::get_associated_executor)(
h_, ws_.get_executor());
}
Handler&
handler()
{
return h_;
}
void operator()( void operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0, std::size_t bytes_transferred = 0,
bool cont = true); bool cont = true);
friend
bool asio_handler_is_continuation(write_some_op* op)
{
using net::asio_handler_is_continuation;
return op->cont_ || asio_handler_is_continuation(
std::addressof(op->h_));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, write_some_op* op)
{
using net::asio_handler_invoke;
asio_handler_invoke(
f, std::addressof(op->h_));
}
}; };
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
@@ -582,7 +534,7 @@ operator()(
std::move(*this), std::move(*this),
ec, bytes_transferred_)); ec, bytes_transferred_));
} }
h_(ec, bytes_transferred_); this->invoke(ec, bytes_transferred_);
} }
} }
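Each operation in this commit names its executor through detail::get_executor_type<stream> instead of repeating the decltype(std::declval<stream&>().get_executor()) expression visible in the removed lines. A one-line helper with that behaviour might look like the following; the alias name mirrors the one used in the diff, but its real definition is not shown here, so treat this as an assumption:

    #include <boost/asio/ip/tcp.hpp>
    #include <type_traits>
    #include <utility>

    // Assumed shape of the helper: the type a stream's get_executor() returns.
    template<class T>
    using get_executor_type =
        decltype(std::declval<T&>().get_executor());

    static_assert(
        std::is_same<
            get_executor_type<boost::asio::ip::tcp::socket>,
            boost::asio::ip::tcp::socket::executor_type>::value,
        "deduces the socket's executor type");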
View File
@@ -125,6 +125,7 @@ namespace websocket {
@param ec Set to the error if any occurred. @param ec Set to the error if any occurred.
*/ */
BOOST_BEAST_DECL
void void
teardown( teardown(
role_type role, role_type role,