Refactor websocket::stream operations

Vinnie Falco
2019-01-20 12:25:30 -08:00
parent 6af6a8da99
commit dfd08bf6ae
25 changed files with 1103 additions and 914 deletions


@@ -1,3 +1,9 @@
+Version 216:
+
+* Refactor websocket::stream operations
+
+--------------------------------------------------------------------------------
+
 Version 215:
 
 * basic_stream uses boost::shared_ptr


@@ -335,7 +335,7 @@ read_some(MutableBufferSequence const& buffers,
     // A request to read 0 bytes from a stream is a no-op.
     if(buffer_size(buffers) == 0)
     {
-        ec.clear();
+        ec = {};
         return 0;
     }
@@ -481,7 +481,7 @@ write_some(
     // A request to write 0 bytes to a stream is a no-op.
     if(buffer_size(buffers) == 0)
     {
-        ec.clear();
+        ec = {};
         return 0;
     }
@@ -591,7 +591,7 @@ teardown(
         s.in_->fc->fail(ec))
         ec = net::error::eof;
     else
-        ec.clear();
+        ec = {};
 }
 
 template<class TeardownHandler>
@@ -613,7 +613,7 @@ async_teardown(
         s.in_->fc->fail(ec))
         ec = net::error::eof;
     else
-        ec.clear();
+        ec = {};
     net::post(
         s.get_executor(),
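The `ec.clear()` to `ec = {}` change is purely stylistic: assigning a value-initialized `error_code` resets both the value and the category, exactly like `clear()`. A standalone illustration (not part of the commit):

#include <boost/asio/error.hpp>
#include <boost/system/error_code.hpp>
#include <cassert>

int main()
{
    boost::system::error_code ec = boost::asio::error::eof;
    assert(ec);        // holds an error
    ec = {};           // value-initialized: same effect as ec.clear()
    assert(! ec);      // cleared
}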


@@ -210,7 +210,7 @@ public:
         this parameter is optional and may be omitted.
     */
 #if BOOST_BEAST_DOXYGEN
-    template<class Handler>
+    template<class Handler_>
     async_op_base(
         Handler&& handler,
         Executor1 const& ex1,
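The Doxygen block now names the constructor's template parameter `Handler_`, matching the actual declaration and avoiding a clash with the class-level `Handler` parameter. A minimal sketch of the pattern (illustrative only, not Beast's real definition):

#include <utility>

template<class Handler, class Executor1>
class some_op_base
{
    Handler h_;

public:
    // Handler_ is deduced independently of Handler so that
    // both lvalues and rvalues forward correctly.
    template<class Handler_>
    some_op_base(Handler_&& handler, Executor1 const&)
        : h_(std::forward<Handler_>(handler))
    {
    }
};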


@@ -7,8 +7,8 @@
 // Official repository: https://github.com/boostorg/beast
 //
 
-#ifndef BOOST_BEAST_BIND_CONTINUATION_HPP
-#define BOOST_BEAST_BIND_CONTINUATION_HPP
+#ifndef BOOST_BEAST_DETAIL_BIND_CONTINUATION_HPP
+#define BOOST_BEAST_DETAIL_BIND_CONTINUATION_HPP
 
 #include <boost/beast/core/detail/config.hpp>
 #include <boost/beast/core/detail/remap_post_to_defer.hpp>
@@ -19,6 +19,7 @@
 
 namespace boost {
 namespace beast {
+namespace detail {
 
 /** Mark a completion handler as a continuation.
@@ -29,6 +30,43 @@ namespace beast {
     function represents a continuation of the current asynchronous
     flow of control.
 
+    @param handler The handler to wrap.
+
+    @see
+
+    @li <a href="http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4242.html">[N4242] Executors and Asynchronous Operations, Revision 1</a>
+*/
+template<class CompletionHandler>
+#if BOOST_BEAST_DOXYGEN
+__implementation_defined__
+#else
+net::executor_binder<
+    typename std::decay<CompletionHandler>::type,
+    detail::remap_post_to_defer<
+        net::associated_executor_t<CompletionHandler>>>
+#endif
+bind_continuation(CompletionHandler&& handler)
+{
+    return net::bind_executor(
+        detail::remap_post_to_defer<
+            net::associated_executor_t<CompletionHandler>>(
+                net::get_associated_executor(handler)),
+        std::forward<CompletionHandler>(handler));
+}
+
+/** Mark a completion handler as a continuation.
+
+    This function wraps a completion handler to associate it with an
+    executor whose `post` operation is remapped to the `defer` operation.
+    It is used by composed asynchronous operation implementations to
+    indicate that a completion handler submitted to an initiating
+    function represents a continuation of the current asynchronous
+    flow of control.
+
+    @param ex The executor to use
+
+    @param handler The handler to wrap
+
     @see
 
     @li <a href="http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4242.html">[N4242] Executors and Asynchronous Operations, Revision 1</a>
@@ -49,6 +87,7 @@ bind_continuation(
         std::forward<CompletionHandler>(handler));
 }
 
+} // detail
 } // beast
 } // boost
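`bind_continuation` now lives in `namespace detail` with a matching header guard, and gains an overload taking only the handler. Wrapping an intermediate handler this way re-binds it to an executor whose `post` is remapped to `defer`, which signals that the handler continues the current asynchronous flow of control. A hedged usage sketch, mirroring the call sites added later in this commit (detail:: components are internal, so this is illustrative only):

// Inside a composed operation's coroutine body:
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, fb_.data(),
    beast::detail::bind_continuation(std::move(*this)));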


@@ -494,7 +494,13 @@ struct impl_base<false>
 struct stream_base
 {
 protected:
-    bool secure_prng_ = true;
+    enum class status
+    {
+        open,
+        closing,
+        closed,
+        failed
+    };
 
     std::uint32_t
     create_mask()
@@ -505,6 +511,7 @@ protected:
         return key;
     }
 
+    bool secure_prng_ = true;
 };
 
 } // detail
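With the connection state made an explicit `status` enum, the operations below replace direct writes to `status_` with a `change_status` call. The body of `change_status` is not shown in this excerpt; a hypothetical sketch of what such a transition helper can enforce:

// Hypothetical sketch; the actual helper is not part of this excerpt.
void
change_status(status new_status)
{
    switch(new_status)
    {
    case status::closing:
        // Only an open connection may begin closing
        BOOST_ASSERT(status_ == status::open);
        break;
    case status::closed:
    case status::failed:
        break;
    default:
        break;
    }
    status_ = new_status;
}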


@@ -547,7 +547,6 @@ async_accept(
     BOOST_BEAST_HANDLER_INIT(
         AcceptHandler, void(error_code));
     impl_->reset();
-    using net::asio_handler_is_continuation;
     response_op<
         BOOST_ASIO_HANDLER_TYPE(
             AcceptHandler, void(error_code))>{
@@ -579,7 +578,6 @@ async_accept_ex(
     BOOST_BEAST_HANDLER_INIT(
         AcceptHandler, void(error_code));
     impl_->reset();
-    using net::asio_handler_is_continuation;
     response_op<
         BOOST_ASIO_HANDLER_TYPE(
             AcceptHandler, void(error_code))>{


@@ -15,7 +15,7 @@
 #include <boost/beast/core/async_op_base.hpp>
 #include <boost/beast/core/flat_static_buffer.hpp>
 #include <boost/beast/core/stream_traits.hpp>
-#include <boost/beast/core/detail/config.hpp>
+#include <boost/beast/core/detail/bind_continuation.hpp>
 #include <boost/asio/coroutine.hpp>
 #include <boost/asio/post.hpp>
 #include <boost/throw_exception.hpp>
@@ -39,25 +39,9 @@ class stream<NextLayer, deflateSupported>::close_op
         Handler, beast::executor_type<stream>>
     , public net::coroutine
 {
-    struct state
-    {
-        stream<NextLayer, deflateSupported>& ws;
-        detail::frame_buffer fb;
-        error_code ev;
-        bool cont = false;
-
-        state(
-            stream<NextLayer, deflateSupported>& ws_,
-            close_reason const& cr)
-            : ws(ws_)
-        {
-            // Serialize the close frame
-            ws.template write_close<
-                flat_static_buffer_base>(fb, cr);
-        }
-    };
-
-    state& d_;
+    stream<NextLayer, deflateSupported>& ws_;
+    error_code ev_;
+    detail::frame_buffer& fb_;
 
 public:
     static constexpr int id = 4; // for soft_mutex
@@ -70,9 +54,13 @@
         : stable_async_op_base<
             Handler, beast::executor_type<stream>>(
                 std::forward<Handler_>(h), ws.get_executor())
-        , d_(beast::allocate_stable<state>(
-            *this, ws, cr))
+        , ws_(ws)
+        , fb_(beast::allocate_stable<detail::frame_buffer>(*this))
     {
+        // Serialize the close frame
+        ws.template write_close<
+            flat_static_buffer_base>(fb_, cr);
+        (*this)({}, 0, false);
     }
 
     void
@@ -82,46 +70,36 @@ public:
         bool cont = true)
     {
         using beast::detail::clamp;
-        d_.cont = cont;
+        auto& impl = *ws_.impl_;
         BOOST_ASIO_CORO_REENTER(*this)
         {
-            // Attempt to acquire write block
-            if(! d_.ws.impl_->wr_block.try_lock(this))
+            // Acquire the write lock
+            if(! impl.wr_block.try_lock(this))
             {
-                // Suspend
                 BOOST_ASIO_CORO_YIELD
-                d_.ws.impl_->paused_close.emplace(std::move(*this));
-
-                // Acquire the write block
-                d_.ws.impl_->wr_block.lock(this);
-
-                // Resume
+                impl.paused_close.emplace(std::move(*this));
+                impl.wr_block.lock(this);
                 BOOST_ASIO_CORO_YIELD
-                net::post(
-                    d_.ws.get_executor(), std::move(*this));
-                BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
+                net::post(std::move(*this));
+                BOOST_ASSERT(impl.wr_block.is_locked(this));
             }
-
-            // Make sure the stream is open
-            if(! d_.ws.impl_->check_open(ec))
+            if(impl.check_stop_now(ec))
                 goto upcall;
 
             // Can't call close twice
-            BOOST_ASSERT(! d_.ws.impl_->wr_close);
-
-            // Change status to closing
-            BOOST_ASSERT(d_.ws.impl_->status_ == status::open);
-            d_.ws.impl_->status_ = status::closing;
+            // TODO return a custom error code
+            BOOST_ASSERT(! impl.wr_close);
 
             // Send close frame
-            d_.ws.impl_->wr_close = true;
+            impl.wr_close = true;
+            impl.change_status(status::closing);
             BOOST_ASIO_CORO_YIELD
-            net::async_write(d_.ws.impl_->stream,
-                d_.fb.data(), std::move(*this));
-            if(! d_.ws.impl_->check_ok(ec))
+            net::async_write(impl.stream, fb_.data(),
+                beast::detail::bind_continuation(std::move(*this)));
+            if(impl.check_stop_now(ec))
                 goto upcall;
 
-            if(d_.ws.impl_->rd_close)
+            if(impl.rd_close)
             {
                 // This happens when the read_op gets a close frame
                 // at the same time close_op is sending the close frame.
@@ -129,101 +107,94 @@ public:
                 goto teardown;
             }
 
-            // Maybe suspend
-            if(! d_.ws.impl_->rd_block.try_lock(this))
+            // Acquire the read lock
+            if(! impl.rd_block.try_lock(this))
             {
-                // Suspend
                 BOOST_ASIO_CORO_YIELD
-                d_.ws.impl_->paused_r_close.emplace(std::move(*this));
-
-                // Acquire the read block
-                d_.ws.impl_->rd_block.lock(this);
-
-                // Resume
+                impl.paused_r_close.emplace(std::move(*this));
+                impl.rd_block.lock(this);
                 BOOST_ASIO_CORO_YIELD
-                net::post(
-                    d_.ws.get_executor(), std::move(*this));
-                BOOST_ASSERT(d_.ws.impl_->rd_block.is_locked(this));
-
-                // Make sure the stream is open
-                BOOST_ASSERT(d_.ws.impl_->status_ != status::open);
-                BOOST_ASSERT(d_.ws.impl_->status_ != status::closed);
-                if( d_.ws.impl_->status_ == status::failed)
+                net::post(std::move(*this));
+                BOOST_ASSERT(impl.rd_block.is_locked(this));
+                if(impl.check_stop_now(ec))
                     goto upcall;
-
-                BOOST_ASSERT(! d_.ws.impl_->rd_close);
+                BOOST_ASSERT(! impl.rd_close);
             }
 
-            // Drain
-            if(d_.ws.impl_->rd_remain > 0)
+            // Read until a receiving a close frame
+            // TODO There should be a timeout on this
+            if(impl.rd_remain > 0)
                 goto read_payload;
             for(;;)
             {
                 // Read frame header
-                while(! d_.ws.parse_fh(
-                    d_.ws.impl_->rd_fh, d_.ws.impl_->rd_buf, d_.ev))
+                while(! ws_.parse_fh(
+                    impl.rd_fh, impl.rd_buf, ev_))
                 {
-                    if(d_.ev)
+                    if(ev_)
                         goto teardown;
                     BOOST_ASIO_CORO_YIELD
-                    d_.ws.impl_->stream.async_read_some(
-                        d_.ws.impl_->rd_buf.prepare(read_size(d_.ws.impl_->rd_buf,
-                            d_.ws.impl_->rd_buf.max_size())),
-                        std::move(*this));
-                    if(! d_.ws.impl_->check_ok(ec))
+                    impl.stream.async_read_some(
+                        impl.rd_buf.prepare(read_size(
+                            impl.rd_buf, impl.rd_buf.max_size())),
+                        beast::detail::bind_continuation(std::move(*this)));
+                    impl.rd_buf.commit(bytes_transferred);
+                    if(impl.check_stop_now(ec))
                         goto upcall;
-                    d_.ws.impl_->rd_buf.commit(bytes_transferred);
                 }
-                if(detail::is_control(d_.ws.impl_->rd_fh.op))
+                if(detail::is_control(impl.rd_fh.op))
                 {
-                    // Process control frame
-                    if(d_.ws.impl_->rd_fh.op == detail::opcode::close)
+                    // Discard ping or pong frame
+                    if(impl.rd_fh.op != detail::opcode::close)
                     {
-                        BOOST_ASSERT(! d_.ws.impl_->rd_close);
-                        d_.ws.impl_->rd_close = true;
-                        auto const mb = buffers_prefix(
-                            clamp(d_.ws.impl_->rd_fh.len),
-                            d_.ws.impl_->rd_buf.data());
-                        if(d_.ws.impl_->rd_fh.len > 0 && d_.ws.impl_->rd_fh.mask)
-                            detail::mask_inplace(mb, d_.ws.impl_->rd_key);
-                        detail::read_close(d_.ws.impl_->cr, mb, d_.ev);
-                        if(d_.ev)
-                            goto teardown;
-                        d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_fh.len));
-                        goto teardown;
+                        impl.rd_buf.consume(clamp(impl.rd_fh.len));
+                        continue;
                     }
-                    d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_fh.len));
+
+                    // Process close frame
+                    // TODO Should we invoke the control callback?
+                    BOOST_ASSERT(! impl.rd_close);
+                    impl.rd_close = true;
+                    auto const mb = buffers_prefix(
+                        clamp(impl.rd_fh.len),
+                        impl.rd_buf.data());
+                    if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
+                        detail::mask_inplace(mb, impl.rd_key);
+                    detail::read_close(impl.cr, mb, ev_);
+                    if(ev_)
+                        goto teardown;
+                    impl.rd_buf.consume(clamp(impl.rd_fh.len));
+                    goto teardown;
                 }
-                else
+
+            read_payload:
+                // Discard message frame
+                while(impl.rd_buf.size() < impl.rd_remain)
                 {
-                read_payload:
-                    while(d_.ws.impl_->rd_buf.size() < d_.ws.impl_->rd_remain)
-                    {
-                        d_.ws.impl_->rd_remain -= d_.ws.impl_->rd_buf.size();
-                        d_.ws.impl_->rd_buf.consume(d_.ws.impl_->rd_buf.size());
-                        BOOST_ASIO_CORO_YIELD
-                        d_.ws.impl_->stream.async_read_some(
-                            d_.ws.impl_->rd_buf.prepare(read_size(d_.ws.impl_->rd_buf,
-                                d_.ws.impl_->rd_buf.max_size())),
-                            std::move(*this));
-                        if(! d_.ws.impl_->check_ok(ec))
-                            goto upcall;
-                        d_.ws.impl_->rd_buf.commit(bytes_transferred);
-                    }
-                    BOOST_ASSERT(d_.ws.impl_->rd_buf.size() >= d_.ws.impl_->rd_remain);
-                    d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_remain));
-                    d_.ws.impl_->rd_remain = 0;
+                    impl.rd_remain -= impl.rd_buf.size();
+                    impl.rd_buf.consume(impl.rd_buf.size());
+                    BOOST_ASIO_CORO_YIELD
+                    impl.stream.async_read_some(
+                        impl.rd_buf.prepare(read_size(
+                            impl.rd_buf, impl.rd_buf.max_size())),
+                        beast::detail::bind_continuation(std::move(*this)));
+                    impl.rd_buf.commit(bytes_transferred);
+                    if(impl.check_stop_now(ec))
+                        goto upcall;
                 }
+                BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
+                impl.rd_buf.consume(clamp(impl.rd_remain));
+                impl.rd_remain = 0;
             }
 
         teardown:
             // Teardown
-            BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
+            BOOST_ASSERT(impl.wr_block.is_locked(this));
             using beast::websocket::async_teardown;
             BOOST_ASIO_CORO_YIELD
-            async_teardown(d_.ws.impl_->role,
-                d_.ws.impl_->stream, std::move(*this));
-            BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
+            async_teardown(impl.role, impl.stream,
+                beast::detail::bind_continuation(std::move(*this)));
+            BOOST_ASSERT(impl.wr_block.is_locked(this));
             if(ec == net::error::eof)
             {
                 // Rationale:
@@ -231,22 +202,21 @@ public:
                 ec = {};
             }
             if(! ec)
-                ec = d_.ev;
+                ec = ev_;
             if(ec)
-                d_.ws.impl_->status_ = status::failed;
+                impl.change_status(status::failed);
             else
-                d_.ws.impl_->status_ = status::closed;
-            d_.ws.impl_->close();
+                impl.change_status(status::closed);
+            impl.close();
 
         upcall:
-            BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
-            d_.ws.impl_->wr_block.unlock(this);
-            if(d_.ws.impl_->rd_block.try_unlock(this))
-                d_.ws.impl_->paused_r_rd.maybe_invoke();
-            d_.ws.impl_->paused_rd.maybe_invoke() ||
-                d_.ws.impl_->paused_ping.maybe_invoke() ||
-                d_.ws.impl_->paused_wr.maybe_invoke();
-            this->invoke(d_.cont, ec);
+            impl.wr_block.unlock(this);
+            impl.rd_block.try_unlock(this)
+                && impl.paused_r_rd.maybe_invoke();
+            impl.paused_rd.maybe_invoke()
+                || impl.paused_ping.maybe_invoke()
+                || impl.paused_wr.maybe_invoke();
+            this->invoke(cont, ec);
         }
     }
 };
@@ -274,87 +244,97 @@ close(close_reason const& cr, error_code& ec)
     static_assert(is_sync_stream<next_layer_type>::value,
         "SyncStream requirements not met");
     using beast::detail::clamp;
+    auto& impl = *impl_;
     ec = {};
-    // Make sure the stream is open
-    if(! impl_->check_open(ec))
+    if(impl.check_stop_now(ec))
         return;
-    // If rd_close_ is set then we already sent a close
-    BOOST_ASSERT(! impl_->rd_close);
-    BOOST_ASSERT(! impl_->wr_close);
-    impl_->wr_close = true;
+    BOOST_ASSERT(! impl.rd_close);
+
+    // Can't call close twice
+    // TODO return a custom error code
+    BOOST_ASSERT(! impl.wr_close);
+
+    // Send close frame
     {
+        impl.wr_close = true;
+        impl.change_status(status::closing);
         detail::frame_buffer fb;
         write_close<flat_static_buffer_base>(fb, cr);
-        net::write(impl_->stream, fb.data(), ec);
+        net::write(impl.stream, fb.data(), ec);
+        if(impl.check_stop_now(ec))
+            return;
     }
-    if(! impl_->check_ok(ec))
-        return;
-    impl_->status_ = status::closing;
-    error_code result;
-    // Drain the connection
-    if(impl_->rd_remain > 0)
+
+    // Read until a receiving a close frame
+    error_code ev;
+    if(impl.rd_remain > 0)
         goto read_payload;
     for(;;)
    {
         // Read frame header
         while(! parse_fh(
-            impl_->rd_fh, impl_->rd_buf, result))
+            impl.rd_fh, impl.rd_buf, ev))
         {
-            if(result)
-                return do_fail(
-                    close_code::none, result, ec);
-            auto const bytes_transferred =
-                impl_->stream.read_some(
-                    impl_->rd_buf.prepare(read_size(impl_->rd_buf,
-                        impl_->rd_buf.max_size())), ec);
-            if(! impl_->check_ok(ec))
+            if(ev)
+            {
+                // Protocol violation
+                return do_fail(close_code::none, ev, ec);
+            }
+            impl.rd_buf.commit(impl.stream.read_some(
+                impl.rd_buf.prepare(read_size(
+                    impl.rd_buf, impl.rd_buf.max_size())), ec));
+            if(impl.check_stop_now(ec))
                 return;
-            impl_->rd_buf.commit(bytes_transferred);
         }
-        if(detail::is_control(impl_->rd_fh.op))
+
+        if(detail::is_control(impl.rd_fh.op))
         {
-            // Process control frame
-            if(impl_->rd_fh.op == detail::opcode::close)
+            // Discard ping/pong frame
+            if(impl.rd_fh.op != detail::opcode::close)
             {
-                BOOST_ASSERT(! impl_->rd_close);
-                impl_->rd_close = true;
-                auto const mb = buffers_prefix(
-                    clamp(impl_->rd_fh.len),
-                    impl_->rd_buf.data());
-                if(impl_->rd_fh.len > 0 && impl_->rd_fh.mask)
-                    detail::mask_inplace(mb, impl_->rd_key);
-                detail::read_close(impl_->cr, mb, result);
-                if(result)
-                {
-                    // Protocol violation
-                    return do_fail(
-                        close_code::none, result, ec);
-                }
-                impl_->rd_buf.consume(clamp(impl_->rd_fh.len));
-                break;
+                impl.rd_buf.consume(clamp(impl.rd_fh.len));
+                continue;
             }
-            impl_->rd_buf.consume(clamp(impl_->rd_fh.len));
+
+            // Handle close frame
+            // TODO Should we invoke the control callback?
+            BOOST_ASSERT(! impl.rd_close);
+            impl.rd_close = true;
+            auto const mb = buffers_prefix(
+                clamp(impl.rd_fh.len),
+                impl.rd_buf.data());
+            if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
+                detail::mask_inplace(mb, impl.rd_key);
+            detail::read_close(impl.cr, mb, ev);
+            if(ev)
+            {
+                // Protocol violation
+                return do_fail(close_code::none, ev, ec);
+            }
+            impl.rd_buf.consume(clamp(impl.rd_fh.len));
+            break;
         }
-        else
+
+    read_payload:
+        // Discard message frame
+        while(impl.rd_buf.size() < impl.rd_remain)
         {
-        read_payload:
-            while(impl_->rd_buf.size() < impl_->rd_remain)
-            {
-                impl_->rd_remain -= impl_->rd_buf.size();
-                impl_->rd_buf.consume(impl_->rd_buf.size());
-                auto const bytes_transferred =
-                    impl_->stream.read_some(
-                        impl_->rd_buf.prepare(read_size(impl_->rd_buf,
-                            impl_->rd_buf.max_size())), ec);
-                if(! impl_->check_ok(ec))
-                    return;
-                impl_->rd_buf.commit(bytes_transferred);
-            }
-            BOOST_ASSERT(
-                impl_->rd_buf.size() >= impl_->rd_remain);
-            impl_->rd_buf.consume(clamp(impl_->rd_remain));
-            impl_->rd_remain = 0;
+            impl.rd_remain -= impl.rd_buf.size();
+            impl.rd_buf.consume(impl.rd_buf.size());
+            impl.rd_buf.commit(
+                impl.stream.read_some(
+                    impl.rd_buf.prepare(
+                        read_size(
+                            impl.rd_buf,
+                            impl.rd_buf.max_size())),
+                    ec));
+            if(impl.check_stop_now(ec))
+                return;
         }
+        BOOST_ASSERT(
+            impl.rd_buf.size() >= impl.rd_remain);
+        impl.rd_buf.consume(clamp(impl.rd_remain));
+        impl.rd_remain = 0;
     }
     // _Close the WebSocket Connection_
     do_fail(close_code::none, error::closed, ec);
@@ -374,9 +354,8 @@ async_close(close_reason const& cr, CloseHandler&& handler)
     BOOST_BEAST_HANDLER_INIT(
         CloseHandler, void(error_code));
     close_op<BOOST_ASIO_HANDLER_TYPE(
-        CloseHandler, void(error_code))>{
-            std::move(init.completion_handler), *this, cr}(
-                {}, 0, false);
+        CloseHandler, void(error_code))>(
+            std::move(init.completion_handler), *this, cr);
     return init.result.get();
 }
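The public interface of `close` and `async_close` is unchanged by this refactor; only the operation's internal state layout and locking comments differ. A usage sketch against the public API (assumes `ws` is a connected `websocket::stream`):

ws.async_close(websocket::close_code::normal,
    [](beast::error_code ec)
    {
        // On success the closing handshake has completed and
        // the underlying connection has been torn down.
        if(ec)
            std::cerr << "close: " << ec.message() << "\n";
    });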


@@ -39,43 +39,35 @@ class stream<NextLayer, deflateSupported>::handshake_op
 {
     struct data
     {
-        stream<NextLayer, deflateSupported>& ws;
-        response_type* res_p;
-        detail::sec_ws_key_type key;
+        // VFALCO This really should be two separate
+        //        composed operations, to save on memory
         http::request<http::empty_body> req;
         response_type res;
-
-        template<class Decorator>
-        data(
-            stream& ws_,
-            response_type* res_p_,
-            string_view host,
-            string_view target,
-            Decorator const& decorator)
-            : ws(ws_)
-            , res_p(res_p_)
-            , req(ws.build_request(key,
-                host, target, decorator))
-        {
-            ws.impl_->reset();
-        }
     };
 
+    stream<NextLayer, deflateSupported>& ws_;
+    detail::sec_ws_key_type key_;
+    response_type* res_p_;
     data& d_;
 
 public:
-    template<
-        class Handler_,
-        class... Args>
+    template<class Handler_, class Decorator>
     handshake_op(
         Handler_&& h,
-        stream& ws, Args&&... args)
+        stream& ws,
+        response_type* res_p,
+        string_view host, string_view target,
+        Decorator const& decorator)
         : stable_async_op_base<Handler,
             beast::executor_type<stream>>(
                 std::forward<Handler_>(h), ws.get_executor())
-        , d_(beast::allocate_stable<data>(
-            *this, ws, std::forward<Args>(args)...))
+        , ws_(ws)
+        , res_p_(res_p)
+        , d_(beast::allocate_stable<data>(*this))
     {
+        d_.req = ws_.build_request(
+            key_, host, target, decorator);
+        ws_.impl_->reset(); // VFALCO I don't like this
     }
 
     void
@@ -84,31 +76,26 @@ public:
         std::size_t bytes_used = 0)
     {
         boost::ignore_unused(bytes_used);
         BOOST_ASIO_CORO_REENTER(*this)
         {
             // Send HTTP Upgrade
-            d_.ws.impl_->do_pmd_config(d_.req);
+            ws_.impl_->do_pmd_config(d_.req);
             BOOST_ASIO_CORO_YIELD
-            http::async_write(d_.ws.impl_->stream,
+            http::async_write(ws_.impl_->stream,
                 d_.req, std::move(*this));
             if(ec)
                 goto upcall;
 
-            // VFALCO We could pre-serialize the request to
-            //        a single buffer, send that instead,
-            //        and delete the buffer here.
-
             // Read HTTP response
             BOOST_ASIO_CORO_YIELD
-            http::async_read(d_.ws.next_layer(),
-                d_.ws.impl_->rd_buf, d_.res,
+            http::async_read(ws_.next_layer(),
+                ws_.impl_->rd_buf, d_.res,
                 std::move(*this));
             if(ec)
                 goto upcall;
-            d_.ws.on_response(d_.res, d_.key, ec);
-            if(d_.res_p)
-                swap(d_.res, *d_.res_p);
+            ws_.on_response(d_.res, key_, ec);
+            if(res_p_)
+                swap(d_.res, *res_p_);
 
         upcall:
             this->invoke_now(ec);
         }
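`handshake_op` now keeps only `req` and `res` in the stable allocation; the key and result pointer become plain members of the operation. Caller-facing behavior is unchanged. A usage sketch of the public API (host and target below are placeholders):

ws.async_handshake("www.example.com", "/",
    [](beast::error_code ec)
    {
        if(ec)
            std::cerr << "handshake: " << ec.message() << "\n";
    });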


@@ -13,6 +13,7 @@
 #include <boost/beast/core/async_op_base.hpp>
 #include <boost/beast/core/bind_handler.hpp>
 #include <boost/beast/core/stream_traits.hpp>
+#include <boost/beast/core/detail/bind_continuation.hpp>
 #include <boost/beast/websocket/detail/frame.hpp>
 #include <boost/asio/coroutine.hpp>
 #include <boost/asio/post.hpp>
@@ -35,25 +36,8 @@ class stream<NextLayer, deflateSupported>::ping_op
         Handler, beast::executor_type<stream>>
     , public net::coroutine
 {
-    struct state
-    {
-        stream<NextLayer, deflateSupported>& ws;
-        detail::frame_buffer fb;
-
-        state(
-            stream<NextLayer, deflateSupported>& ws_,
-            detail::opcode op,
-            ping_data const& payload)
-            : ws(ws_)
-        {
-            // Serialize the control frame
-            ws.template write_ping<
-                flat_static_buffer_base>(
-                    fb, op, payload);
-        }
-    };
-
-    state& d_;
+    stream<NextLayer, deflateSupported>& ws_;
+    detail::frame_buffer& fb_;
 
 public:
     static constexpr int id = 3; // for soft_mutex
@@ -67,65 +51,51 @@ public:
         : stable_async_op_base<
             Handler, beast::executor_type<stream>>(
                 std::forward<Handler_>(h), ws.get_executor())
-        , d_(beast::allocate_stable<state>(
-            *this, ws, op, payload))
+        , ws_(ws)
+        , fb_(beast::allocate_stable<
+            detail::frame_buffer>(*this))
     {
+        // Serialize the ping or pong frame
+        ws.template write_ping<
+            flat_static_buffer_base>(fb_, op, payload);
+        (*this)({}, 0, false);
     }
 
     void operator()(
         error_code ec = {},
-        std::size_t bytes_transferred = 0)
+        std::size_t bytes_transferred = 0,
+        bool cont = true)
     {
         boost::ignore_unused(bytes_transferred);
+        auto& impl = *ws_.impl_;
         BOOST_ASIO_CORO_REENTER(*this)
         {
-            // Maybe suspend
-            if(d_.ws.impl_->wr_block.try_lock(this))
+            // Acquire the write lock
+            if(! impl.wr_block.try_lock(this))
             {
-                // Make sure the stream is open
-                if(! d_.ws.impl_->check_open(ec))
-                {
-                    BOOST_ASIO_CORO_YIELD
-                    net::post(
-                        d_.ws.get_executor(),
-                        beast::bind_front_handler(std::move(*this), ec));
-                    goto upcall;
-                }
-            }
-            else
-            {
-                // Suspend
                 BOOST_ASIO_CORO_YIELD
-                d_.ws.impl_->paused_ping.emplace(std::move(*this));
-
-                // Acquire the write block
-                d_.ws.impl_->wr_block.lock(this);
-
-                // Resume
+                impl.paused_ping.emplace(std::move(*this));
+                impl.wr_block.lock(this);
                 BOOST_ASIO_CORO_YIELD
-                net::post(
-                    d_.ws.get_executor(), std::move(*this));
-                BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this));
-
-                // Make sure the stream is open
-                if(! d_.ws.impl_->check_open(ec))
-                    goto upcall;
+                net::post(std::move(*this));
+                BOOST_ASSERT(impl.wr_block.is_locked(this));
             }
+            if(impl.check_stop_now(ec))
+                goto upcall;
 
             // Send ping frame
             BOOST_ASIO_CORO_YIELD
-            net::async_write(d_.ws.impl_->stream,
-                d_.fb.data(), std::move(*this));
-            if(! d_.ws.impl_->check_ok(ec))
+            net::async_write(impl.stream, fb_.data(),
+                beast::detail::bind_continuation(std::move(*this)));
+            if(impl.check_stop_now(ec))
                 goto upcall;
 
         upcall:
-            d_.ws.impl_->wr_block.unlock(this);
-            d_.ws.impl_->paused_close.maybe_invoke() ||
-                d_.ws.impl_->paused_rd.maybe_invoke() ||
-                d_.ws.impl_->paused_wr.maybe_invoke();
-            this->invoke_now(ec);
+            impl.wr_block.unlock(this);
+            impl.paused_close.maybe_invoke()
+                || impl.paused_rd.maybe_invoke()
+                || impl.paused_wr.maybe_invoke();
+            this->invoke(cont, ec);
         }
     }
 };
@@ -148,14 +118,13 @@ void
 stream<NextLayer, deflateSupported>::
 ping(ping_data const& payload, error_code& ec)
 {
-    // Make sure the stream is open
-    if(! impl_->check_open(ec))
+    if(impl_->check_stop_now(ec))
         return;
     detail::frame_buffer fb;
     write_ping<flat_static_buffer_base>(
         fb, detail::opcode::ping, payload);
     net::write(impl_->stream, fb.data(), ec);
-    if(! impl_->check_ok(ec))
+    if(impl_->check_stop_now(ec))
         return;
 }
@@ -175,14 +144,13 @@ void
 stream<NextLayer, deflateSupported>::
 pong(ping_data const& payload, error_code& ec)
 {
-    // Make sure the stream is open
-    if(! impl_->check_open(ec))
+    if(impl_->check_stop_now(ec))
         return;
     detail::frame_buffer fb;
     write_ping<flat_static_buffer_base>(
         fb, detail::opcode::pong, payload);
     net::write(impl_->stream, fb.data(), ec);
-    if(! impl_->check_ok(ec))
+    if(impl_->check_stop_now(ec))
         return;
 }
@@ -198,9 +166,9 @@ async_ping(ping_data const& payload, WriteHandler&& handler)
     BOOST_BEAST_HANDLER_INIT(
         WriteHandler, void(error_code));
     ping_op<BOOST_ASIO_HANDLER_TYPE(
-        WriteHandler, void(error_code))>{
+        WriteHandler, void(error_code))>(
         std::move(init.completion_handler), *this,
-        detail::opcode::ping, payload}();
+        detail::opcode::ping, payload);
     return init.result.get();
 }
@@ -216,9 +184,9 @@ async_pong(ping_data const& payload, WriteHandler&& handler)
     BOOST_BEAST_HANDLER_INIT(
         WriteHandler, void(error_code));
     ping_op<BOOST_ASIO_HANDLER_TYPE(
-        WriteHandler, void(error_code))>{
+        WriteHandler, void(error_code))>(
         std::move(init.completion_handler), *this,
-        detail::opcode::pong, payload}();
+        detail::opcode::pong, payload);
     return init.result.get();
 }
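`ping_op` follows the same shape as `close_op`: the frame buffer is the only stably-allocated state, the constructor starts the coroutine, and the write completion is bound as a continuation. A usage sketch of the public API (assumes a connected `ws`):

// Observe pongs via the control callback, then send a ping.
ws.control_callback(
    [](websocket::frame_type kind, beast::string_view payload)
    {
        if(kind == websocket::frame_type::pong)
            std::cout << "pong: " << payload << "\n";
    });
ws.async_ping("keep-alive",
    [](beast::error_code ec)
    {
        if(ec)
            std::cerr << "ping: " << ec.message() << "\n";
    });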


@@ -19,6 +19,7 @@
#include <boost/beast/core/buffers_suffix.hpp> #include <boost/beast/core/buffers_suffix.hpp>
#include <boost/beast/core/flat_static_buffer.hpp> #include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/stream_traits.hpp> #include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/core/detail/bind_continuation.hpp>
#include <boost/beast/core/detail/buffer.hpp> #include <boost/beast/core/detail/buffer.hpp>
#include <boost/beast/core/detail/clamp.hpp> #include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
@@ -36,7 +37,7 @@ namespace boost {
namespace beast { namespace beast {
namespace websocket { namespace websocket {
/* Read some message frame data. /* Read some message data into a buffer sequence.
Also reads and handles control frames. Also reads and handles control frames.
*/ */
@@ -56,7 +57,6 @@ class stream<NextLayer, deflateSupported>::read_some_op
error_code result_; error_code result_;
close_code code_; close_code code_;
bool did_read_ = false; bool did_read_ = false;
bool cont_ = false;
public: public:
static constexpr int id = 1; // for soft_mutex static constexpr int id = 1; // for soft_mutex
@@ -74,6 +74,7 @@ public:
, cb_(bs) , cb_(bs)
, code_(close_code::none) , code_(close_code::none)
{ {
(*this)({}, 0, false);
} }
void operator()( void operator()(
@@ -83,12 +84,37 @@ public:
{ {
using beast::detail::clamp; using beast::detail::clamp;
auto& impl = *ws_.impl_; auto& impl = *ws_.impl_;
cont_ = cont;
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
// Maybe suspend acquire_read_lock:
do_maybe_suspend: // Acquire the read lock
if(impl.rd_block.try_lock(this)) if(! impl.rd_block.try_lock(this))
{
do_suspend:
BOOST_ASIO_CORO_YIELD
impl.paused_r_rd.emplace(std::move(*this));
impl.rd_block.lock(this);
BOOST_ASIO_CORO_YIELD
net::post(std::move(*this));
BOOST_ASSERT(impl.rd_block.is_locked(this));
// VFALCO Is this check correct here?
BOOST_ASSERT(! ec && impl.check_stop_now(ec));
if(impl.check_stop_now(ec))
{
BOOST_ASSERT(ec == net::error::operation_aborted);
goto upcall;
}
// VFALCO Should never get here
// The only way to get read blocked is if
// a `close_op` wrote a close frame
BOOST_ASSERT(impl.wr_close);
BOOST_ASSERT(impl.status_ != status::open);
ec = net::error::operation_aborted;
goto upcall;
}
else
{ {
// Make sure the stream is not closed // Make sure the stream is not closed
if( impl.status_ == status::closed || if( impl.status_ == status::closed ||
@@ -98,29 +124,6 @@ public:
goto upcall; goto upcall;
} }
} }
else
{
do_suspend:
// Suspend
BOOST_ASIO_CORO_YIELD
impl.paused_r_rd.emplace(std::move(*this));
// Acquire the read block
impl.rd_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
net::post(
ws_.get_executor(), std::move(*this));
BOOST_ASSERT(impl.rd_block.is_locked(this));
// The only way to get read blocked is if
// a `close_op` wrote a close frame
BOOST_ASSERT(impl.wr_close);
BOOST_ASSERT(impl.status_ != status::open);
ec = net::error::operation_aborted;
goto upcall;
}
// if status_ == status::closing, we want to suspend // if status_ == status::closing, we want to suspend
// the read operation until the close completes, // the read operation until the close completes,
@@ -155,7 +158,7 @@ public:
impl.rd_buf, impl.rd_buf.max_size())), impl.rd_buf, impl.rd_buf.max_size())),
std::move(*this)); std::move(*this));
BOOST_ASSERT(impl.rd_block.is_locked(this)); BOOST_ASSERT(impl.rd_block.is_locked(this));
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
impl.rd_buf.commit(bytes_transferred); impl.rd_buf.commit(bytes_transferred);
@@ -189,13 +192,12 @@ public:
{ {
if(impl.ctrl_cb) if(impl.ctrl_cb)
{ {
if(! cont_) if(! cont)
{ {
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::post(std::move(*this));
ws_.get_executor(), BOOST_ASSERT(cont);
std::move(*this)); // VFALCO call check_stop_now() here?
BOOST_ASSERT(cont_);
} }
} }
{ {
@@ -224,54 +226,48 @@ public:
impl.rd_block.unlock(this); impl.rd_block.unlock(this);
impl.paused_r_close.maybe_invoke(); impl.paused_r_close.maybe_invoke();
// Maybe suspend // Acquire the write lock
if(! impl.wr_block.try_lock(this)) if(! impl.wr_block.try_lock(this))
{ {
// Suspend
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
impl.paused_rd.emplace(std::move(*this)); impl.paused_rd.emplace(std::move(*this));
// Acquire the write block
impl.wr_block.lock(this); impl.wr_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::post(std::move(*this));
ws_.get_executor(), std::move(*this));
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
if(impl.check_stop_now(ec))
// Make sure the stream is open
if(! impl.check_open(ec))
goto upcall; goto upcall;
} }
// Send pong // Send pong
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, net::async_write(
impl.rd_fb.data(), std::move(*this)); impl.stream, impl.rd_fb.data(),
beast::detail::bind_continuation(std::move(*this)));
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
impl.wr_block.unlock(this); impl.wr_block.unlock(this);
impl.paused_close.maybe_invoke() || impl.paused_close.maybe_invoke() ||
impl.paused_ping.maybe_invoke() || impl.paused_ping.maybe_invoke() ||
impl.paused_wr.maybe_invoke(); impl.paused_wr.maybe_invoke();
goto do_maybe_suspend; goto acquire_read_lock;
} }
// Handle pong frame // Handle pong frame
if(impl.rd_fh.op == detail::opcode::pong) if(impl.rd_fh.op == detail::opcode::pong)
{ {
// Ignore pong when closing // Ignore pong when closing
if(! impl.wr_close && impl.ctrl_cb) if(! impl.wr_close && impl.ctrl_cb)
{ {
if(! cont_) if(! cont)
{ {
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::post(
ws_.get_executor(), ws_.get_executor(),
std::move(*this)); std::move(*this));
BOOST_ASSERT(cont_); BOOST_ASSERT(cont);
} }
} }
auto const cb = buffers_prefix(clamp( auto const cb = buffers_prefix(clamp(
@@ -286,18 +282,19 @@ public:
impl.ctrl_cb(frame_type::pong, payload); impl.ctrl_cb(frame_type::pong, payload);
goto loop; goto loop;
} }
// Handle close frame // Handle close frame
BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
{ {
if(impl.ctrl_cb) if(impl.ctrl_cb)
{ {
if(! cont_) if(! cont)
{ {
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::post(
ws_.get_executor(), ws_.get_executor(),
std::move(*this)); std::move(*this));
BOOST_ASSERT(cont_); BOOST_ASSERT(cont);
} }
} }
auto const cb = buffers_prefix(clamp( auto const cb = buffers_prefix(clamp(
@@ -358,7 +355,7 @@ public:
impl.rd_buf.prepare(read_size( impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())), impl.rd_buf, impl.rd_buf.max_size())),
std::move(*this)); std::move(*this));
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
impl.rd_buf.commit(bytes_transferred); impl.rd_buf.commit(bytes_transferred);
if(impl.rd_fh.mask) if(impl.rd_fh.mask)
@@ -401,7 +398,7 @@ public:
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
impl.stream.async_read_some(buffers_prefix( impl.stream.async_read_some(buffers_prefix(
clamp(impl.rd_remain), cb_), std::move(*this)); clamp(impl.rd_remain), cb_), std::move(*this));
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
BOOST_ASSERT(bytes_transferred > 0); BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffers_prefix( auto const mb = buffers_prefix(
@@ -444,7 +441,7 @@ public:
impl.rd_buf.prepare(read_size( impl.rd_buf.prepare(read_size(
impl.rd_buf, impl.rd_buf.max_size())), impl.rd_buf, impl.rd_buf.max_size())),
std::move(*this)); std::move(*this));
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
BOOST_ASSERT(bytes_transferred > 0); BOOST_ASSERT(bytes_transferred > 0);
impl.rd_buf.commit(bytes_transferred); impl.rd_buf.commit(bytes_transferred);
@@ -492,7 +489,7 @@ public:
if(zs.total_out > 0) if(zs.total_out > 0)
ec = error::partial_deflate_block; ec = error::partial_deflate_block;
} }
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
impl.do_context_takeover_read(impl.role); impl.do_context_takeover_read(impl.role);
impl.rd_done = true; impl.rd_done = true;
@@ -503,7 +500,7 @@ public:
break; break;
} }
impl.inflate(zs, zlib::Flush::sync, ec); impl.inflate(zs, zlib::Flush::sync, ec);
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
if(impl.rd_msg_max && beast::detail::sum_exceeds( if(impl.rd_msg_max && beast::detail::sum_exceeds(
impl.rd_size, zs.total_out, impl.rd_msg_max)) impl.rd_size, zs.total_out, impl.rd_msg_max))
@@ -536,24 +533,16 @@ public:
goto upcall; goto upcall;
close: close:
// Try to acquire the write block // Acquire the write lock
if(! impl.wr_block.try_lock(this)) if(! impl.wr_block.try_lock(this))
{ {
// Suspend
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
impl.paused_rd.emplace(std::move(*this)); impl.paused_rd.emplace(std::move(*this));
// Acquire the write block
impl.wr_block.lock(this); impl.wr_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::post(std::move(*this));
ws_.get_executor(), std::move(*this));
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
if(impl.check_stop_now(ec))
// Make sure the stream is open
if(! impl.check_open(ec))
goto upcall; goto upcall;
} }
@@ -573,11 +562,10 @@ public:
// Send close frame // Send close frame
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write( net::async_write(impl.stream, impl.rd_fb.data(),
impl.stream, impl.rd_fb.data(), beast::detail::bind_continuation(std::move(*this)));
std::move(*this));
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
if(! impl.check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
} }
@@ -585,8 +573,8 @@ public:
using beast::websocket::async_teardown; using beast::websocket::async_teardown;
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
async_teardown(impl.role, async_teardown(impl.role, impl.stream,
impl.stream, std::move(*this)); beast::detail::bind_continuation(std::move(*this)));
BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
if(ec == net::error::eof) if(ec == net::error::eof)
{ {
@@ -606,10 +594,10 @@ public:
impl.rd_block.try_unlock(this); impl.rd_block.try_unlock(this);
impl.paused_r_close.maybe_invoke(); impl.paused_r_close.maybe_invoke();
if(impl.wr_block.try_unlock(this)) if(impl.wr_block.try_unlock(this))
impl.paused_close.maybe_invoke() || impl.paused_close.maybe_invoke()
impl.paused_ping.maybe_invoke() || || impl.paused_ping.maybe_invoke()
impl.paused_wr.maybe_invoke(); || impl.paused_wr.maybe_invoke();
this->invoke(cont_, ec, bytes_written_); this->invoke(cont, ec, bytes_written_);
} }
} }
}; };
@@ -648,41 +636,37 @@ public:
std::numeric_limits<std::size_t>::max)()) std::numeric_limits<std::size_t>::max)())
, some_(some) , some_(some)
{ {
(*this)({}, 0, false);
} }
void operator()( void operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0) std::size_t bytes_transferred = 0,
bool cont = true)
{ {
using beast::detail::clamp; using beast::detail::clamp;
boost::optional<typename
DynamicBuffer::mutable_buffers_type> mb;
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
do do
{ {
BOOST_ASIO_CORO_YIELD mb = beast::detail::dynamic_buffer_prepare(b_,
{ clamp(ws_.read_size_hint(b_), limit_),
auto mb = beast::detail::dynamic_buffer_prepare(b_, ec, error::buffer_overflow);
clamp(ws_.read_size_hint(b_), limit_),
ec, error::buffer_overflow);
if(ec)
net::post(
ws_.get_executor(),
beast::bind_front_handler(
std::move(*this), ec, 0));
else
read_some_op<typename
DynamicBuffer::mutable_buffers_type,
read_op>(std::move(*this), ws_, *mb)(
{}, 0, false);
}
if(ec) if(ec)
goto upcall; goto upcall;
BOOST_ASIO_CORO_YIELD
ws_.async_read_some(*mb,
beast::detail::bind_continuation(std::move(*this)));
b_.commit(bytes_transferred); b_.commit(bytes_transferred);
bytes_written_ += bytes_transferred; bytes_written_ += bytes_transferred;
if(ec)
goto upcall;
} }
while(! some_ && ! ws_.is_message_done()); while(! some_ && ! ws_.is_message_done());
upcall: upcall:
this->invoke_now(ec, bytes_written_); this->invoke(cont, ec, bytes_written_);
} }
} }
}; };
@@ -743,15 +727,10 @@ async_read(DynamicBuffer& buffer, ReadHandler&& handler)
"DynamicBuffer requirements not met"); "DynamicBuffer requirements not met");
BOOST_BEAST_HANDLER_INIT( BOOST_BEAST_HANDLER_INIT(
ReadHandler, void(error_code, std::size_t)); ReadHandler, void(error_code, std::size_t));
read_op< read_op<DynamicBuffer, BOOST_ASIO_HANDLER_TYPE(
DynamicBuffer, ReadHandler, void(error_code, std::size_t))>(
BOOST_ASIO_HANDLER_TYPE( std::move(init.completion_handler),
ReadHandler, void(error_code, std::size_t))>{ *this, buffer, 0, false);
std::move(init.completion_handler),
*this,
buffer,
0,
false}();
return init.result.get(); return init.result.get();
} }
@@ -824,15 +803,10 @@ async_read_some(
"DynamicBuffer requirements not met"); "DynamicBuffer requirements not met");
BOOST_BEAST_HANDLER_INIT( BOOST_BEAST_HANDLER_INIT(
ReadHandler, void(error_code, std::size_t)); ReadHandler, void(error_code, std::size_t));
read_op< read_op<DynamicBuffer, BOOST_ASIO_HANDLER_TYPE(
DynamicBuffer, ReadHandler, void(error_code, std::size_t))>(
BOOST_ASIO_HANDLER_TYPE( std::move(init.completion_handler),
ReadHandler, void(error_code, std::size_t))>{ *this, buffer, limit, true);
std::move(init.completion_handler),
*this,
buffer,
limit,
true}({}, 0);
return init.result.get(); return init.result.get();
} }
@@ -871,23 +845,24 @@ read_some(
MutableBufferSequence>::value, MutableBufferSequence>::value,
"MutableBufferSequence requirements not met"); "MutableBufferSequence requirements not met");
using beast::detail::clamp; using beast::detail::clamp;
auto& impl = *impl_;
close_code code{}; close_code code{};
std::size_t bytes_written = 0; std::size_t bytes_written = 0;
ec = {}; ec = {};
// Make sure the stream is open // Make sure the stream is open
if(! impl_->check_open(ec)) if(impl.check_stop_now(ec))
return 0; return bytes_written;
loop: loop:
// See if we need to read a frame header. This // See if we need to read a frame header. This
// condition is structured to give the decompressor // condition is structured to give the decompressor
// a chance to emit the final empty deflate block // a chance to emit the final empty deflate block
// //
if(impl_->rd_remain == 0 && ( if(impl.rd_remain == 0 && (
! impl_->rd_fh.fin || impl_->rd_done)) ! impl.rd_fh.fin || impl.rd_done))
{ {
// Read frame header // Read frame header
error_code result; error_code result;
while(! parse_fh(impl_->rd_fh, impl_->rd_buf, result)) while(! parse_fh(impl.rd_fh, impl.rd_buf, result))
{ {
if(result) if(result)
{ {
@@ -900,68 +875,68 @@ loop:
return bytes_written; return bytes_written;
} }
auto const bytes_transferred = auto const bytes_transferred =
impl_->stream.read_some( impl.stream.read_some(
impl_->rd_buf.prepare(read_size( impl.rd_buf.prepare(read_size(
impl_->rd_buf, impl_->rd_buf.max_size())), impl.rd_buf, impl.rd_buf.max_size())),
ec); ec);
if(! impl_->check_ok(ec)) impl.rd_buf.commit(bytes_transferred);
if(impl.check_stop_now(ec))
return bytes_written; return bytes_written;
impl_->rd_buf.commit(bytes_transferred);
} }
// Immediately apply the mask to the portion // Immediately apply the mask to the portion
// of the buffer holding payload data. // of the buffer holding payload data.
if(impl_->rd_fh.len > 0 && impl_->rd_fh.mask) if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
detail::mask_inplace(buffers_prefix( detail::mask_inplace(buffers_prefix(
clamp(impl_->rd_fh.len), impl_->rd_buf.data()), clamp(impl.rd_fh.len), impl.rd_buf.data()),
impl_->rd_key); impl.rd_key);
if(detail::is_control(impl_->rd_fh.op)) if(detail::is_control(impl.rd_fh.op))
{ {
// Get control frame payload // Get control frame payload
auto const b = buffers_prefix( auto const b = buffers_prefix(
clamp(impl_->rd_fh.len), impl_->rd_buf.data()); clamp(impl.rd_fh.len), impl.rd_buf.data());
auto const len = buffer_size(b); auto const len = buffer_size(b);
BOOST_ASSERT(len == impl_->rd_fh.len); BOOST_ASSERT(len == impl.rd_fh.len);
// Clear this otherwise the next // Clear this otherwise the next
// frame will be considered final. // frame will be considered final.
impl_->rd_fh.fin = false; impl.rd_fh.fin = false;
// Handle ping frame // Handle ping frame
if(impl_->rd_fh.op == detail::opcode::ping) if(impl.rd_fh.op == detail::opcode::ping)
{ {
ping_data payload; ping_data payload;
detail::read_ping(payload, b); detail::read_ping(payload, b);
impl_->rd_buf.consume(len); impl.rd_buf.consume(len);
if(impl_->wr_close) if(impl.wr_close)
{ {
// Ignore ping when closing // Ignore ping when closing
goto loop; goto loop;
} }
if(impl_->ctrl_cb) if(impl.ctrl_cb)
impl_->ctrl_cb(frame_type::ping, payload); impl.ctrl_cb(frame_type::ping, payload);
detail::frame_buffer fb; detail::frame_buffer fb;
write_ping<flat_static_buffer_base>(fb, write_ping<flat_static_buffer_base>(fb,
detail::opcode::pong, payload); detail::opcode::pong, payload);
net::write(impl_->stream, fb.data(), ec); net::write(impl.stream, fb.data(), ec);
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_written; return bytes_written;
goto loop; goto loop;
} }
// Handle pong frame // Handle pong frame
if(impl_->rd_fh.op == detail::opcode::pong) if(impl.rd_fh.op == detail::opcode::pong)
{ {
ping_data payload; ping_data payload;
detail::read_ping(payload, b); detail::read_ping(payload, b);
impl_->rd_buf.consume(len); impl.rd_buf.consume(len);
if(impl_->ctrl_cb) if(impl.ctrl_cb)
impl_->ctrl_cb(frame_type::pong, payload); impl.ctrl_cb(frame_type::pong, payload);
goto loop; goto loop;
} }
// Handle close frame // Handle close frame
BOOST_ASSERT(impl_->rd_fh.op == detail::opcode::close); BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
{ {
BOOST_ASSERT(! impl_->rd_close); BOOST_ASSERT(! impl.rd_close);
impl_->rd_close = true; impl.rd_close = true;
close_reason cr; close_reason cr;
detail::read_close(cr, b, result); detail::read_close(cr, b, result);
if(result) if(result)
@@ -971,11 +946,11 @@ loop:
result, ec); result, ec);
return bytes_written; return bytes_written;
} }
impl_->cr = cr; impl.cr = cr;
impl_->rd_buf.consume(len); impl.rd_buf.consume(len);
if(impl_->ctrl_cb) if(impl.ctrl_cb)
impl_->ctrl_cb(frame_type::close, impl_->cr.reason); impl.ctrl_cb(frame_type::close, impl.cr.reason);
BOOST_ASSERT(! impl_->wr_close); BOOST_ASSERT(! impl.wr_close);
// _Start the WebSocket Closing Handshake_ // _Start the WebSocket Closing Handshake_
do_fail( do_fail(
cr.code == close_code::none ? cr.code == close_code::none ?
@@ -985,52 +960,52 @@ loop:
return bytes_written; return bytes_written;
} }
} }
if(impl_->rd_fh.len == 0 && ! impl_->rd_fh.fin) if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
{ {
// Empty non-final frame // Empty non-final frame
goto loop; goto loop;
} }
impl_->rd_done = false; impl.rd_done = false;
} }
else else
{ {
ec = {}; ec = {};
} }
if(! impl_->rd_deflated()) if(! impl.rd_deflated())
{ {
if(impl_->rd_remain > 0) if(impl.rd_remain > 0)
{ {
if(impl_->rd_buf.size() == 0 && impl_->rd_buf.max_size() > if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
(std::min)(clamp(impl_->rd_remain), (std::min)(clamp(impl.rd_remain),
buffer_size(buffers))) buffer_size(buffers)))
{ {
// Fill the read buffer first, otherwise we // Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O. // get fewer bytes at the cost of one I/O.
impl_->rd_buf.commit(impl_->stream.read_some( impl.rd_buf.commit(impl.stream.read_some(
impl_->rd_buf.prepare(read_size(impl_->rd_buf, impl.rd_buf.prepare(read_size(impl.rd_buf,
impl_->rd_buf.max_size())), ec)); impl.rd_buf.max_size())), ec));
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_written; return bytes_written;
if(impl_->rd_fh.mask) if(impl.rd_fh.mask)
detail::mask_inplace( detail::mask_inplace(
buffers_prefix(clamp(impl_->rd_remain), buffers_prefix(clamp(impl.rd_remain),
impl_->rd_buf.data()), impl_->rd_key); impl.rd_buf.data()), impl.rd_key);
} }
if(impl_->rd_buf.size() > 0) if(impl.rd_buf.size() > 0)
{ {
// Copy from the read buffer. // Copy from the read buffer.
// The mask was already applied. // The mask was already applied.
auto const bytes_transferred = net::buffer_copy( auto const bytes_transferred = net::buffer_copy(
buffers, impl_->rd_buf.data(), buffers, impl.rd_buf.data(),
clamp(impl_->rd_remain)); clamp(impl.rd_remain));
auto const mb = buffers_prefix( auto const mb = buffers_prefix(
bytes_transferred, buffers); bytes_transferred, buffers);
impl_->rd_remain -= bytes_transferred; impl.rd_remain -= bytes_transferred;
if(impl_->rd_op == detail::opcode::text) if(impl.rd_op == detail::opcode::text)
{ {
if(! impl_->rd_utf8.write(mb) || if(! impl.rd_utf8.write(mb) ||
(impl_->rd_remain == 0 && impl_->rd_fh.fin && (impl.rd_remain == 0 && impl.rd_fh.fin &&
! impl_->rd_utf8.finish())) ! impl.rd_utf8.finish()))
{ {
// _Fail the WebSocket Connection_ // _Fail the WebSocket Connection_
do_fail(close_code::bad_payload, do_fail(close_code::bad_payload,
@@ -1039,32 +1014,33 @@ loop:
} }
} }
bytes_written += bytes_transferred; bytes_written += bytes_transferred;
impl_->rd_size += bytes_transferred; impl.rd_size += bytes_transferred;
impl_->rd_buf.consume(bytes_transferred); impl.rd_buf.consume(bytes_transferred);
} }
else else
{ {
// Read into caller's buffer // Read into caller's buffer
BOOST_ASSERT(impl_->rd_remain > 0); BOOST_ASSERT(impl.rd_remain > 0);
BOOST_ASSERT(buffer_size(buffers) > 0); BOOST_ASSERT(buffer_size(buffers) > 0);
BOOST_ASSERT(buffer_size(buffers_prefix( BOOST_ASSERT(buffer_size(buffers_prefix(
clamp(impl_->rd_remain), buffers)) > 0); clamp(impl.rd_remain), buffers)) > 0);
auto const bytes_transferred = auto const bytes_transferred =
impl_->stream.read_some(buffers_prefix( impl.stream.read_some(buffers_prefix(
clamp(impl_->rd_remain), buffers), ec); clamp(impl.rd_remain), buffers), ec);
if(! impl_->check_ok(ec)) // VFALCO What if some bytes were written?
if(impl.check_stop_now(ec))
return bytes_written; return bytes_written;
BOOST_ASSERT(bytes_transferred > 0); BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffers_prefix( auto const mb = buffers_prefix(
bytes_transferred, buffers); bytes_transferred, buffers);
impl_->rd_remain -= bytes_transferred; impl.rd_remain -= bytes_transferred;
if(impl_->rd_fh.mask) if(impl.rd_fh.mask)
detail::mask_inplace(mb, impl_->rd_key); detail::mask_inplace(mb, impl.rd_key);
if(impl_->rd_op == detail::opcode::text) if(impl.rd_op == detail::opcode::text)
{ {
if(! impl_->rd_utf8.write(mb) || if(! impl.rd_utf8.write(mb) ||
(impl_->rd_remain == 0 && impl_->rd_fh.fin && (impl.rd_remain == 0 && impl.rd_fh.fin &&
! impl_->rd_utf8.finish())) ! impl.rd_utf8.finish()))
{ {
// _Fail the WebSocket Connection_ // _Fail the WebSocket Connection_
do_fail(close_code::bad_payload, do_fail(close_code::bad_payload,
@@ -1073,10 +1049,10 @@ loop:
} }
} }
bytes_written += bytes_transferred; bytes_written += bytes_transferred;
impl_->rd_size += bytes_transferred; impl.rd_size += bytes_transferred;
} }
} }
impl_->rd_done = impl_->rd_remain == 0 && impl_->rd_fh.fin; impl.rd_done = impl.rd_remain == 0 && impl.rd_fh.fin;
} }
else else
{ {
@@ -1095,14 +1071,14 @@ loop:
zs.avail_out = out.size(); zs.avail_out = out.size();
BOOST_ASSERT(zs.avail_out > 0); BOOST_ASSERT(zs.avail_out > 0);
} }
if(impl_->rd_remain > 0) if(impl.rd_remain > 0)
{ {
if(impl_->rd_buf.size() > 0) if(impl.rd_buf.size() > 0)
{ {
// use what's there // use what's there
auto const in = buffers_prefix( auto const in = buffers_prefix(
clamp(impl_->rd_remain), beast::buffers_front( clamp(impl.rd_remain), beast::buffers_front(
impl_->rd_buf.data())); impl.rd_buf.data()));
zs.avail_in = in.size(); zs.avail_in = in.size();
zs.next_in = in.data(); zs.next_in = in.data();
} }
@@ -1110,21 +1086,21 @@ loop:
{ {
// read new // read new
auto const bytes_transferred = auto const bytes_transferred =
impl_->stream.read_some( impl.stream.read_some(
impl_->rd_buf.prepare(read_size( impl.rd_buf.prepare(read_size(
impl_->rd_buf, impl_->rd_buf.max_size())), impl.rd_buf, impl.rd_buf.max_size())),
ec); ec);
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_written; return bytes_written;
BOOST_ASSERT(bytes_transferred > 0); BOOST_ASSERT(bytes_transferred > 0);
impl_->rd_buf.commit(bytes_transferred); impl.rd_buf.commit(bytes_transferred);
if(impl_->rd_fh.mask) if(impl.rd_fh.mask)
detail::mask_inplace( detail::mask_inplace(
buffers_prefix(clamp(impl_->rd_remain), buffers_prefix(clamp(impl.rd_remain),
impl_->rd_buf.data()), impl_->rd_key); impl.rd_buf.data()), impl.rd_key);
auto const in = buffers_prefix( auto const in = buffers_prefix(
clamp(impl_->rd_remain), buffers_front( clamp(impl.rd_remain), buffers_front(
impl_->rd_buf.data())); impl.rd_buf.data()));
zs.avail_in = in.size(); zs.avail_in = in.size();
zs.next_in = in.data(); zs.next_in = in.data();
did_read = true; did_read = true;
@@ -1134,7 +1110,7 @@ loop:
break; break;
} }
} }
else if(impl_->rd_fh.fin) else if(impl.rd_fh.fin)
{ {
// append the empty block codes // append the empty block codes
static std::uint8_t constexpr static std::uint8_t constexpr
@@ -1142,45 +1118,45 @@ loop:
0x00, 0x00, 0xff, 0xff }; 0x00, 0x00, 0xff, 0xff };
zs.next_in = empty_block; zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block); zs.avail_in = sizeof(empty_block);
impl_->inflate(zs, zlib::Flush::sync, ec); impl.inflate(zs, zlib::Flush::sync, ec);
if(! ec) if(! ec)
{ {
// https://github.com/madler/zlib/issues/280 // https://github.com/madler/zlib/issues/280
if(zs.total_out > 0) if(zs.total_out > 0)
ec = error::partial_deflate_block; ec = error::partial_deflate_block;
} }
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_written; return bytes_written;
impl_->do_context_takeover_read(impl_->role); impl.do_context_takeover_read(impl.role);
impl_->rd_done = true; impl.rd_done = true;
break; break;
} }
else else
{ {
break; break;
} }
impl_->inflate(zs, zlib::Flush::sync, ec); impl.inflate(zs, zlib::Flush::sync, ec);
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_written; return bytes_written;
if(impl_->rd_msg_max && beast::detail::sum_exceeds( if(impl.rd_msg_max && beast::detail::sum_exceeds(
impl_->rd_size, zs.total_out, impl_->rd_msg_max)) impl.rd_size, zs.total_out, impl.rd_msg_max))
{ {
do_fail(close_code::too_big, do_fail(close_code::too_big,
error::message_too_big, ec); error::message_too_big, ec);
return bytes_written; return bytes_written;
} }
cb.consume(zs.total_out); cb.consume(zs.total_out);
impl_->rd_size += zs.total_out; impl.rd_size += zs.total_out;
impl_->rd_remain -= zs.total_in; impl.rd_remain -= zs.total_in;
impl_->rd_buf.consume(zs.total_in); impl.rd_buf.consume(zs.total_in);
bytes_written += zs.total_out; bytes_written += zs.total_out;
} }
if(impl_->rd_op == detail::opcode::text) if(impl.rd_op == detail::opcode::text)
{ {
// check utf8 // check utf8
if(! impl_->rd_utf8.write(beast::buffers_prefix( if(! impl.rd_utf8.write(beast::buffers_prefix(
bytes_written, buffers)) || ( bytes_written, buffers)) || (
impl_->rd_done && ! impl_->rd_utf8.finish())) impl.rd_done && ! impl.rd_utf8.finish()))
{ {
// _Fail the WebSocket Connection_ // _Fail the WebSocket Connection_
do_fail(close_code::bad_payload, do_fail(close_code::bad_payload,
@@ -1209,9 +1185,8 @@ async_read_some(
BOOST_BEAST_HANDLER_INIT( BOOST_BEAST_HANDLER_INIT(
ReadHandler, void(error_code, std::size_t)); ReadHandler, void(error_code, std::size_t));
read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE( read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{ ReadHandler, void(error_code, std::size_t))>(
std::move(init.completion_handler), *this, buffers}( std::move(init.completion_handler), *this, buffers);
{}, 0, false);
return init.result.get(); return init.result.get();
} }
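Note: the "empty block codes" appended above come straight from RFC 7692. The sender of a compressed message strips the trailing 0x00 0x00 0xff 0xff octets from the final frame, so the receiver must append them again before the last inflate. A minimal sketch of that step using the reference zlib API (the code above uses beast::zlib; the helper below is purely illustrative):

    #include <zlib.h>
    #include <cstddef>
    #include <vector>

    // Inflate one complete permessage-deflate payload. Hypothetical
    // helper, not part of Beast; error handling reduced to a bool.
    bool
    inflate_message(
        unsigned char const* in, std::size_t n,
        std::vector<unsigned char>& out)
    {
        static unsigned char const tail[4] = { 0x00, 0x00, 0xff, 0xff };
        std::vector<unsigned char> body(in, in + n);
        body.insert(body.end(), tail, tail + 4); // append the empty block codes
        z_stream zs{};
        if(inflateInit2(&zs, -15) != Z_OK)       // raw deflate, per RFC 7692
            return false;
        zs.next_in = body.data();
        zs.avail_in = static_cast<uInt>(body.size());
        int rc = Z_OK;
        do
        {
            unsigned char buf[1024];
            zs.next_out = buf;
            zs.avail_out = sizeof(buf);
            rc = inflate(&zs, Z_SYNC_FLUSH);
            out.insert(out.end(), buf, buf + (sizeof(buf) - zs.avail_out));
        }
        while(rc == Z_OK && zs.avail_out == 0);
        inflateEnd(&zs);
        return rc == Z_OK || rc == Z_STREAM_END;
    }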
@@ -754,7 +754,7 @@ do_fail(
write_close< write_close<
flat_static_buffer_base>(fb, code); flat_static_buffer_base>(fb, code);
net::write(impl_->stream, fb.data(), ec); net::write(impl_->stream, fb.data(), ec);
if(! impl_->check_ok(ec)) if(impl_->check_stop_now(ec))
return; return;
} }
using beast::websocket::teardown; using beast::websocket::teardown;
@@ -10,12 +10,16 @@
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP #ifndef BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP
#define BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP #define BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP
#include <boost/beast/core/static_buffer.hpp>
#include <boost/beast/core/saved_handler.hpp> #include <boost/beast/core/saved_handler.hpp>
#include <boost/beast/core/static_buffer.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/websocket/detail/frame.hpp> #include <boost/beast/websocket/detail/frame.hpp>
#include <boost/beast/websocket/detail/pmd_extension.hpp> #include <boost/beast/websocket/detail/pmd_extension.hpp>
#include <boost/beast/websocket/detail/soft_mutex.hpp> #include <boost/beast/websocket/detail/soft_mutex.hpp>
#include <boost/beast/websocket/detail/utf8_checker.hpp> #include <boost/beast/websocket/detail/utf8_checker.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/core/empty_value.hpp>
#include <boost/optional.hpp> #include <boost/optional.hpp>
namespace boost { namespace boost {
@@ -28,15 +32,8 @@ struct stream<NextLayer, deflateSupported>::impl_type
: std::enable_shared_from_this<impl_type> : std::enable_shared_from_this<impl_type>
, detail::impl_base<deflateSupported> , detail::impl_base<deflateSupported>
{ {
using time_point = typename
std::chrono::steady_clock::time_point;
static constexpr time_point never()
{
return (time_point::max)();
}
NextLayer stream; // The underlying stream NextLayer stream; // The underlying stream
net::steady_timer timer; // used for timeouts
close_reason cr; // set from received close frame close_reason cr; // set from received close frame
control_cb_type ctrl_cb; // control callback control_cb_type ctrl_cb; // control callback
@@ -78,9 +75,15 @@ struct stream<NextLayer, deflateSupported>::impl_type
saved_handler paused_r_rd; // paused read op (async read) saved_handler paused_r_rd; // paused read op (async read)
saved_handler paused_r_close; // paused close op (async read) saved_handler paused_r_close; // paused close op (async read)
boost::optional<bool> tm_auto_ping;
bool tm_opt = false; // `true` if the auto-timeout option is set
bool tm_idle; // set to false on incoming frames
time_point::duration tm_dur = std::chrono::seconds(1); // duration of the timer
template<class... Args> template<class... Args>
impl_type(Args&&... args) impl_type(Args&&... args)
: stream(std::forward<Args>(args)...) : stream(std::forward<Args>(args)...)
, timer(stream.get_executor().context())
{ {
} }
@@ -108,12 +111,15 @@ struct stream<NextLayer, deflateSupported>::impl_type
wr_cont = false; wr_cont = false;
wr_buf_size = 0; wr_buf_size = 0;
tm_idle = false;
this->open_pmd(role); this->open_pmd(role);
} }
void void
close() close()
{ {
timer.cancel();
wr_buf.reset(); wr_buf.reset();
this->close_pmd(); this->close_pmd();
} }
@@ -136,6 +142,10 @@ struct stream<NextLayer, deflateSupported>::impl_type
wr_block.reset(); wr_block.reset();
rd_block.reset(); rd_block.reset();
cr.code = close_code::none; cr.code = close_code::none;
tm_idle = false;
// VFALCO Is this needed?
timer.cancel();
} }
// Called before each write frame // Called before each write frame
@@ -163,28 +173,185 @@ struct stream<NextLayer, deflateSupported>::impl_type
} }
} }
private:
bool bool
check_open(error_code& ec) is_timer_set() const
{ {
if(status_ != status::open) return timer.expiry() != never();
}
// Returns `true` if we should try sending a ping, and
// waiting for a pong, before closing an idle stream.
bool
is_auto_ping_enabled() const
{
if(tm_auto_ping.has_value())
return *tm_auto_ping;
if(role == role_type::server)
return true;
return false;
}
template<class Executor>
class timeout_handler
: boost::empty_value<Executor>
{
std::weak_ptr<impl_type> wp_;
public:
timeout_handler(
Executor const& ex,
std::shared_ptr<impl_type> const& sp)
: boost::empty_value<Executor>(
boost::empty_init_t{}, ex)
, wp_(sp)
{ {
ec = net::error::operation_aborted;
return false;
} }
ec = {};
using executor_type = Executor;
executor_type
get_executor() const
{
return this->get();
}
void
operator()(error_code ec)
{
// timer canceled?
if(ec == net::error::operation_aborted)
return;
BOOST_ASSERT(! ec);
// stream destroyed?
auto sp = wp_.lock();
if(! sp)
return;
auto& impl = *sp;
close_socket(get_lowest_layer(impl.stream));
#if 0
if(! impl.tm_idle)
{
impl.tm_idle = true;
BOOST_VERIFY(
impl.timer.expires_after(impl.tm_dur) == 0);
return impl.timer.async_wait(std::move(*this));
}
if(impl.is_auto_ping_enabled())
{
// send ping
}
else
{
// timeout
}
#endif
}
};
public:
// called when there is qualified activity
void
activity()
{
tm_idle = false;
}
// Maintain the expiration timer
template<class Executor>
void
update_timer(Executor const& ex)
{
if(role == role_type::server)
{
if(! is_timer_set())
{
// turn timer on
timer.expires_after(tm_dur);
timer.async_wait(
timeout_handler<Executor>(
ex, this->shared_from_this()));
}
}
else if(tm_opt && ! is_timer_set())
{
// turn timer on
timer.expires_after(tm_dur);
timer.async_wait(
timeout_handler<Executor>(
ex, this->shared_from_this()));
}
else if(! tm_opt && is_timer_set())
{
// turn timer off
timer.cancel();
}
}
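The timeout_handler / update_timer pair above uses the classic weak_ptr watchdog idiom: the pending async_wait owns only a weak_ptr to the implementation, so an armed timer never extends the stream's lifetime, and a timer that fires after the stream is destroyed simply does nothing. A minimal sketch of the same idiom with a hypothetical session type:

    #include <boost/asio.hpp>
    #include <chrono>
    #include <memory>

    namespace net = boost::asio;

    struct session : std::enable_shared_from_this<session>
    {
        net::ip::tcp::socket socket;
        net::steady_timer timer;

        explicit session(net::io_context& ioc)
            : socket(ioc)
            , timer(ioc)
        {
        }

        void
        arm_timeout(std::chrono::seconds d)
        {
            timer.expires_after(d);
            timer.async_wait(
                [wp = std::weak_ptr<session>(shared_from_this())](
                    boost::system::error_code ec)
                {
                    if(ec == net::error::operation_aborted)
                        return;                 // timer was canceled
                    if(auto sp = wp.lock())     // session still alive?
                    {
                        boost::system::error_code ignored;
                        sp->socket.close(ignored);
                    }
                });
        }
    };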
//--------------------------------------------------------------------------
bool ec_delivered = false;
// Determine if an operation should stop and
// deliver an error code to the completion handler.
//
// This function must be called at the beginning
// of every composed operation, and every time a
// composed operation receives an intermediate
// completion.
//
bool
check_stop_now(error_code& ec)
{
// If the stream is closed then abort
if( status_ == status::closed ||
status_ == status::failed)
{
//BOOST_ASSERT(ec_delivered);
ec = net::error::operation_aborted;
return true;
}
// If no error then keep going
if(! ec)
return false;
// Is this the first error seen?
if(ec_delivered)
{
// No, so abort
ec = net::error::operation_aborted;
return true;
}
// Deliver the error to the completion handler
ec_delivered = true;
if(status_ != status::closed)
status_ = status::failed;
return true; return true;
} }
bool // Change the status of the stream
check_ok(error_code& ec) void
change_status(status new_status)
{ {
if(ec) switch(new_status)
{ {
if(status_ != status::closed) case status::closing:
status_ = status::failed; BOOST_ASSERT(status_ == status::open);
return false; break;
case status::failed:
case status::closed:
// this->close(); // Is this right?
break;
default:
break;
} }
return true; status_ = new_status;
} }
}; };
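check_stop_now replaces the old check_open/check_ok pair with a single predicate, and the ec_delivered flag guarantees that the original error reaches exactly one completion handler; every subsequent operation observes operation_aborted. A sketch of the intended calling convention in a synchronous operation (some_sync_op is hypothetical):

    // Call check_stop_now once on entry, then once after every
    // intermediate completion that may have set `ec`.
    std::size_t
    some_sync_op(impl_type& impl, error_code& ec)
    {
        std::size_t bytes_transferred = 0;
        ec = {};
        if(impl.check_stop_now(ec))     // on entry
            return bytes_transferred;

        // ... perform a read or write which sets ec ...

        if(impl.check_stop_now(ec))     // after the intermediate completion
            return bytes_transferred;   // ec holds the first error seen, or
                                        // operation_aborted on repeat calls
        return bytes_transferred;
    }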
@@ -13,6 +13,7 @@
#include <boost/beast/core/async_op_base.hpp> #include <boost/beast/core/async_op_base.hpp>
#include <boost/beast/core/bind_handler.hpp> #include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/stream_traits.hpp> #include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/core/detail/bind_continuation.hpp>
#include <boost/beast/core/detail/type_traits.hpp> #include <boost/beast/core/detail/type_traits.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
@@ -50,44 +51,38 @@ public:
, s_(s) , s_(s)
, role_(role) , role_(role)
{ {
(*this)({}, 0, false);
} }
void void
operator()( operator()(
error_code ec = {}, error_code ec = {},
std::size_t bytes_transferred = 0) std::size_t bytes_transferred = 0,
bool cont = true)
{ {
using tcp = net::ip::tcp; using tcp = net::ip::tcp;
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
nb_ = s_.non_blocking(); nb_ = s_.non_blocking();
s_.non_blocking(true, ec); s_.non_blocking(true, ec);
if(! ec)
{
if(role_ == role_type::server)
s_.shutdown(tcp::socket::shutdown_send, ec);
}
if(ec) if(ec)
{
BOOST_ASIO_CORO_YIELD
net::post(
s_.get_executor(),
beast::bind_front_handler(std::move(*this), ec, 0));
goto upcall; goto upcall;
} if(role_ == role_type::server)
s_.shutdown(tcp::socket::shutdown_send, ec);
if(ec)
goto upcall;
for(;;) for(;;)
{ {
{ {
char buf[2048]; char buf[2048];
s_.read_some( s_.read_some(net::buffer(buf), ec);
net::buffer(buf), ec);
} }
if(ec == net::error::would_block) if(ec == net::error::would_block)
{ {
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
s_.async_wait( s_.async_wait(
net::ip::tcp::socket::wait_read, net::ip::tcp::socket::wait_read,
std::move(*this)); beast::detail::bind_continuation(std::move(*this)));
continue; continue;
} }
if(ec) if(ec)
@@ -110,6 +105,12 @@ public:
goto upcall; goto upcall;
s_.close(ec); s_.close(ec);
upcall: upcall:
if(! cont)
{
BOOST_ASIO_CORO_YIELD
net::post(bind_front_handler(
std::move(*this), ec));
}
{ {
error_code ignored; error_code ignored;
s_.non_blocking(nb_, ignored); s_.non_blocking(nb_, ignored);
@@ -173,8 +174,7 @@ async_teardown(
"TeardownHandler requirements not met"); "TeardownHandler requirements not met");
detail::teardown_tcp_op<typename std::decay< detail::teardown_tcp_op<typename std::decay<
TeardownHandler>::type>(std::forward< TeardownHandler>::type>(std::forward<
TeardownHandler>(handler), socket, TeardownHandler>(handler), socket, role);
role)();
} }
} // websocket } // websocket
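teardown_tcp_op above performs a half-close followed by a drain: shut down the sending side, then read and discard until the peer closes, using non-blocking reads plus async_wait so the drain never blocks the calling thread. A synchronous analogue of the same shutdown sequence, as a sketch:

    #include <boost/asio.hpp>

    namespace net = boost::asio;
    using error_code = boost::system::error_code;

    // Sketch: orderly TCP shutdown. Send FIN, drain until the
    // peer closes, then close the socket.
    void
    drain_and_close(net::ip::tcp::socket& s, error_code& ec)
    {
        s.shutdown(net::ip::tcp::socket::shutdown_send, ec);
        if(ec)
            return;
        char buf[2048];
        for(;;)
        {
            s.read_some(net::buffer(buf), ec);
            if(ec == net::error::eof)
            {
                ec = {};    // EOF here means an orderly shutdown
                break;
            }
            if(ec)
                return;
        }
        s.close(ec);
    }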
@@ -20,6 +20,7 @@
#include <boost/beast/core/buffers_suffix.hpp> #include <boost/beast/core/buffers_suffix.hpp>
#include <boost/beast/core/flat_static_buffer.hpp> #include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/stream_traits.hpp> #include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/core/detail/bind_continuation.hpp>
#include <boost/beast/core/detail/clamp.hpp> #include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
#include <boost/beast/websocket/detail/frame.hpp> #include <boost/beast/websocket/detail/frame.hpp>
@@ -41,6 +42,15 @@ class stream<NextLayer, deflateSupported>::write_some_op
Handler, beast::executor_type<stream>> Handler, beast::executor_type<stream>>
, public net::coroutine , public net::coroutine
{ {
enum
{
do_nomask_nofrag,
do_nomask_frag,
do_mask_nofrag,
do_mask_frag,
do_deflate
};
stream& ws_; stream& ws_;
buffers_suffix<Buffers> cb_; buffers_suffix<Buffers> cb_;
detail::frame_header fh_; detail::frame_header fh_;
@@ -69,6 +79,63 @@ public:
, cb_(bs) , cb_(bs)
, fin_(fin) , fin_(fin)
{ {
auto& impl = *ws_.impl_;
// Set up the outgoing frame header
if(! impl.wr_cont)
{
impl.begin_msg();
fh_.rsv1 = impl.wr_compress;
}
else
{
fh_.rsv1 = false;
}
fh_.rsv2 = false;
fh_.rsv3 = false;
fh_.op = impl.wr_cont ?
detail::opcode::cont : impl.wr_opcode;
fh_.mask =
impl.role == role_type::client;
// Choose a write algorithm
if(impl.wr_compress)
{
how_ = do_deflate;
}
else if(! fh_.mask)
{
if(! impl.wr_frag)
{
how_ = do_nomask_nofrag;
}
else
{
BOOST_ASSERT(impl.wr_buf_size != 0);
remain_ = buffer_size(cb_);
if(remain_ > impl.wr_buf_size)
how_ = do_nomask_frag;
else
how_ = do_nomask_nofrag;
}
}
else
{
if(! impl.wr_frag)
{
how_ = do_mask_nofrag;
}
else
{
BOOST_ASSERT(impl.wr_buf_size != 0);
remain_ = buffer_size(cb_);
if(remain_ > impl.wr_buf_size)
how_ = do_mask_frag;
else
how_ = do_mask_nofrag;
}
}
(*this)({}, 0, false);
} }
void operator()( void operator()(
@@ -88,283 +155,210 @@ operator()(
bool cont) bool cont)
{ {
using beast::detail::clamp; using beast::detail::clamp;
enum
{
do_nomask_nofrag,
do_nomask_frag,
do_mask_nofrag,
do_mask_frag,
do_deflate
};
std::size_t n; std::size_t n;
net::mutable_buffer b; net::mutable_buffer b;
cont_ = cont; auto& impl = *ws_.impl_;
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
// Set up the outgoing frame header // Acquire the write lock
if(! ws_.impl_->wr_cont) if(! impl.wr_block.try_lock(this))
{
ws_.impl_->begin_msg();
fh_.rsv1 = ws_.impl_->wr_compress;
}
else
{
fh_.rsv1 = false;
}
fh_.rsv2 = false;
fh_.rsv3 = false;
fh_.op = ws_.impl_->wr_cont ?
detail::opcode::cont : ws_.impl_->wr_opcode;
fh_.mask =
ws_.impl_->role == role_type::client;
// Choose a write algorithm
if(ws_.impl_->wr_compress)
{
how_ = do_deflate;
}
else if(! fh_.mask)
{
if(! ws_.impl_->wr_frag)
{
how_ = do_nomask_nofrag;
}
else
{
BOOST_ASSERT(ws_.impl_->wr_buf_size != 0);
remain_ = buffer_size(cb_);
if(remain_ > ws_.impl_->wr_buf_size)
how_ = do_nomask_frag;
else
how_ = do_nomask_nofrag;
}
}
else
{
if(! ws_.impl_->wr_frag)
{
how_ = do_mask_nofrag;
}
else
{
BOOST_ASSERT(ws_.impl_->wr_buf_size != 0);
remain_ = buffer_size(cb_);
if(remain_ > ws_.impl_->wr_buf_size)
how_ = do_mask_frag;
else
how_ = do_mask_nofrag;
}
}
// Maybe suspend
if(ws_.impl_->wr_block.try_lock(this))
{
// Make sure the stream is open
if(! ws_.impl_->check_open(ec))
goto upcall;
}
else
{ {
do_suspend: do_suspend:
// Suspend
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
ws_.impl_->paused_wr.emplace(std::move(*this)); impl.paused_wr.emplace(std::move(*this));
impl.wr_block.lock(this);
// Acquire the write block
ws_.impl_->wr_block.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::post( net::post(std::move(*this));
ws_.get_executor(), std::move(*this)); BOOST_ASSERT(impl.wr_block.is_locked(this));
BOOST_ASSERT(ws_.impl_->wr_block.is_locked(this));
// Make sure the stream is open
if(! ws_.impl_->check_open(ec))
goto upcall;
} }
if(impl.check_stop_now(ec))
goto upcall;
//------------------------------------------------------------------ //------------------------------------------------------------------
if(how_ == do_nomask_nofrag) if(how_ == do_nomask_nofrag)
{ {
// send a single frame
fh_.fin = fin_; fh_.fin = fin_;
fh_.len = buffer_size(cb_); fh_.len = buffer_size(cb_);
ws_.impl_->wr_fb.clear(); impl.wr_fb.clear();
detail::write<flat_static_buffer_base>( detail::write<flat_static_buffer_base>(
ws_.impl_->wr_fb, fh_); impl.wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_; impl.wr_cont = ! fin_;
// Send frame
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write(ws_.impl_->stream, net::async_write(impl.stream,
buffers_cat(ws_.impl_->wr_fb.data(), cb_), buffers_cat(impl.wr_fb.data(), cb_),
std::move(*this)); beast::detail::bind_continuation(std::move(*this)));
if(! ws_.impl_->check_ok(ec))
goto upcall;
bytes_transferred_ += clamp(fh_.len); bytes_transferred_ += clamp(fh_.len);
if(impl.check_stop_now(ec))
goto upcall;
goto upcall; goto upcall;
} }
//------------------------------------------------------------------ //------------------------------------------------------------------
else if(how_ == do_nomask_frag) if(how_ == do_nomask_frag)
{ {
// send multiple frames
for(;;) for(;;)
{ {
n = clamp(remain_, ws_.impl_->wr_buf_size); n = clamp(remain_, impl.wr_buf_size);
fh_.len = n; fh_.len = n;
remain_ -= n; remain_ -= n;
fh_.fin = fin_ ? remain_ == 0 : false; fh_.fin = fin_ ? remain_ == 0 : false;
ws_.impl_->wr_fb.clear(); impl.wr_fb.clear();
detail::write<flat_static_buffer_base>( detail::write<flat_static_buffer_base>(
ws_.impl_->wr_fb, fh_); impl.wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_; impl.wr_cont = ! fin_;
// Send frame // Send frame
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write( net::async_write(impl.stream, buffers_cat(
ws_.impl_->stream, buffers_cat( impl.wr_fb.data(),
ws_.impl_->wr_fb.data(), buffers_prefix( buffers_prefix(clamp(fh_.len), cb_)),
clamp(fh_.len), cb_)), beast::detail::bind_continuation(std::move(*this)));
std::move(*this)); n = clamp(fh_.len); // restore `n` on yield
if(! ws_.impl_->check_ok(ec))
goto upcall;
n = clamp(fh_.len); // because yield
bytes_transferred_ += n; bytes_transferred_ += n;
if(impl.check_stop_now(ec))
goto upcall;
if(remain_ == 0) if(remain_ == 0)
break; break;
cb_.consume(n); cb_.consume(n);
fh_.op = detail::opcode::cont; fh_.op = detail::opcode::cont;
// Allow outgoing control frames to
// be sent in between message frames // Give up the write lock in between each frame
ws_.impl_->wr_block.unlock(this); // so that outgoing control frames might be sent.
if( ws_.impl_->paused_close.maybe_invoke() || impl.wr_block.unlock(this);
ws_.impl_->paused_rd.maybe_invoke() || if( impl.paused_close.maybe_invoke() ||
ws_.impl_->paused_ping.maybe_invoke()) impl.paused_rd.maybe_invoke() ||
impl.paused_ping.maybe_invoke())
{ {
BOOST_ASSERT(ws_.impl_->wr_block.is_locked()); BOOST_ASSERT(impl.wr_block.is_locked());
goto do_suspend; goto do_suspend;
} }
ws_.impl_->wr_block.lock(this); impl.wr_block.lock(this);
} }
goto upcall; goto upcall;
} }
//------------------------------------------------------------------ //------------------------------------------------------------------
else if(how_ == do_mask_nofrag) if(how_ == do_mask_nofrag)
{ {
remain_ = buffer_size(cb_); // send a single frame using multiple writes
remain_ = beast::buffer_size(cb_);
fh_.fin = fin_; fh_.fin = fin_;
fh_.len = remain_; fh_.len = remain_;
fh_.key = ws_.create_mask(); fh_.key = ws_.create_mask();
detail::prepare_key(key_, fh_.key); detail::prepare_key(key_, fh_.key);
ws_.impl_->wr_fb.clear(); impl.wr_fb.clear();
detail::write<flat_static_buffer_base>( detail::write<flat_static_buffer_base>(
ws_.impl_->wr_fb, fh_); impl.wr_fb, fh_);
n = clamp(remain_, ws_.impl_->wr_buf_size); n = clamp(remain_, impl.wr_buf_size);
net::buffer_copy(net::buffer( net::buffer_copy(net::buffer(
ws_.impl_->wr_buf.get(), n), cb_); impl.wr_buf.get(), n), cb_);
detail::mask_inplace(net::buffer( detail::mask_inplace(net::buffer(
ws_.impl_->wr_buf.get(), n), key_); impl.wr_buf.get(), n), key_);
remain_ -= n; remain_ -= n;
ws_.impl_->wr_cont = ! fin_; impl.wr_cont = ! fin_;
// Send frame header and partial payload // write frame header and some payload
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write( net::async_write(impl.stream, buffers_cat(
ws_.impl_->stream, buffers_cat(ws_.impl_->wr_fb.data(), impl.wr_fb.data(),
net::buffer(ws_.impl_->wr_buf.get(), n)), net::buffer(impl.wr_buf.get(), n)),
std::move(*this)); beast::detail::bind_continuation(std::move(*this)));
if(! ws_.impl_->check_ok(ec)) // VFALCO What about consuming the buffer on error?
goto upcall;
bytes_transferred_ += bytes_transferred_ +=
bytes_transferred - ws_.impl_->wr_fb.size(); bytes_transferred - impl.wr_fb.size();
if(impl.check_stop_now(ec))
goto upcall;
while(remain_ > 0) while(remain_ > 0)
{ {
cb_.consume(ws_.impl_->wr_buf_size); cb_.consume(impl.wr_buf_size);
n = clamp(remain_, ws_.impl_->wr_buf_size); n = clamp(remain_, impl.wr_buf_size);
net::buffer_copy(net::buffer( net::buffer_copy(net::buffer(
ws_.impl_->wr_buf.get(), n), cb_); impl.wr_buf.get(), n), cb_);
detail::mask_inplace(net::buffer( detail::mask_inplace(net::buffer(
ws_.impl_->wr_buf.get(), n), key_); impl.wr_buf.get(), n), key_);
remain_ -= n; remain_ -= n;
// Send partial payload // write more payload
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write(ws_.impl_->stream, net::async_write(impl.stream,
net::buffer(ws_.impl_->wr_buf.get(), n), net::buffer(impl.wr_buf.get(), n),
std::move(*this)); beast::detail::bind_continuation(std::move(*this)));
if(! ws_.impl_->check_ok(ec))
goto upcall;
bytes_transferred_ += bytes_transferred; bytes_transferred_ += bytes_transferred;
if(impl.check_stop_now(ec))
goto upcall;
} }
goto upcall; goto upcall;
} }
//------------------------------------------------------------------ //------------------------------------------------------------------
else if(how_ == do_mask_frag) if(how_ == do_mask_frag)
{ {
// send multiple frames
for(;;) for(;;)
{ {
n = clamp(remain_, ws_.impl_->wr_buf_size); n = clamp(remain_, impl.wr_buf_size);
remain_ -= n; remain_ -= n;
fh_.len = n; fh_.len = n;
fh_.key = ws_.create_mask(); fh_.key = ws_.create_mask();
fh_.fin = fin_ ? remain_ == 0 : false; fh_.fin = fin_ ? remain_ == 0 : false;
detail::prepare_key(key_, fh_.key); detail::prepare_key(key_, fh_.key);
net::buffer_copy(net::buffer( net::buffer_copy(net::buffer(
ws_.impl_->wr_buf.get(), n), cb_); impl.wr_buf.get(), n), cb_);
detail::mask_inplace(net::buffer( detail::mask_inplace(net::buffer(
ws_.impl_->wr_buf.get(), n), key_); impl.wr_buf.get(), n), key_);
ws_.impl_->wr_fb.clear(); impl.wr_fb.clear();
detail::write<flat_static_buffer_base>( detail::write<flat_static_buffer_base>(
ws_.impl_->wr_fb, fh_); impl.wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_; impl.wr_cont = ! fin_;
// Send frame // Send frame
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write(ws_.impl_->stream, net::async_write(impl.stream, buffers_cat(
buffers_cat(ws_.impl_->wr_fb.data(), impl.wr_fb.data(),
net::buffer(ws_.impl_->wr_buf.get(), n)), net::buffer(impl.wr_buf.get(), n)),
std::move(*this)); beast::detail::bind_continuation(std::move(*this)));
if(! ws_.impl_->check_ok(ec)) n = bytes_transferred - impl.wr_fb.size();
goto upcall;
n = bytes_transferred - ws_.impl_->wr_fb.size();
bytes_transferred_ += n; bytes_transferred_ += n;
if(impl.check_stop_now(ec))
goto upcall;
if(remain_ == 0) if(remain_ == 0)
break; break;
cb_.consume(n); cb_.consume(n);
fh_.op = detail::opcode::cont; fh_.op = detail::opcode::cont;
// Allow outgoing control frames to // Give up the write lock in between each frame
// be sent in between message frames: // so that outgoing control frames might be sent.
ws_.impl_->wr_block.unlock(this); impl.wr_block.unlock(this);
if( ws_.impl_->paused_close.maybe_invoke() || if( impl.paused_close.maybe_invoke() ||
ws_.impl_->paused_rd.maybe_invoke() || impl.paused_rd.maybe_invoke() ||
ws_.impl_->paused_ping.maybe_invoke()) impl.paused_ping.maybe_invoke())
{ {
BOOST_ASSERT(ws_.impl_->wr_block.is_locked()); BOOST_ASSERT(impl.wr_block.is_locked());
goto do_suspend; goto do_suspend;
} }
ws_.impl_->wr_block.lock(this); impl.wr_block.lock(this);
} }
goto upcall; goto upcall;
} }
//------------------------------------------------------------------ //------------------------------------------------------------------
else if(how_ == do_deflate) if(how_ == do_deflate)
{ {
// send compressed frames
for(;;) for(;;)
{ {
b = net::buffer(ws_.impl_->wr_buf.get(), b = net::buffer(impl.wr_buf.get(),
ws_.impl_->wr_buf_size); impl.wr_buf_size);
more_ = ws_.impl_->deflate(b, cb_, fin_, in_, ec); more_ = impl.deflate(b, cb_, fin_, in_, ec);
if(! ws_.impl_->check_ok(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
n = buffer_size(b); n = buffer_size(b);
if(n == 0) if(n == 0)
{ {
// The input was consumed, but there // The input was consumed, but there is
// is no output due to compression // no output due to compression latency.
// latency.
BOOST_ASSERT(! fin_); BOOST_ASSERT(! fin_);
BOOST_ASSERT(buffer_size(cb_) == 0); BOOST_ASSERT(buffer_size(cb_) == 0);
goto upcall; goto upcall;
@@ -378,38 +372,38 @@ operator()(
} }
fh_.fin = ! more_; fh_.fin = ! more_;
fh_.len = n; fh_.len = n;
ws_.impl_->wr_fb.clear(); impl.wr_fb.clear();
detail::write< detail::write<
flat_static_buffer_base>(ws_.impl_->wr_fb, fh_); flat_static_buffer_base>(impl.wr_fb, fh_);
ws_.impl_->wr_cont = ! fin_; impl.wr_cont = ! fin_;
// Send frame // Send frame
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
net::async_write(ws_.impl_->stream, net::async_write(impl.stream, buffers_cat(
buffers_cat(ws_.impl_->wr_fb.data(), b), impl.wr_fb.data(), b),
std::move(*this)); beast::detail::bind_continuation(std::move(*this)));
if(! ws_.impl_->check_ok(ec))
goto upcall;
bytes_transferred_ += in_; bytes_transferred_ += in_;
if(impl.check_stop_now(ec))
goto upcall;
if(more_) if(more_)
{ {
fh_.op = detail::opcode::cont; fh_.op = detail::opcode::cont;
fh_.rsv1 = false; fh_.rsv1 = false;
// Allow outgoing control frames to // Give up the write lock in between each frame
// be sent in between message frames: // so that outgoing control frames might be sent.
ws_.impl_->wr_block.unlock(this); impl.wr_block.unlock(this);
if( ws_.impl_->paused_close.maybe_invoke() || if( impl.paused_close.maybe_invoke() ||
ws_.impl_->paused_rd.maybe_invoke() || impl.paused_rd.maybe_invoke() ||
ws_.impl_->paused_ping.maybe_invoke()) impl.paused_ping.maybe_invoke())
{ {
BOOST_ASSERT(ws_.impl_->wr_block.is_locked()); BOOST_ASSERT(impl.wr_block.is_locked());
goto do_suspend; goto do_suspend;
} }
ws_.impl_->wr_block.lock(this); impl.wr_block.lock(this);
} }
else else
{ {
if(fh_.fin) if(fh_.fin)
ws_.impl_->do_context_takeover_write(ws_.impl_->role); impl.do_context_takeover_write(impl.role);
goto upcall; goto upcall;
} }
} }
@@ -418,11 +412,17 @@ operator()(
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
upcall: upcall:
ws_.impl_->wr_block.unlock(this); impl.wr_block.unlock(this);
ws_.impl_->paused_close.maybe_invoke() || impl.paused_close.maybe_invoke()
ws_.impl_->paused_rd.maybe_invoke() || || impl.paused_rd.maybe_invoke()
ws_.impl_->paused_ping.maybe_invoke(); || impl.paused_ping.maybe_invoke();
this->invoke(cont_, ec, bytes_transferred_); if(! cont)
{
BOOST_ASIO_CORO_YIELD
net::post(bind_front_handler(
std::move(*this), ec, bytes_transferred_));
}
this->invoke(cont, ec, bytes_transferred_);
} }
} }
@@ -460,16 +460,16 @@ write_some(bool fin,
ConstBufferSequence>::value, ConstBufferSequence>::value,
"ConstBufferSequence requirements not met"); "ConstBufferSequence requirements not met");
using beast::detail::clamp; using beast::detail::clamp;
auto& impl = *impl_;
std::size_t bytes_transferred = 0; std::size_t bytes_transferred = 0;
ec = {}; ec = {};
// Make sure the stream is open if(impl.check_stop_now(ec))
if(! impl_->check_open(ec))
return bytes_transferred; return bytes_transferred;
detail::frame_header fh; detail::frame_header fh;
if(! impl_->wr_cont) if(! impl.wr_cont)
{ {
impl_->begin_msg(); impl.begin_msg();
fh.rsv1 = impl_->wr_compress; fh.rsv1 = impl.wr_compress;
} }
else else
{ {
@@ -477,21 +477,22 @@ write_some(bool fin,
} }
fh.rsv2 = false; fh.rsv2 = false;
fh.rsv3 = false; fh.rsv3 = false;
fh.op = impl_->wr_cont ? fh.op = impl.wr_cont ?
detail::opcode::cont : impl_->wr_opcode; detail::opcode::cont : impl.wr_opcode;
fh.mask = impl_->role == role_type::client; fh.mask = impl.role == role_type::client;
auto remain = buffer_size(buffers); auto remain = buffer_size(buffers);
if(impl_->wr_compress) if(impl.wr_compress)
{ {
buffers_suffix< buffers_suffix<
ConstBufferSequence> cb{buffers}; ConstBufferSequence> cb(buffers);
for(;;) for(;;)
{ {
auto b = net::buffer( auto b = net::buffer(
impl_->wr_buf.get(), impl_->wr_buf_size); impl.wr_buf.get(), impl.wr_buf_size);
auto const more = impl_->deflate( auto const more = impl.deflate(
b, cb, fin, bytes_transferred, ec); b, cb, fin, bytes_transferred, ec);
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_transferred; return bytes_transferred;
auto const n = buffer_size(b); auto const n = buffer_size(b);
if(n == 0) if(n == 0)
@@ -516,10 +517,10 @@ write_some(bool fin,
detail::fh_buffer fh_buf; detail::fh_buffer fh_buf;
detail::write< detail::write<
flat_static_buffer_base>(fh_buf, fh); flat_static_buffer_base>(fh_buf, fh);
impl_->wr_cont = ! fin; impl.wr_cont = ! fin;
net::write(impl_->stream, net::write(impl.stream,
buffers_cat(fh_buf.data(), b), ec); buffers_cat(fh_buf.data(), b), ec);
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_transferred; return bytes_transferred;
if(! more) if(! more)
break; break;
@@ -527,11 +528,11 @@ write_some(bool fin,
fh.rsv1 = false; fh.rsv1 = false;
} }
if(fh.fin) if(fh.fin)
impl_->do_context_takeover_write(impl_->role); impl.do_context_takeover_write(impl.role);
} }
else if(! fh.mask) else if(! fh.mask)
{ {
if(! impl_->wr_frag) if(! impl.wr_frag)
{ {
// no mask, no autofrag // no mask, no autofrag
fh.fin = fin; fh.fin = fin;
@@ -539,35 +540,35 @@ write_some(bool fin,
detail::fh_buffer fh_buf; detail::fh_buffer fh_buf;
detail::write< detail::write<
flat_static_buffer_base>(fh_buf, fh); flat_static_buffer_base>(fh_buf, fh);
impl_->wr_cont = ! fin; impl.wr_cont = ! fin;
net::write(impl_->stream, net::write(impl.stream,
buffers_cat(fh_buf.data(), buffers), ec); buffers_cat(fh_buf.data(), buffers), ec);
if(! impl_->check_ok(ec)) if(impl.check_stop_now(ec))
return bytes_transferred; return bytes_transferred;
bytes_transferred += remain; bytes_transferred += remain;
} }
else else
{ {
// no mask, autofrag // no mask, autofrag
BOOST_ASSERT(impl_->wr_buf_size != 0); BOOST_ASSERT(impl.wr_buf_size != 0);
buffers_suffix< buffers_suffix<
ConstBufferSequence> cb{buffers}; ConstBufferSequence> cb{buffers};
for(;;) for(;;)
{ {
auto const n = clamp(remain, impl_->wr_buf_size); auto const n = clamp(remain, impl.wr_buf_size);
remain -= n; remain -= n;
fh.len = n; fh.len = n;
fh.fin = fin ? remain == 0 : false; fh.fin = fin ? remain == 0 : false;
detail::fh_buffer fh_buf; detail::fh_buffer fh_buf;
detail::write< detail::write<
flat_static_buffer_base>(fh_buf, fh); flat_static_buffer_base>(fh_buf, fh);
impl_->wr_cont = ! fin; impl.wr_cont = ! fin;
net::write(impl_->stream, net::write(impl.stream,
beast::buffers_cat(fh_buf.data(), beast::buffers_cat(fh_buf.data(),
beast::buffers_prefix(n, cb)), ec); beast::buffers_prefix(n, cb)), ec);
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n; bytes_transferred += n;
if(impl.check_stop_now(ec))
return bytes_transferred;
if(remain == 0) if(remain == 0)
break; break;
fh.op = detail::opcode::cont; fh.op = detail::opcode::cont;
@@ -575,7 +576,7 @@ write_some(bool fin,
} }
} }
} }
else if(! impl_->wr_frag) else if(! impl.wr_frag)
{ {
// mask, no autofrag // mask, no autofrag
fh.fin = fin; fh.fin = fin;
@@ -590,40 +591,40 @@ write_some(bool fin,
ConstBufferSequence> cb{buffers}; ConstBufferSequence> cb{buffers};
{ {
auto const n = auto const n =
clamp(remain, impl_->wr_buf_size); clamp(remain, impl.wr_buf_size);
auto const b = auto const b =
net::buffer(impl_->wr_buf.get(), n); net::buffer(impl.wr_buf.get(), n);
net::buffer_copy(b, cb); net::buffer_copy(b, cb);
cb.consume(n); cb.consume(n);
remain -= n; remain -= n;
detail::mask_inplace(b, key); detail::mask_inplace(b, key);
impl_->wr_cont = ! fin; impl.wr_cont = ! fin;
net::write(impl_->stream, net::write(impl.stream,
buffers_cat(fh_buf.data(), b), ec); buffers_cat(fh_buf.data(), b), ec);
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n; bytes_transferred += n;
if(impl.check_stop_now(ec))
return bytes_transferred;
} }
while(remain > 0) while(remain > 0)
{ {
auto const n = auto const n =
clamp(remain, impl_->wr_buf_size); clamp(remain, impl.wr_buf_size);
auto const b = auto const b =
net::buffer(impl_->wr_buf.get(), n); net::buffer(impl.wr_buf.get(), n);
net::buffer_copy(b, cb); net::buffer_copy(b, cb);
cb.consume(n); cb.consume(n);
remain -= n; remain -= n;
detail::mask_inplace(b, key); detail::mask_inplace(b, key);
net::write(impl_->stream, b, ec); net::write(impl.stream, b, ec);
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n; bytes_transferred += n;
if(impl.check_stop_now(ec))
return bytes_transferred;
} }
} }
else else
{ {
// mask, autofrag // mask, autofrag
BOOST_ASSERT(impl_->wr_buf_size != 0); BOOST_ASSERT(impl.wr_buf_size != 0);
buffers_suffix< buffers_suffix<
ConstBufferSequence> cb(buffers); ConstBufferSequence> cb(buffers);
for(;;) for(;;)
@@ -632,23 +633,23 @@ write_some(bool fin,
detail::prepared_key key; detail::prepared_key key;
detail::prepare_key(key, fh.key); detail::prepare_key(key, fh.key);
auto const n = auto const n =
clamp(remain, impl_->wr_buf_size); clamp(remain, impl.wr_buf_size);
auto const b = auto const b =
net::buffer(impl_->wr_buf.get(), n); net::buffer(impl.wr_buf.get(), n);
net::buffer_copy(b, cb); net::buffer_copy(b, cb);
detail::mask_inplace(b, key); detail::mask_inplace(b, key);
fh.len = n; fh.len = n;
remain -= n; remain -= n;
fh.fin = fin ? remain == 0 : false; fh.fin = fin ? remain == 0 : false;
impl_->wr_cont = ! fh.fin; impl.wr_cont = ! fh.fin;
detail::fh_buffer fh_buf; detail::fh_buffer fh_buf;
detail::write< detail::write<
flat_static_buffer_base>(fh_buf, fh); flat_static_buffer_base>(fh_buf, fh);
net::write(impl_->stream, net::write(impl.stream,
buffers_cat(fh_buf.data(), b), ec); buffers_cat(fh_buf.data(), b), ec);
if(! impl_->check_ok(ec))
return bytes_transferred;
bytes_transferred += n; bytes_transferred += n;
if(impl.check_stop_now(ec))
return bytes_transferred;
if(remain == 0) if(remain == 0)
break; break;
fh.op = detail::opcode::cont; fh.op = detail::opcode::cont;
@@ -674,9 +675,8 @@ async_write_some(bool fin,
BOOST_BEAST_HANDLER_INIT( BOOST_BEAST_HANDLER_INIT(
WriteHandler, void(error_code, std::size_t)); WriteHandler, void(error_code, std::size_t));
write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE( write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(error_code, std::size_t))>{ WriteHandler, void(error_code, std::size_t))>(
std::move(init.completion_handler), *this, fin, bs}( std::move(init.completion_handler), *this, fin, bs);
{}, 0, false);
return init.result.get(); return init.result.get();
} }
@@ -730,9 +730,8 @@ async_write(
BOOST_BEAST_HANDLER_INIT( BOOST_BEAST_HANDLER_INIT(
WriteHandler, void(error_code, std::size_t)); WriteHandler, void(error_code, std::size_t));
write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE( write_some_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(error_code, std::size_t))>{ WriteHandler, void(error_code, std::size_t))>(
std::move(init.completion_handler), *this, true, bs}( std::move(init.completion_handler), *this, true, bs);
{}, 0, false);
return init.result.get(); return init.result.get();
} }
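For reference, the fin flag threaded through every algorithm above is the caller-visible message boundary: a write with fin == false leaves the message open, and the following frames go out with opcode cont. A small usage sketch (assumes ws is an open websocket::stream and that error_code and net are in scope):

    // Send one logical message as two frames; the second call emits a
    // continuation frame carrying the FIN bit.
    error_code ec;
    std::string const part1 = "Hello, ";
    std::string const part2 = "world!";
    ws.write_some(false, net::buffer(part1), ec);    // first frame, message open
    if(! ec)
        ws.write_some(true, net::buffer(part2), ec); // final frame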
@@ -151,18 +151,18 @@ class stream
using control_cb_type = using control_cb_type =
std::function<void(frame_type, string_view)>; std::function<void(frame_type, string_view)>;
enum class status
{
open,
closing,
closed,
failed
};
struct impl_type; struct impl_type;
std::shared_ptr<impl_type> impl_; std::shared_ptr<impl_type> impl_;
using time_point = typename
std::chrono::steady_clock::time_point;
static constexpr time_point never()
{
return (time_point::max)();
}
public: public:
/// Indicates if the permessage-deflate extension is supported /// Indicates if the permessage-deflate extension is supported
using is_deflate_supported = using is_deflate_supported =
@@ -579,6 +579,18 @@ public:
bool bool
text() const; text() const;
/*
timer settings
* Timer is disabled
* Close on timeout
- no complete frame received, OR
- no complete frame sent
* Ping on timeout
- ping on no complete frame received
* if can't ping?
*/
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
// Handshaking (Client) // Handshaking (Client)
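The never() helper moved into the class above supplies the timer code with its "off" state: a steady_timer whose expiry equals time_point::max() is treated as disabled. A self-contained sketch of the convention:

    #include <boost/asio/io_context.hpp>
    #include <boost/asio/steady_timer.hpp>
    #include <chrono>

    int main()
    {
        using clock = std::chrono::steady_clock;
        auto const never = (clock::time_point::max)();

        boost::asio::io_context ioc;
        boost::asio::steady_timer timer(ioc);

        timer.expires_at(never);        // an expiry of never() means "off"
        bool const armed =
            timer.expiry() != never;    // mirrors is_timer_set()
        (void)armed;
    }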
@@ -21,6 +21,7 @@ add_executable (tests-beast-core
stream_tests.hpp stream_tests.hpp
test_handler.hpp test_handler.hpp
_detail_base64.cpp _detail_base64.cpp
_detail_bind_continuation.cpp
_detail_buffer.cpp _detail_buffer.cpp
_detail_clamp.cpp _detail_clamp.cpp
_detail_is_invocable.cpp _detail_is_invocable.cpp
@@ -31,7 +32,6 @@ add_executable (tests-beast-core
_detail_varint.cpp _detail_varint.cpp
async_op_base.cpp async_op_base.cpp
basic_stream.cpp basic_stream.cpp
bind_continuation.cpp
bind_handler.cpp bind_handler.cpp
buffer_size.cpp buffer_size.cpp
buffer_traits.cpp buffer_traits.cpp
@@ -9,6 +9,7 @@
local SOURCES = local SOURCES =
_detail_base64.cpp _detail_base64.cpp
_detail_bind_continuation.cpp
_detail_buffer.cpp _detail_buffer.cpp
_detail_clamp.cpp _detail_clamp.cpp
_detail_is_invocable.cpp _detail_is_invocable.cpp
@@ -19,7 +20,6 @@ local SOURCES =
_detail_varint.cpp _detail_varint.cpp
async_op_base.cpp async_op_base.cpp
basic_stream.cpp basic_stream.cpp
bind_continuation.cpp
bind_handler.cpp bind_handler.cpp
buffer_size.cpp buffer_size.cpp
buffer_traits.cpp buffer_traits.cpp
@@ -8,7 +8,7 @@
// //
// Test that header file is self-contained. // Test that header file is self-contained.
#include <boost/beast/core/bind_continuation.hpp> #include <boost/beast/core/detail/bind_continuation.hpp>
#include "test_executor.hpp" #include "test_executor.hpp"
#include "test_handler.hpp" #include "test_handler.hpp"
@@ -24,6 +24,7 @@
namespace boost { namespace boost {
namespace beast { namespace beast {
namespace detail {
class bind_continuation_test class bind_continuation_test
: public beast::unit_test::suite : public beast::unit_test::suite
@@ -182,5 +183,6 @@ public:
BEAST_DEFINE_TESTSUITE(beast,core,bind_continuation); BEAST_DEFINE_TESTSUITE(beast,core,bind_continuation);
} // detail
} // beast } // beast
} // boost } // boost
@@ -35,6 +35,7 @@ add_executable (tests-beast-websocket
stream_explicit.cpp stream_explicit.cpp
stream_fwd.cpp stream_fwd.cpp
teardown.cpp teardown.cpp
timer.cpp
utf8_checker.cpp utf8_checker.cpp
write.cpp write.cpp
) )
@@ -25,6 +25,7 @@ local SOURCES =
stream_explicit.cpp stream_explicit.cpp
stream_fwd.cpp stream_fwd.cpp
teardown.cpp teardown.cpp
timer.cpp
utf8_checker.cpp utf8_checker.cpp
write.cpp write.cpp
; ;
@@ -608,21 +608,6 @@ public:
}); });
} }
void
testContHook()
{
struct handler
{
void operator()(error_code) {}
};
stream<test::stream> ws{ioc_};
stream<test::stream>::close_op<handler> op{
handler{}, ws, {}};
using net::asio_handler_is_continuation;
asio_handler_is_continuation(&op);
}
void void
testMoveOnly() testMoveOnly()
{ {
@@ -660,7 +645,6 @@ public:
{ {
testClose(); testClose();
testSuspend(); testSuspend();
testContHook();
testMoveOnly(); testMoveOnly();
testAsioHandlerInvoke(); testAsioHandlerInvoke();
} }
@@ -423,21 +423,6 @@ public:
} }
} }
void
testContHook()
{
struct handler
{
void operator()(error_code) {}
};
stream<test::stream> ws{ioc_};
stream<test::stream>::ping_op<handler> op{
handler{}, ws, detail::opcode::ping, {}};
using net::asio_handler_is_continuation;
asio_handler_is_continuation(&op);
}
void void
testMoveOnly() testMoveOnly()
{ {
@@ -475,7 +460,6 @@ public:
{ {
testPing(); testPing();
testSuspend(); testSuspend();
testContHook();
testMoveOnly(); testMoveOnly();
testAsioHandlerInvoke(); testAsioHandlerInvoke();
} }
@@ -339,43 +339,6 @@ public:
bad(string_view("\x81\x7f\x00\x00\x00\x00\x00\x00\xff\xff", 10)); bad(string_view("\x81\x7f\x00\x00\x00\x00\x00\x00\xff\xff", 10));
} }
void
testContHook()
{
{
struct handler
{
void operator()(error_code, std::size_t) {}
};
char buf[32];
stream<test::stream> ws{ioc_};
stream<test::stream>::read_some_op<
net::mutable_buffer,
handler> op{handler{}, ws,
net::mutable_buffer{
buf, sizeof(buf)}};
using net::asio_handler_is_continuation;
asio_handler_is_continuation(&op);
pass();
}
{
struct handler
{
void operator()(error_code, std::size_t) {}
};
multi_buffer b;
stream<test::stream> ws{ioc_};
stream<test::stream>::read_op<
multi_buffer, handler> op{
handler{}, ws, b, 32, true};
using net::asio_handler_is_continuation;
asio_handler_is_continuation(&op);
pass();
}
}
void void
testIssue802() testIssue802()
{ {
@@ -675,7 +638,6 @@ public:
run() override run() override
{ {
testParseFrame(); testParseFrame();
testContHook();
testIssue802(); testIssue802();
testIssue807(); testIssue807();
testIssue954(); testIssue954();
@@ -0,0 +1,138 @@
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
// Test that header file is self-contained.
#include <boost/beast/websocket/stream.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include "test.hpp"
namespace boost {
namespace beast {
namespace websocket {
class timer_test
: public websocket_test_suite
{
public:
using tcp = boost::asio::ip::tcp;
static
void
connect(
stream<tcp::socket>& ws1,
stream<tcp::socket>& ws2)
{
struct handler
{
void
operator()(error_code ec)
{
BEAST_EXPECTS(! ec, ec.message());
}
};
tcp::acceptor a(ws1.get_executor().context());
auto ep = tcp::endpoint(
net::ip::make_address_v4("127.0.0.1"), 0);
a.open(ep.protocol());
a.set_option(
net::socket_base::reuse_address(true));
a.bind(ep);
a.listen(0);
ep = a.local_endpoint();
a.async_accept(ws2.next_layer(), handler{});
ws1.next_layer().async_connect(ep, handler{});
for(;;)
{
if( ws1.get_executor().context().run_one() +
ws2.get_executor().context().run_one() == 0)
{
ws1.get_executor().context().restart();
ws2.get_executor().context().restart();
break;
}
}
BEAST_EXPECT(
ws1.next_layer().remote_endpoint() ==
ws2.next_layer().local_endpoint());
BEAST_EXPECT(
ws2.next_layer().remote_endpoint() ==
ws1.next_layer().local_endpoint());
ws2.async_accept(handler{});
ws1.async_handshake("test", "/", handler{});
for(;;)
{
if( ws1.get_executor().context().run_one() +
ws2.get_executor().context().run_one() == 0)
{
ws1.get_executor().context().restart();
ws2.get_executor().context().restart();
break;
}
}
BEAST_EXPECT(ws1.is_open());
BEAST_EXPECT(ws2.is_open());
BEAST_EXPECT(! ws1.get_executor().context().stopped());
BEAST_EXPECT(! ws2.get_executor().context().stopped());
}
#if 0
void
testRead0()
{
echo_server es(log, kind::async);
net::io_context ioc;
stream<test::stream> ws(ioc);
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
flat_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
log << ec.message() << "\n";
});
ioc.run();
}
#endif
void
testRead()
{
net::io_context ioc;
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
connect(ws1, ws2);
flat_buffer b;
ws2.async_read(b,
[&](error_code ec, std::size_t)
{
log << "ws1.async_read: " << ec.message() << "\n";
});
ioc.run();
}
void
run() override
{
testRead();
pass();
}
};
BEAST_DEFINE_TESTSUITE(beast,websocket,timer);
} // websocket
} // beast
} // boost
@@ -605,25 +605,6 @@ public:
} }
} }
void
testContHook()
{
struct handler
{
void operator()(error_code) {}
};
char buf[32];
stream<test::stream> ws{ioc_};
stream<test::stream>::write_some_op<
net::const_buffer,
handler> op{handler{}, ws, true,
net::const_buffer{
buf, sizeof(buf)}};
using net::asio_handler_is_continuation;
asio_handler_is_continuation(&op);
}
void void
testMoveOnly() testMoveOnly()
{ {
@@ -669,7 +650,6 @@ public:
testWriteSuspend(); testWriteSuspend();
testAsyncWriteFrame(); testAsyncWriteFrame();
testIssue300(); testIssue300();
testContHook();
testMoveOnly(); testMoveOnly();
} }
}; };