Refactor websocket stream: fixes and tests

Vinnie Falco
2017-08-26 20:10:04 -07:00
parent 1d5b3f488e
commit 30b98674d5
17 changed files with 1225 additions and 950 deletions

View File

@@ -2,6 +2,7 @@ Version 110:
 * Refactor stream open state variable
 * Refactor websocket stream members
+* Refactor websocket stream: fixes and tests
 --------------------------------------------------------------------------------

View File

@@ -22,12 +22,12 @@
 #include <boost/throw_exception.hpp>
 #include <memory>
-#include <iostream>

 namespace boost {
 namespace beast {
 namespace websocket {

+//------------------------------------------------------------------------------
+
 /* Close the WebSocket Connection

    This composed operation sends the close frame if it hasn't already
@@ -46,6 +46,7 @@ class stream<NextLayer>::close_op
         detail::frame_buffer fb;
         error_code ev;
         token tok;
+        bool cont;

         state(
             Handler&,
@@ -78,7 +79,8 @@ public:
     void
     operator()(
         error_code ec = {},
-        std::size_t bytes_transferred = 0);
+        std::size_t bytes_transferred = 0,
+        bool cont = true);

     friend
     void* asio_handler_allocate(
@@ -102,7 +104,7 @@ public:
     bool asio_handler_is_continuation(close_op* op)
     {
         using boost::asio::asio_handler_is_continuation;
-        return asio_handler_is_continuation(
+        return op->d_->cont || asio_handler_is_continuation(
             std::addressof(op->d_.handler()));
     }
@@ -120,11 +122,15 @@ template<class NextLayer>
 template<class Handler>
 void
 stream<NextLayer>::close_op<Handler>::
-operator()(error_code ec, std::size_t bytes_transferred)
+operator()(
+    error_code ec,
+    std::size_t bytes_transferred,
+    bool cont)
 {
     using beast::detail::clamp;
     auto& d = *d_;
     close_code code{};
+    d.cont = cont;
     BOOST_ASIO_CORO_REENTER(*this)
     {
         // Maybe suspend
@@ -134,15 +140,9 @@ operator()(error_code ec, std::size_t bytes_transferred)
             d.ws.wr_block_ = d.tok;

             // Make sure the stream is open
-            if(! d.ws.open_)
-            {
-                BOOST_ASIO_CORO_YIELD
-                d.ws.get_io_service().post(
-                    bind_handler(std::move(*this),
-                        boost::asio::error::operation_aborted));
+            if(d.ws.check_fail(ec))
                 goto upcall;
-            }
         }
         else
         {
             // Suspend
@@ -160,24 +160,20 @@ operator()(error_code ec, std::size_t bytes_transferred)
             BOOST_ASSERT(d.ws.wr_block_ == d.tok);

             // Make sure the stream is open
-            if(! d.ws.open_)
-            {
-                ec = boost::asio::error::operation_aborted;
+            if(d.ws.check_fail(ec))
                 goto upcall;
-            }
         }
-        // Can't call close twice
-        BOOST_ASSERT(! d.ws.wr_close_);

         // Send close frame
+        BOOST_ASSERT(! d.ws.wr_close_);
         d.ws.wr_close_ = true;
         BOOST_ASIO_CORO_YIELD
         boost::asio::async_write(d.ws.stream_,
             d.fb.data(), std::move(*this));
-        if(ec)
-        {
-            d.ws.open_ = false;
+        if(d.ws.check_fail(ec))
             goto upcall;
-        }

         if(d.ws.rd_close_)
         {
@@ -187,6 +183,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
             goto teardown;
         }

+        // Maybe suspend
         if(! d.ws.rd_block_)
         {
             // Acquire the read block
@@ -194,11 +191,6 @@ operator()(error_code ec, std::size_t bytes_transferred)
         }
         else
         {
-            // The read_op is currently running so it will see
-            // the close frame and call teardown. We will suspend
-            // to cause async_read to return error::closed, before
-            // we return error::success.

             // Suspend
             BOOST_ASSERT(d.ws.rd_block_ != d.tok);
             BOOST_ASIO_CORO_YIELD
@@ -213,12 +205,11 @@ operator()(error_code ec, std::size_t bytes_transferred)
             d.ws.get_io_service().post(std::move(*this));
             BOOST_ASSERT(d.ws.rd_block_ == d.tok);

-            // Handle the stream closing while suspended
-            if(! d.ws.open_)
-            {
-                ec = boost::asio::error::operation_aborted;
+            // Make sure the stream is open
+            if(d.ws.check_fail(ec))
                 goto upcall;
-            }
-            BOOST_ASSERT(! d.ws.rd_close_);
         }

         // Drain
@@ -240,11 +231,8 @@ operator()(error_code ec, std::size_t bytes_transferred)
                 d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                     d.ws.rd_buf_.max_size())),
                 std::move(*this));
-            if(ec)
-            {
-                d.ws.open_ = false;
+            if(d.ws.check_fail(ec))
                 goto upcall;
-            }
             d.ws.rd_buf_.commit(bytes_transferred);
         }
         if(detail::is_control(d.ws.rd_fh_.op))
@@ -283,11 +271,8 @@ operator()(error_code ec, std::size_t bytes_transferred)
                 d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                     d.ws.rd_buf_.max_size())),
                 std::move(*this));
-            if(ec)
-            {
-                d.ws.open_ = false;
+            if(d.ws.check_fail(ec))
                 goto upcall;
-            }
             d.ws.rd_buf_.commit(bytes_transferred);
         }
         BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_);
@@ -304,6 +289,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
         async_teardown(d.ws.role_,
             d.ws.stream_, std::move(*this));
         BOOST_ASSERT(d.ws.wr_block_ == d.tok);
+        BOOST_ASSERT(d.ws.open_);
         if(ec == boost::asio::error::eof)
         {
             // Rationale:
@@ -317,15 +303,20 @@ operator()(error_code ec, std::size_t bytes_transferred)
     upcall:
         BOOST_ASSERT(d.ws.wr_block_ == d.tok);
         d.ws.wr_block_.reset();
-        if(d.ws.rd_block_)
+        if(d.ws.rd_block_ == d.tok)
         {
-            BOOST_ASSERT(d.ws.rd_block_ = d.tok);
             d.ws.rd_block_.reset();
             d.ws.paused_r_rd_.maybe_invoke();
         }
         d.ws.paused_rd_.maybe_invoke() ||
             d.ws.paused_ping_.maybe_invoke() ||
             d.ws.paused_wr_.maybe_invoke();
+        if(! d.cont)
+        {
+            auto& ws = d.ws;
+            return ws.stream_.get_io_service().post(
+                bind_handler(d_.release_handler(), ec));
+        }
         d_.invoke(ec);
     }
 }
@@ -353,12 +344,10 @@ close(close_reason const& cr, error_code& ec)
     static_assert(is_sync_stream<next_layer_type>::value,
         "SyncStream requirements not met");
     using beast::detail::clamp;
-    ec.assign(0, ec.category());
     // Make sure the stream is open
-    if(! open_)
-    {
-        ec = boost::asio::error::operation_aborted;
+    if(check_fail(ec))
         return;
-    }
     // If rd_close_ is set then we already sent a close
     BOOST_ASSERT(! rd_close_);
     BOOST_ASSERT(! wr_close_);
@@ -368,8 +357,7 @@ close(close_reason const& cr, error_code& ec)
         write_close<flat_static_buffer_base>(fb, cr);
         boost::asio::write(stream_, fb.data(), ec);
     }
-    open_ = ! ec;
-    if(! open_)
+    if(check_fail(ec))
         return;
     // Drain the connection
     close_code code{};
@@ -387,8 +375,7 @@ close(close_reason const& cr, error_code& ec)
         stream_.read_some(
             rd_buf_.prepare(read_size(rd_buf_,
                 rd_buf_.max_size())), ec);
-        open_ = ! ec;
-        if(! open_)
+        if(check_fail(ec))
            return;
        rd_buf_.commit(bytes_transferred);
    }
@@ -406,9 +393,11 @@ close(close_reason const& cr, error_code& ec)
                 detail::mask_inplace(mb, rd_key_);
             detail::read_close(cr_, mb, code);
             if(code != close_code::none)
+            {
                 // Protocol error
                 return do_fail(close_code::none,
                     error::failed, ec);
+            }
             rd_buf_.consume(clamp(rd_fh_.len));
             break;
         }
@@ -454,7 +443,8 @@ async_close(close_reason const& cr, CloseHandler&& handler)
         void(error_code)> init{handler};
     close_op<handler_type<
         CloseHandler, void(error_code)>>{
-        init.completion_handler, *this, cr}({});
+        init.completion_handler, *this, cr}(
+            {}, 0, false);
     return init.result.get();
 }
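The pattern running through close_op above is the new cont parameter: the initiating function invokes the operation with cont = false, intermediate completions use the defaulted true, and the value is latched into the op state so that asio_handler_is_continuation can report it. When cont is false at the upcall, the handler is posted rather than invoked, so a close that completes inline never calls the handler from inside async_close. A minimal sketch of the idiom under those assumptions (my_op and its members are hypothetical, not Beast's types):

    #include <boost/asio/handler_continuation_hook.hpp>
    #include <boost/asio/io_service.hpp>
    #include <cstdio>
    #include <functional>
    #include <utility>

    // Hypothetical composed operation demonstrating the cont flag idiom.
    template<class Handler>
    struct my_op
    {
        Handler h_;
        boost::asio::io_service& ios_;
        bool cont_;

        my_op(Handler h, boost::asio::io_service& ios)
            : h_(std::move(h))
            , ios_(ios)
            , cont_(true)
        {
        }

        void operator()(bool cont = true)
        {
            // Remember whether we were entered from the initiating
            // function (false) or from a completion handler (true).
            cont_ = cont;
            if(! cont_)
            {
                // Completed inline: post instead of invoking h_, so the
                // initiating function never calls the handler directly.
                ios_.post(std::move(h_));
                return;
            }
            h_();
        }

        // The hook can report a continuation without inspecting the
        // wrapped handler.
        friend bool asio_handler_is_continuation(my_op* op)
        {
            return op->cont_;
        }
    };

    int main()
    {
        boost::asio::io_service ios;
        my_op<std::function<void()>> op{
            []{ std::puts("done"); }, ios};
        op(false); // initiating call: completes via post()
        ios.run();
    }

This is why the initiation sites in this commit now pass ({}, 0, false): the extra arguments feed the flag, replacing the old dispatched_ bookkeeping.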

View File

@@ -1,271 +0,0 @@
//
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_FAIL_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_FAIL_IPP
#include <boost/beast/websocket/teardown.hpp>
#include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/detail/config.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/handler_alloc_hook.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/optional.hpp>
#include <memory>
#include <type_traits>
#include <utility>
namespace boost {
namespace beast {
namespace websocket {
/*
This composed operation optionally sends a close frame,
then performs the teardown operation.
*/
template<class NextLayer>
template<class Handler>
class stream<NextLayer>::fail_op
: public boost::asio::coroutine
{
struct state
{
stream<NextLayer>& ws;
detail::frame_buffer fb;
std::uint16_t code;
error_code ev;
token tok;
state(
Handler&,
stream<NextLayer>& ws_,
std::uint16_t code_,
error_code ev_)
: ws(ws_)
, code(code_)
, ev(ev_)
, tok(ws.tok_.unique())
{
}
};
handler_ptr<state, Handler> d_;
public:
fail_op(fail_op&&) = default;
fail_op(fail_op const&) = default;
template<class DeducedHandler>
fail_op(
DeducedHandler&& h,
stream<NextLayer>& ws,
std::uint16_t code,
error_code ev)
: d_(std::forward<DeducedHandler>(h),
ws, code, ev)
{
}
void operator()(
error_code ec = {},
std::size_t bytes_transferred = 0);
friend
void* asio_handler_allocate(
std::size_t size, fail_op* op)
{
using boost::asio::asio_handler_allocate;
return asio_handler_allocate(
size, std::addressof(op->d_.handler()));
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, fail_op* op)
{
using boost::asio::asio_handler_deallocate;
asio_handler_deallocate(
p, size, std::addressof(op->d_.handler()));
}
friend
bool asio_handler_is_continuation(fail_op* op)
{
using boost::asio::asio_handler_is_continuation;
return asio_handler_is_continuation(
std::addressof(op->d_.handler()));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, fail_op* op)
{
using boost::asio::asio_handler_invoke;
asio_handler_invoke(f,
std::addressof(op->d_.handler()));
}
};
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::
fail_op<Handler>::
operator()(error_code ec, std::size_t)
{
auto& d = *d_;
BOOST_ASIO_CORO_REENTER(*this)
{
// Maybe suspend
if(d.code != close_code::none && ! d.ws.wr_close_)
{
if(! d.ws.wr_block_)
{
// Acquire the write block
d.ws.wr_block_ = d.tok;
// Make sure the stream is open
if(! d.ws.open_)
{
BOOST_ASIO_CORO_YIELD
d.ws.get_io_service().post(
bind_handler(std::move(*this),
boost::asio::error::operation_aborted));
goto upcall;
}
}
else
{
// Suspend
BOOST_ASSERT(d.ws.wr_block_ != d.tok);
BOOST_ASIO_CORO_YIELD
d.ws.paused_rd_.emplace(std::move(*this)); // VFALCO emplace to paused_rd_
// Acquire the write block
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = d.tok;
// Resume
BOOST_ASIO_CORO_YIELD
d.ws.get_io_service().post(std::move(*this));
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
// Make sure the stream is open
if(! d.ws.open_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
}
}
// Serialize close frame
d.ws.template write_close<
flat_static_buffer_base>(
d.fb, d.code);
// Send close frame
d.ws.wr_close_ = true;
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(
d.ws.stream_, d.fb.data(),
std::move(*this));
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.open_ = ! ec;
if(! d.ws.open_)
goto upcall;
}
// Teardown
//BOOST_ASSERT(d.ws.wr_block_ == d.tok);
using beast::websocket::async_teardown;
BOOST_ASIO_CORO_YIELD
async_teardown(d.ws.role_,
d.ws.stream_, std::move(*this));
//BOOST_ASSERT(d.ws.wr_block_ == d.tok);
if(ec == boost::asio::error::eof)
{
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
if(! ec)
ec = d.ev;
d.ws.open_ = false;
upcall:
if(d.ws.wr_block_ == d.tok)
d.ws.wr_block_.reset();
d_.invoke(ec);
}
}
//------------------------------------------------------------------------------
/* _Fail the WebSocket Connection_
*/
template<class NextLayer>
void
stream<NextLayer>::
do_fail(
std::uint16_t code, // if set, send a close frame first
error_code ev, // error code to use upon success
error_code& ec) // set to the error, else set to ev
{
BOOST_ASSERT(ev);
if(code != close_code::none && ! wr_close_)
{
wr_close_ = true;
detail::frame_buffer fb;
write_close<
flat_static_buffer_base>(fb, code);
boost::asio::write(stream_, fb.data(), ec);
open_ = ! ec;
if(! open_)
return;
}
using beast::websocket::teardown;
teardown(role_, stream_, ec);
if(ec == boost::asio::error::eof)
{
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
open_ = ! ec;
if(! open_)
return;
ec = ev;
open_ = false;
}
/* _Fail the WebSocket Connection_
*/
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::
do_async_fail(
std::uint16_t code, // if set, send a close frame first
error_code ev, // error code to use upon success
Handler&& handler)
{
fail_op<typename std::decay<Handler>::type>{
std::forward<Handler>(handler),
*this,
code,
ev}();
}
} // websocket
} // beast
} // boost
#endif

View File

@@ -131,12 +131,11 @@ operator()(error_code ec, std::size_t)
             d.ws.wr_block_ = d.tok;

             // Make sure the stream is open
-            if(! d.ws.open_)
+            if(d.ws.check_fail(ec))
             {
                 BOOST_ASIO_CORO_YIELD
                 d.ws.get_io_service().post(
-                    bind_handler(std::move(*this),
-                        boost::asio::error::operation_aborted));
+                    bind_handler(std::move(*this), ec));
                 goto upcall;
             }
         }
@@ -157,19 +156,16 @@ operator()(error_code ec, std::size_t)
             BOOST_ASSERT(d.ws.wr_block_ == d.tok);

             // Make sure the stream is open
-            if(! d.ws.open_)
-            {
-                ec = boost::asio::error::operation_aborted;
+            if(d.ws.check_fail(ec))
                 goto upcall;
-            }
         }

         // Send ping frame
         BOOST_ASIO_CORO_YIELD
         boost::asio::async_write(d.ws.stream_,
             d.fb.data(), std::move(*this));
-        if(ec)
-            d.ws.open_ = false;
+        if(d.ws.check_fail(ec))
+            goto upcall;

     upcall:
         BOOST_ASSERT(d.ws.wr_block_ == d.tok);
@@ -199,16 +195,16 @@ void
 stream<NextLayer>::
 ping(ping_data const& payload, error_code& ec)
 {
-    ec.assign(0, ec.category());
     // Make sure the stream is open
-    if(! open_)
-    {
-        ec = boost::asio::error::operation_aborted;
+    if(check_fail(ec))
         return;
-    }
     detail::frame_buffer fb;
     write_ping<flat_static_buffer_base>(
         fb, detail::opcode::ping, payload);
     boost::asio::write(stream_, fb.data(), ec);
+    if(check_fail(ec))
+        return;
 }

 template<class NextLayer>
@@ -227,16 +223,16 @@ void
 stream<NextLayer>::
 pong(ping_data const& payload, error_code& ec)
 {
-    ec.assign(0, ec.category());
     // Make sure the stream is open
-    if(! open_)
-    {
-        ec = boost::asio::error::operation_aborted;
+    if(check_fail(ec))
         return;
-    }
     detail::frame_buffer fb;
     write_ping<flat_static_buffer_base>(
         fb, detail::opcode::pong, payload);
     boost::asio::write(stream_, fb.data(), ec);
+    if(check_fail(ec))
+        return;
 }

 template<class NextLayer>
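Both synchronous control-frame senders now share one failure contract via check_fail: a call on an already-closed stream fails immediately with operation_aborted, and any transport error marks the stream closed before returning. A caller-side sketch of those semantics (send_heartbeat is a hypothetical caller; it assumes a stream that completed its handshake elsewhere):

    #include <boost/beast/core.hpp>
    #include <boost/beast/websocket.hpp>
    #include <boost/asio/ip/tcp.hpp>

    // Sketch: reacting to the new ping() failure semantics.
    void send_heartbeat(
        boost::beast::websocket::stream<
            boost::asio::ip::tcp::socket>& ws)
    {
        boost::beast::error_code ec;
        ws.ping("heartbeat", ec);
        if(ec == boost::asio::error::operation_aborted)
        {
            // The stream was already closed before the call
        }
        else if(ec)
        {
            // The write failed; the stream is now marked closed
        }
    }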

View File

@@ -51,8 +51,9 @@ class stream<NextLayer>::read_some_op
     std::size_t bytes_written_ = 0;
     error_code ev_;
     token tok_;
-    bool dispatched_ = false;
+    close_code code_;
     bool did_read_ = false;
+    bool cont_;

 public:
     read_some_op(read_some_op&&) = default;
@@ -67,6 +68,7 @@ public:
         , ws_(ws)
         , cb_(bs)
         , tok_(ws_.tok_.unique())
+        , code_(close_code::none)
     {
     }
@@ -78,7 +80,8 @@ public:
     void operator()(
         error_code ec = {},
-        std::size_t bytes_transferred = 0);
+        std::size_t bytes_transferred = 0,
+        bool cont = true);

     friend
     void* asio_handler_allocate(
@@ -102,8 +105,7 @@ public:
     bool asio_handler_is_continuation(read_some_op* op)
     {
         using boost::asio::asio_handler_is_continuation;
-        return op->dispatched_ ||
-            asio_handler_is_continuation(
+        return op->cont_ || asio_handler_is_continuation(
             std::addressof(op->h_));
     }
@@ -124,13 +126,15 @@ stream<NextLayer>::
 read_some_op<MutableBufferSequence, Handler>::
 operator()(
     error_code ec,
-    std::size_t bytes_transferred)
+    std::size_t bytes_transferred,
+    bool cont)
 {
     using beast::detail::clamp;
     using boost::asio::buffer;
     using boost::asio::buffer_cast;
     using boost::asio::buffer_size;
     close_code code{};
+    cont_ = cont;
     BOOST_ASIO_CORO_REENTER(*this)
     {
         // Maybe suspend
@@ -140,17 +144,12 @@ operator()(
             ws_.rd_block_ = tok_;

             // Make sure the stream is open
-            if(! ws_.open_)
-            {
-                BOOST_ASIO_CORO_YIELD
-                ws_.get_io_service().post(
-                    bind_handler(std::move(*this),
-                        boost::asio::error::operation_aborted));
+            if(ws_.check_fail(ec))
                 goto upcall;
-            }
         }
         else
         {
+        do_suspend:
             // Suspend
             BOOST_ASSERT(ws_.rd_block_ != tok_);
             BOOST_ASIO_CORO_YIELD
@@ -164,16 +163,13 @@ operator()(
             BOOST_ASIO_CORO_YIELD
             ws_.get_io_service().post(std::move(*this));
             BOOST_ASSERT(ws_.rd_block_ == tok_);
-            dispatched_ = true;

-            // Handle the stream closing while suspended
-            if(! ws_.open_)
-            {
-                ec = boost::asio::error::operation_aborted;
+            // Make sure the stream is open
+            if(ws_.check_fail(ec))
                 goto upcall;
-            }
         }
     loop:
+        BOOST_ASSERT(ws_.rd_block_ == tok_);
         // See if we need to read a frame header. This
         // condition is structured to give the decompressor
         // a chance to emit the final empty deflate block
@@ -188,19 +184,31 @@ operator()(
             if(code != close_code::none)
             {
                 // _Fail the WebSocket Connection_
-                ec = error::failed;
+                code_ = code;
+                ev_ = error::failed;
                 goto close;
             }
+            BOOST_ASSERT(ws_.rd_block_ == tok_);
             BOOST_ASIO_CORO_YIELD
             ws_.stream_.async_read_some(
                 ws_.rd_buf_.prepare(read_size(
                     ws_.rd_buf_, ws_.rd_buf_.max_size())),
                 std::move(*this));
-            dispatched_ = true;
-            ws_.open_ = ! ec;
-            if(! ws_.open_)
+            BOOST_ASSERT(ws_.rd_block_ == tok_);
+            if(ws_.check_fail(ec))
                 goto upcall;
             ws_.rd_buf_.commit(bytes_transferred);
+
+            // Allow a close operation to
+            // drain the connection if necessary.
+            BOOST_ASSERT(ws_.rd_block_ == tok_);
+            ws_.rd_block_.reset();
+            if( ws_.paused_r_close_.maybe_invoke())
+            {
+                BOOST_ASSERT(ws_.rd_block_);
+                goto do_suspend;
+            }
+            ws_.rd_block_ = tok_;
         }
         // Immediately apply the mask to the portion
         // of the buffer holding payload data.
@@ -258,20 +266,18 @@ operator()(
                 BOOST_ASIO_CORO_YIELD
                 ws_.get_io_service().post(std::move(*this));
                 BOOST_ASSERT(ws_.wr_block_ == tok_);
-                dispatched_ = true;

-                // Handle the stream closing while suspended
-                if(! ws_.open_)
-                {
-                    ws_.wr_block_.reset();
-                    ec = boost::asio::error::operation_aborted;
+                // Make sure the stream is open
+                if(ws_.check_fail(ec))
                     goto upcall;
-                }

                 // Ignore ping when closing
                 if(ws_.wr_close_)
                 {
                     ws_.wr_block_.reset();
+                    ws_.paused_close_.maybe_invoke() ||
+                        ws_.paused_ping_.maybe_invoke() ||
+                        ws_.paused_wr_.maybe_invoke();
                     goto loop;
                 }
             }
@@ -282,11 +288,12 @@ operator()(
                 boost::asio::async_write(ws_.stream_,
                     ws_.rd_fb_.data(), std::move(*this));
                 BOOST_ASSERT(ws_.wr_block_ == tok_);
-                dispatched_ = true;
-                ws_.wr_block_.reset();
-                ws_.open_ = ! ec;
-                if(! ws_.open_)
+                if(ws_.check_fail(ec))
                     goto upcall;
+                ws_.wr_block_.reset();
+                ws_.paused_close_.maybe_invoke() ||
+                    ws_.paused_ping_.maybe_invoke() ||
+                    ws_.paused_wr_.maybe_invoke();
                 goto loop;
             }
             // Handle pong frame
@@ -319,7 +326,8 @@ operator()(
                 if(code != close_code::none)
                 {
                     // _Fail the WebSocket Connection_
-                    ec = error::failed;
+                    code_ = code;
+                    ev_ = error::failed;
                     goto close;
                 }
                 ws_.cr_ = cr;
@@ -330,15 +338,15 @@ operator()(
                 if(! ws_.wr_close_)
                 {
                     // _Start the WebSocket Closing Handshake_
-                    code = cr.code == close_code::none ?
+                    code_ = cr.code == close_code::none ?
                         close_code::normal :
                         static_cast<close_code>(cr.code);
-                    ec = error::closed;
+                    ev_ = error::closed;
                     goto close;
                 }
                 // _Close the WebSocket Connection_
-                code = close_code::none;
-                ec = error::closed;
+                code_ = close_code::none;
+                ev_ = error::closed;
                 goto close;
             }
         }
@@ -364,9 +372,7 @@ operator()(
                 ws_.rd_buf_.prepare(read_size(
                     ws_.rd_buf_, ws_.rd_buf_.max_size())),
                 std::move(*this));
-            dispatched_ = true;
-            ws_.open_ = ! ec;
-            if(! ws_.open_)
+            if(ws_.check_fail(ec))
                 goto upcall;
             ws_.rd_buf_.commit(bytes_transferred);
             if(ws_.rd_fh_.mask)
@@ -390,8 +396,8 @@ operator()(
                     ! ws_.rd_utf8_.finish()))
                 {
                     // _Fail the WebSocket Connection_
-                    code = close_code::bad_payload;
-                    ec = error::failed;
+                    code_ = close_code::bad_payload;
+                    ev_ = error::failed;
                     goto close;
                 }
             }
@@ -407,9 +413,7 @@ operator()(
             BOOST_ASIO_CORO_YIELD
             ws_.stream_.async_read_some(buffer_prefix(
                 clamp(ws_.rd_remain_), cb_), std::move(*this));
-            dispatched_ = true;
-            ws_.open_ = ! ec;
-            if(! ws_.open_)
+            if(ws_.check_fail(ec))
                 goto upcall;
             BOOST_ASSERT(bytes_transferred > 0);
             auto const mb = buffer_prefix(
@@ -424,8 +428,8 @@ operator()(
                     ! ws_.rd_utf8_.finish()))
                 {
                     // _Fail the WebSocket Connection_
-                    code = close_code::bad_payload;
-                    ec = error::failed;
+                    code_ = close_code::bad_payload;
+                    ev_ = error::failed;
                     goto close;
                 }
             }
@@ -434,7 +438,6 @@ operator()(
             }
         }
         ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
-        goto upcall;
     }
     else
     {
@@ -453,8 +456,7 @@ operator()(
                 ws_.rd_buf_.prepare(read_size(
                     ws_.rd_buf_, ws_.rd_buf_.max_size())),
                 std::move(*this));
-            ws_.open_ = ! ec;
-            if(! ws_.open_)
+            if(ws_.check_fail(ec))
                 goto upcall;
             BOOST_ASSERT(bytes_transferred > 0);
             ws_.rd_buf_.commit(bytes_transferred);
@@ -496,10 +498,8 @@ operator()(
                 zs.next_in = empty_block;
                 zs.avail_in = sizeof(empty_block);
                 ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
-                BOOST_ASSERT(! ec);
-                ws_.open_ = ! ec;
-                if(! ws_.open_)
-                    break;
+                if(ws_.check_fail(ec))
+                    goto upcall;
                 // VFALCO See:
                 // https://github.com/madler/zlib/issues/280
                 BOOST_ASSERT(zs.total_out == 0);
@@ -521,15 +521,14 @@ operator()(
             }
             ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
             BOOST_ASSERT(ec != zlib::error::end_of_stream);
-            ws_.open_ = ! ec;
-            if(! ws_.open_)
-                break;
+            if(ws_.check_fail(ec))
+                goto upcall;
             if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
                 ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
             {
                 // _Fail the WebSocket Connection_
-                code = close_code::too_big;
-                ec = error::failed;
+                code_ = close_code::too_big;
+                ev_ = error::failed;
                 goto close;
             }
             cb_.consume(zs.total_out);
@@ -546,40 +545,102 @@ operator()(
                 ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
             {
                 // _Fail the WebSocket Connection_
-                code = close_code::bad_payload;
-                ec = error::failed;
+                code_ = close_code::bad_payload;
+                ev_ = error::failed;
                 goto close;
             }
         }
     }
     goto upcall;

 close:
-    // Maybe send close frame, then teardown
-    BOOST_ASIO_CORO_YIELD
-    ws_.do_async_fail(code, ec, std::move(*this));
-    //BOOST_ASSERT(! ws_.wr_block_);
-    goto upcall;
+    if(! ws_.wr_block_)
+    {
+        // Acquire the write block
+        ws_.wr_block_ = tok_;
+        // Make sure the stream is open
+        BOOST_ASSERT(ws_.open_);
+    }
+    else
+    {
+        // Suspend
+        BOOST_ASSERT(ws_.wr_block_ != tok_);
+        BOOST_ASIO_CORO_YIELD
+        ws_.paused_rd_.save(std::move(*this));
+        // Acquire the write block
+        BOOST_ASSERT(! ws_.wr_block_);
+        ws_.wr_block_ = tok_;
+        // Resume
+        BOOST_ASIO_CORO_YIELD
+        ws_.get_io_service().post(std::move(*this));
+        BOOST_ASSERT(ws_.wr_block_ == tok_);
+        // Make sure the stream is open
+        if(ws_.check_fail(ec))
+            goto upcall;
+    }
+    if(! ws_.wr_close_)
+    {
+        ws_.wr_close_ = true;
+        // Serialize close frame
+        ws_.rd_fb_.reset();
+        ws_.template write_close<
+            flat_static_buffer_base>(
+                ws_.rd_fb_, code_);
+        // Send close frame
+        BOOST_ASSERT(ws_.wr_block_ == tok_);
+        BOOST_ASIO_CORO_YIELD
+        boost::asio::async_write(
+            ws_.stream_, ws_.rd_fb_.data(),
+            std::move(*this));
+        BOOST_ASSERT(ws_.wr_block_ == tok_);
+        // Make sure the stream is open
+        if(ws_.check_fail(ec))
+            goto upcall;
+    }
+    // Teardown
+    using beast::websocket::async_teardown;
+    BOOST_ASSERT(ws_.wr_block_ == tok_);
+    BOOST_ASIO_CORO_YIELD
+    async_teardown(ws_.role_,
+        ws_.stream_, std::move(*this));
+    BOOST_ASSERT(ws_.wr_block_ == tok_);
+    if(ec == boost::asio::error::eof)
+    {
+        // Rationale:
+        // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
+        ec.assign(0, ec.category());
+    }
+    if(! ec)
+        ec = ev_;
+    ws_.open_ = false;

 upcall:
     BOOST_ASSERT(ws_.rd_block_ == tok_);
     ws_.rd_block_.reset();
     ws_.paused_r_close_.maybe_invoke();
-    ws_.paused_close_.maybe_invoke() ||
-        ws_.paused_ping_.maybe_invoke() ||
-        ws_.paused_wr_.maybe_invoke();
-    if(! dispatched_)
+    if(ws_.wr_block_ == tok_)
     {
-        ws_.stream_.get_io_service().post(
-            bind_handler(std::move(h_),
-                ec, bytes_written_));
+        ws_.wr_block_.reset();
+        ws_.paused_close_.maybe_invoke() ||
+            ws_.paused_ping_.maybe_invoke() ||
+            ws_.paused_wr_.maybe_invoke();
     }
-    else
-    {
+    if(! cont_)
+        return ws_.stream_.get_io_service().post(
+            bind_handler(std::move(h_),
+                ec, bytes_written_));
     h_(ec, bytes_written_);
-    }
 }
 }

 //------------------------------------------------------------------------------
@@ -693,7 +754,8 @@ operator()(
             }
             BOOST_ASIO_CORO_YIELD
             read_some_op<buffers_type, read_op>{
-                std::move(*this), ws_, *mb}();
+                std::move(*this), ws_, *mb}(
+                    {}, 0, true);
             if(ec)
                 break;
             b_.commit(bytes_transferred);
@@ -892,12 +954,10 @@ read_some(
     using boost::asio::buffer_size;
     close_code code{};
     std::size_t bytes_written = 0;
-    ec.assign(0, ec.category());
     // Make sure the stream is open
-    if(! open_)
-    {
-        ec = boost::asio::error::operation_aborted;
+    if(check_fail(ec))
         return 0;
-    }
 loop:
     // See if we need to read a frame header. This
     // condition is structured to give the decompressor
@@ -919,8 +979,7 @@ loop:
             rd_buf_.prepare(read_size(
                 rd_buf_, rd_buf_.max_size())),
             ec);
-        open_ = ! ec;
-        if(! open_)
+        if(check_fail(ec))
             return bytes_written;
         rd_buf_.commit(bytes_transferred);
     }
@@ -959,8 +1018,7 @@ loop:
             write_ping<flat_static_buffer_base>(fb,
                 detail::opcode::pong, payload);
             boost::asio::write(stream_, fb.data(), ec);
-            open_ = ! ec;
-            if(! open_)
+            if(check_fail(ec))
                 return bytes_written;
             goto loop;
         }
@@ -1024,8 +1082,7 @@ loop:
         rd_buf_.commit(stream_.read_some(
             rd_buf_.prepare(read_size(rd_buf_,
                 rd_buf_.max_size())), ec));
-        open_ = ! ec;
-        if(! open_)
+        if(check_fail(ec))
            return bytes_written;
        if(rd_fh_.mask)
            detail::mask_inplace(
@@ -1068,8 +1125,7 @@ loop:
             auto const bytes_transferred =
                 stream_.read_some(buffer_prefix(
                     clamp(rd_remain_), buffers), ec);
-            open_ = ! ec;
-            if(! open_)
+            if(check_fail(ec))
                 return bytes_written;
             BOOST_ASSERT(bytes_transferred > 0);
             auto const mb = buffer_prefix(
@@ -1131,8 +1187,7 @@ loop:
         rd_buf_.prepare(read_size(
             rd_buf_, rd_buf_.max_size())),
             ec);
-        open_ = ! ec;
-        if(! open_)
+        if(check_fail(ec))
            return bytes_written;
        BOOST_ASSERT(bytes_transferred > 0);
        rd_buf_.commit(bytes_transferred);
@@ -1162,8 +1217,7 @@ loop:
             zs.avail_in = sizeof(empty_block);
             pmd_->zi.write(zs, zlib::Flush::sync, ec);
             BOOST_ASSERT(! ec);
-            open_ = ! ec;
-            if(! open_)
+            if(check_fail(ec))
                 return bytes_written;
             // VFALCO See:
             // https://github.com/madler/zlib/issues/280
@@ -1186,8 +1240,7 @@ loop:
         }
         pmd_->zi.write(zs, zlib::Flush::sync, ec);
         BOOST_ASSERT(ec != zlib::error::end_of_stream);
-        open_ = ! ec;
-        if(! open_)
+        if(check_fail(ec))
             return bytes_written;
         if(rd_msg_max_ && beast::detail::sum_exceeds(
             rd_size_, zs.total_out, rd_msg_max_))
@@ -1237,7 +1290,7 @@ async_read_some(
     read_some_op<MutableBufferSequence, handler_type<
         ReadHandler, void(error_code, std::size_t)>>{
         init.completion_handler,*this, buffers}(
-            {}, 0);
+            {}, 0, false);
     return init.result.get();
 }
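The most subtle change in read_some_op is the drain handoff: after committing a frame-header read, the op briefly releases rd_block_ and invokes anything parked in paused_r_close_, so a suspended close operation gets a turn to drain the connection; if one was resumed, the read jumps back to do_suspend and waits. A reduced model of that token-and-pause-slot mechanic (hypothetical names, far simpler than Beast's internals):

    #include <functional>
    #include <iostream>

    // Reduced model: one ownership token plus a one-slot pause list.
    struct block
    {
        int owner = 0;                 // 0 = unowned
        std::function<void()> paused;  // op waiting to acquire

        bool maybe_invoke()
        {
            if(! paused)
                return false;
            auto f = std::move(paused);
            paused = nullptr;
            f();                       // the waiter runs and re-acquires
            return true;
        }
    };

    int main()
    {
        block rd;
        rd.owner = 1;                  // the read op holds the block
        rd.paused = [&rd]
        {
            rd.owner = 2;              // the close op acquires it
            std::cout << "close op drains the connection\n";
        };
        // Between frames the read op releases the block and lets a
        // suspended close op in; if one ran, the read re-suspends.
        rd.owner = 0;
        if(rd.maybe_invoke())
            std::cout << "read op goes back to do_suspend\n";
    }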

View File

@@ -440,9 +440,15 @@ write_close(DynamicBuffer& db, close_reason const& cr)
     fh.rsv3 = false;
     fh.len = cr.code == close_code::none ?
         0 : 2 + cr.reason.size();
-    fh.mask = role_ == role_type::client;
-    if(fh.mask)
+    if(role_ == role_type::client)
+    {
+        fh.mask = true;
         fh.key = wr_gen_();
+    }
+    else
+    {
+        fh.mask = false;
+    }
     detail::write(db, fh);
     if(cr.code != close_code::none)
     {
@@ -668,7 +674,41 @@ on_response(response_type const& res,
     open(role_type::client);
 }

-//------------------------------------------------------------------------------
+// _Fail the WebSocket Connection_
+template<class NextLayer>
+void
+stream<NextLayer>::
+do_fail(
+    std::uint16_t code,         // if set, send a close frame first
+    error_code ev,              // error code to use upon success
+    error_code& ec)             // set to the error, else set to ev
+{
+    BOOST_ASSERT(ev);
+    if(code != close_code::none && ! wr_close_)
+    {
+        wr_close_ = true;
+        detail::frame_buffer fb;
+        write_close<
+            flat_static_buffer_base>(fb, code);
+        boost::asio::write(stream_, fb.data(), ec);
+        open_ = ! ec;
+        if(! open_)
+            return;
+    }
+    using beast::websocket::teardown;
+    teardown(role_, stream_, ec);
+    if(ec == boost::asio::error::eof)
+    {
+        // Rationale:
+        // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
+        ec.assign(0, ec.category());
+    }
+    open_ = ! ec;
+    if(! open_)
+        return;
+    ec = ev;
+    open_ = false;
+}

 } // websocket
 } // beast
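The write_close hunk replaces the branch-on-assignment with an explicit if/else, which makes the underlying RFC 6455 rule (sections 5.1 to 5.3) easier to see: a client must mask every frame it sends and should generate a fresh key per frame, while a server must not mask. A reduced stand-alone sketch of that decision (the types here are simplified stand-ins, not Beast's detail::frame_header):

    #include <cstdint>
    #include <random>

    // Sketch: choose frame masking by role, per RFC 6455.
    enum class role { client, server };

    struct frame_header
    {
        bool mask = false;
        std::uint32_t key = 0;
    };

    void set_mask(frame_header& fh, role r, std::mt19937& gen)
    {
        if(r == role::client)
        {
            fh.mask = true;  // a client MUST mask every frame
            fh.key = gen();  // fresh key for each frame
        }
        else
        {
            fh.mask = false; // a server MUST NOT mask
        }
    }

    int main()
    {
        std::mt19937 gen{std::random_device{}()};
        frame_header fh;
        set_mask(fh, role::client, gen);
        return fh.mask ? 0 : 1;
    }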

View File

@@ -49,6 +49,7 @@ class stream<NextLayer>::write_some_op
     int how_;
     bool fin_;
     bool more_;
+    bool cont_;

 public:
     write_some_op(write_some_op&&) = default;
@@ -74,17 +75,10 @@ public:
         return h_;
     }

-    void operator()(
-        error_code ec,
-        std::size_t bytes_transferred,
-        bool)
-    {
-        (*this)(ec, bytes_transferred);
-    }
-
     void operator()(
         error_code ec = {},
-        std::size_t bytes_transferred = 0);
+        std::size_t bytes_transferred = 0,
+        bool cont = true);

     friend
     void* asio_handler_allocate(
@@ -108,7 +102,7 @@ public:
     bool asio_handler_is_continuation(write_some_op* op)
     {
         using boost::asio::asio_handler_is_continuation;
-        return asio_handler_is_continuation(
+        return op->cont_ || asio_handler_is_continuation(
             std::addressof(op->h_));
     }
@@ -127,8 +121,10 @@ template<class Buffers, class Handler>
 void
 stream<NextLayer>::
 write_some_op<Buffers, Handler>::
-operator()(error_code ec,
-    std::size_t bytes_transferred)
+operator()(
+    error_code ec,
+    std::size_t bytes_transferred,
+    bool cont)
 {
     using beast::detail::clamp;
     using boost::asio::buffer;
@@ -145,7 +141,7 @@ operator()(error_code ec,
     };
     std::size_t n;
     boost::asio::mutable_buffer b;
+    cont_ = cont;
     BOOST_ASIO_CORO_REENTER(*this)
     {
         // Set up the outgoing frame header
@@ -203,7 +199,6 @@ operator()(error_code ec,
         }
     }

-do_maybe_suspend:
     // Maybe suspend
     if(! ws_.wr_block_)
     {
@@ -211,17 +206,12 @@ operator()(error_code ec,
         ws_.wr_block_ = tok_;

         // Make sure the stream is open
-        if(! ws_.open_)
-        {
-            BOOST_ASIO_CORO_YIELD
-            ws_.get_io_service().post(
-                bind_handler(std::move(*this),
-                    boost::asio::error::operation_aborted));
+        if(ws_.check_fail(ec))
             goto upcall;
-        }
     }
     else
     {
+    do_suspend:
         // Suspend
         BOOST_ASSERT(ws_.wr_block_ != tok_);
         BOOST_ASIO_CORO_YIELD
@@ -237,12 +227,9 @@ operator()(error_code ec,
         BOOST_ASSERT(ws_.wr_block_ == tok_);

         // Make sure the stream is open
-        if(! ws_.open_)
-        {
-            ec = boost::asio::error::operation_aborted;
+        if(ws_.check_fail(ec))
             goto upcall;
-        }
     }

     //------------------------------------------------------------------
@@ -261,8 +248,8 @@ operator()(error_code ec,
             buffer_cat(ws_.wr_fb_.data(), cb_),
             std::move(*this));
         BOOST_ASSERT(ws_.wr_block_ == tok_);
-        if(ec)
-            ws_.open_ = false;
+        if(ws_.check_fail(ec))
+            goto upcall;
         goto upcall;
     }
@@ -288,13 +275,10 @@ operator()(error_code ec,
                 clamp(fh_.len), cb_)),
             std::move(*this));
         BOOST_ASSERT(ws_.wr_block_ == tok_);
-        if(ec)
-        {
-            ws_.open_ = false;
+        if(ws_.check_fail(ec))
             goto upcall;
-        }
         if(remain_ == 0)
-            goto upcall;
+            break;
         cb_.consume(
             bytes_transferred - ws_.wr_fb_.size());
         fh_.op = detail::opcode::cont;
@@ -305,13 +289,12 @@ operator()(error_code ec,
         if(ws_.paused_close_.maybe_invoke() ||
             ws_.paused_rd_.maybe_invoke() ||
             ws_.paused_ping_.maybe_invoke())
         {
-            BOOST_ASIO_CORO_YIELD
-            ws_.get_io_service().post(
-                std::move(*this));
-            goto do_maybe_suspend;
+            BOOST_ASSERT(ws_.wr_block_);
+            goto do_suspend;
         }
         ws_.wr_block_ = tok_;
     }
+    goto upcall;
 }

 //------------------------------------------------------------------
@@ -341,11 +324,8 @@ operator()(error_code ec,
             buffer(ws_.wr_buf_.get(), n)),
             std::move(*this));
         BOOST_ASSERT(ws_.wr_block_ == tok_);
-        if(ec)
-        {
-            ws_.open_ = false;
+        if(ws_.check_fail(ec))
             goto upcall;
-        }
         while(remain_ > 0)
         {
             cb_.consume(ws_.wr_buf_size_);
@@ -362,12 +342,9 @@ operator()(error_code ec,
                 buffer(ws_.wr_buf_.get(), n),
                 std::move(*this));
             BOOST_ASSERT(ws_.wr_block_ == tok_);
-            if(ec)
-            {
-                ws_.open_ = false;
+            if(ws_.check_fail(ec))
                 goto upcall;
-            }
         }
         goto upcall;
     }
@@ -399,13 +376,10 @@ operator()(error_code ec,
             buffer(ws_.wr_buf_.get(), n)),
             std::move(*this));
         BOOST_ASSERT(ws_.wr_block_ == tok_);
-        if(ec)
-        {
-            ws_.open_ = false;
+        if(ws_.check_fail(ec))
             goto upcall;
-        }
         if(remain_ == 0)
-            goto upcall;
+            break;
         cb_.consume(
             bytes_transferred - ws_.wr_fb_.size());
         fh_.op = detail::opcode::cont;
@@ -416,13 +390,12 @@ operator()(error_code ec,
         if(ws_.paused_close_.maybe_invoke() ||
             ws_.paused_rd_.maybe_invoke() ||
             ws_.paused_ping_.maybe_invoke())
         {
-            BOOST_ASIO_CORO_YIELD
-            ws_.get_io_service().post(
-                std::move(*this));
-            goto do_maybe_suspend;
+            BOOST_ASSERT(ws_.wr_block_);
+            goto do_suspend;
         }
         ws_.wr_block_ = tok_;
     }
+    goto upcall;
 }

 //------------------------------------------------------------------
@@ -435,15 +408,8 @@ operator()(error_code ec,
             ws_.wr_buf_size_);
         more_ = detail::deflate(
             ws_.pmd_->zo, b, cb_, fin_, ec);
-        ws_.open_ = ! ec;
-        if(! ws_.open_)
-        {
-            // Always dispatching is easiest
-            BOOST_ASIO_CORO_YIELD
-            ws_.get_io_service().post(
-                bind_handler(std::move(*this), ec));
+        if(ws_.check_fail(ec))
             goto upcall;
-        }
         n = buffer_size(b);
         if(n == 0)
         {
@@ -452,14 +418,6 @@ operator()(error_code ec,
             // latency.
             BOOST_ASSERT(! fin_);
             BOOST_ASSERT(buffer_size(cb_) == 0);
-            // We can skip the dispatch if the
-            // asynchronous initiation function is
-            // not on call stack but its hard to
-            // figure out so be safe and dispatch.
-            BOOST_ASIO_CORO_YIELD
-            ws_.get_io_service().post(
-                std::move(*this));
             goto upcall;
         }
         if(fh_.mask)
@@ -482,11 +440,8 @@ operator()(error_code ec,
             buffer_cat(ws_.wr_fb_.data(),
                 mutable_buffers_1{b}), std::move(*this));
         BOOST_ASSERT(ws_.wr_block_ == tok_);
-        if(ec)
-        {
-            ws_.open_ = false;
+        if(ws_.check_fail(ec))
             goto upcall;
-        }
         if(more_)
         {
             fh_.op = detail::opcode::cont;
@@ -498,10 +453,8 @@ operator()(error_code ec,
             ws_.paused_rd_.maybe_invoke() ||
             ws_.paused_ping_.maybe_invoke())
         {
-            BOOST_ASIO_CORO_YIELD
-            ws_.get_io_service().post(
-                std::move(*this));
-            goto do_maybe_suspend;
+            BOOST_ASSERT(ws_.wr_block_);
+            goto do_suspend;
         }
         ws_.wr_block_ = tok_;
     }
@@ -527,6 +480,9 @@ operator()(error_code ec,
         ws_.paused_close_.maybe_invoke() ||
             ws_.paused_rd_.maybe_invoke() ||
             ws_.paused_ping_.maybe_invoke();
+        if(! cont_)
+            return ws_.stream_.get_io_service().post(
+                bind_handler(h_, ec));
         h_(ec);
     }
 }
@@ -566,12 +522,10 @@ write_some(bool fin,
     using boost::asio::buffer;
     using boost::asio::buffer_copy;
     using boost::asio::buffer_size;
-    ec.assign(0, ec.category());
     // Make sure the stream is open
-    if(! open_)
-    {
-        ec = boost::asio::error::operation_aborted;
+    if(check_fail(ec))
         return;
-    }
     detail::frame_header fh;
     if(! wr_cont_)
     {
@@ -627,8 +581,7 @@ write_some(bool fin,
             wr_cont_ = ! fin;
             boost::asio::write(stream_,
                 buffer_cat(fh_buf.data(), b), ec);
-            open_ = ! ec;
-            if(! open_)
+            if(check_fail(ec))
                 return;
             if(! more)
                 break;
@@ -641,9 +594,8 @@ write_some(bool fin,
             (role_ == role_type::server &&
                 pmd_config_.server_no_context_takeover)))
             pmd_->zo.reset();
-        return;
     }
-    if(! fh.mask)
+    else if(! fh.mask)
     {
         if(! wr_frag_)
         {
@@ -656,8 +608,7 @@ write_some(bool fin,
             wr_cont_ = ! fin;
             boost::asio::write(stream_,
                 buffer_cat(fh_buf.data(), buffers), ec);
-            open_ = ! ec;
-            if(! open_)
+            if(check_fail(ec))
                 return;
         }
         else
@@ -679,8 +630,7 @@ write_some(bool fin,
                 boost::asio::write(stream_,
                     buffer_cat(fh_buf.data(),
                         buffer_prefix(n, cb)), ec);
-                open_ = ! ec;
-                if(! open_)
+                if(check_fail(ec))
                     return;
                 if(remain == 0)
                     break;
@@ -688,9 +638,8 @@ write_some(bool fin,
                 cb.consume(n);
             }
         }
-        return;
     }
-    if(! wr_frag_)
+    else if(! wr_frag_)
     {
         // mask, no autofrag
         fh.fin = fin;
@@ -713,8 +662,7 @@ write_some(bool fin,
         wr_cont_ = ! fin;
         boost::asio::write(stream_,
             buffer_cat(fh_buf.data(), b), ec);
-        open_ = ! ec;
-        if(! open_)
+        if(check_fail(ec))
             return;
         }
         while(remain > 0)
@@ -726,12 +674,11 @@ write_some(bool fin,
             remain -= n;
             detail::mask_inplace(b, key);
             boost::asio::write(stream_, b, ec);
-            open_ = ! ec;
-            if(! open_)
+            if(check_fail(ec))
                 return;
         }
-        return;
     }
+    else
     {
         // mask, autofrag
         BOOST_ASSERT(wr_buf_size_ != 0);
@@ -755,15 +702,13 @@ write_some(bool fin,
                 flat_static_buffer_base>(fh_buf, fh);
             boost::asio::write(stream_,
                 buffer_cat(fh_buf.data(), b), ec);
-            open_ = ! ec;
-            if(! open_)
+            if(check_fail(ec))
                 return;
             if(remain == 0)
                 break;
             fh.op = detail::opcode::cont;
             cb.consume(n);
         }
-        return;
     }
 }
@@ -784,7 +729,7 @@ async_write_some(bool fin,
         void(error_code)> init{handler};
     write_some_op<ConstBufferSequence, handler_type<
         WriteHandler, void(error_code)>>{init.completion_handler,
-            *this, fin, bs}();
+            *this, fin, bs}({}, 0, false);
     return init.result.get();
 }
@@ -838,7 +783,7 @@ async_write(
         void(error_code)> init{handler};
     write_some_op<ConstBufferSequence, handler_type<
         WriteHandler, void(error_code)>>{init.completion_handler,
-            *this, true, bs}();
+            *this, true, bs}({}, 0, false);
     return init.result.get();
 }
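With cont_ recorded at entry, the per-path dispatches that write_some_op used to sprinkle before the upcall (see the removed "Always dispatching is easiest" block) collapse into one decision at the end: if the op never left the initiating function, post the handler once; otherwise call it directly. In sketch form (complete is a hypothetical free function, not the member):

    #include <boost/asio/io_service.hpp>
    #include <boost/beast/core/bind_handler.hpp>
    #include <boost/beast/core/error.hpp>
    #include <utility>

    // Sketch: one final decision replaces the per-path dispatches.
    template<class Handler>
    void complete(Handler&& h,
        boost::asio::io_service& ios,
        boost::beast::error_code ec, bool cont)
    {
        if(! cont)
        {
            // The initiating function is still on the call stack:
            // defer to the io_service rather than invoking the
            // handler re-entrantly.
            ios.post(boost::beast::bind_handler(
                std::forward<Handler>(h), ec));
            return;
        }
        h(ec);
    }

This preserves the guarantee that an initiating function never invokes its completion handler before returning, while saving a context switch on every path that already went through an asynchronous step.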

View File

@@ -31,6 +31,7 @@
 #include <boost/beast/http/detail/type_traits.hpp>
 #include <boost/beast/zlib/deflate_stream.hpp>
 #include <boost/beast/zlib/inflate_stream.hpp>
+#include <boost/asio/error.hpp>
 #include <algorithm>
 #include <cstdint>
 #include <functional>
@@ -150,7 +151,7 @@ class stream
     struct pmd_t
     {
         // `true` if current read message is compressed
-        bool rd_set;
+        bool rd_set = false;

         zlib::deflate_stream zo;
         zlib::inflate_stream zi;
@@ -165,37 +166,50 @@ class stream
     std::size_t rd_msg_max_                     // max message size
         = 16 * 1024 * 1024;
-    std::uint64_t rd_size_;                     // total size of current message so far
-    std::uint64_t rd_remain_;                   // message frame bytes left in current frame
+    std::uint64_t rd_size_                      // total size of current message so far
+        = 0;
+    std::uint64_t rd_remain_                    // message frame bytes left in current frame
+        = 0;
     detail::frame_header rd_fh_;                // current frame header
-    detail::prepared_key rd_key_;               // current stateful mask key
+    detail::prepared_key rd_key_                // current stateful mask key
+        = 0;
     detail::frame_buffer rd_fb_;                // to write control frames (during reads)
     detail::utf8_checker rd_utf8_;              // to validate utf8
     static_buffer<
         +tcp_frame_size> rd_buf_;               // buffer for reads
-    detail::opcode rd_op_;                      // current message binary or text
-    bool rd_cont_;                              // `true` if the next frame is a continuation
-    bool rd_done_;                              // set when a message is done
-    bool rd_close_;                             // did we read a close frame?
+    detail::opcode rd_op_                       // current message binary or text
+        = detail::opcode::text;
+    bool rd_cont_                               // `true` if the next frame is a continuation
+        = false;
+    bool rd_done_                               // set when a message is done
+        = true;
+    bool rd_close_                              // did we read a close frame?
+        = false;
     token rd_block_;                            // op currenly reading

     token tok_;                                 // used to order asynchronous ops
-    role_type role_;                            // server or client
+    role_type role_                             // server or client
+        = role_type::client;
     bool open_                                  // `true` if connected
         = false;

     token wr_block_;                            // op currenly writing
-    bool wr_close_;                             // did we write a close frame?
-    bool wr_cont_;                              // next write is a continuation
-    bool wr_frag_;                              // autofrag the current message
+    bool wr_close_                              // did we write a close frame?
+        = false;
+    bool wr_cont_                               // next write is a continuation
+        = false;
+    bool wr_frag_                               // autofrag the current message
+        = false;
     bool wr_frag_opt_                           // autofrag option setting
         = true;
-    bool wr_compress_;                          // compress current message
+    bool wr_compress_                           // compress current message
+        = false;
     detail::opcode wr_opcode_                   // message type
         = detail::opcode::text;
     std::unique_ptr<
         std::uint8_t[]> wr_buf_;                // write buffer
-    std::size_t wr_buf_size_;                   // write buffer size (current message)
+    std::size_t wr_buf_size_                    // write buffer size (current message)
+        = 0;
     std::size_t wr_buf_opt_                     // write buffer size option setting
         = 4096;
     detail::fh_buffer wr_fb_;                   // header buffer used for writes
@@ -344,6 +358,17 @@ public:
     //
     //--------------------------------------------------------------------------

+    /** Returns `true` if the stream is open.
+
+        The stream is open after a successful handshake, and when
+        no error has occurred.
+    */
+    bool
+    is_open() const
+    {
+        return open_;
+    }
+
     /** Returns `true` if the latest message data indicates binary.

         This function informs the caller of whether the last
@@ -3368,6 +3393,22 @@ private:
     void reset();
     void begin_msg();

+    bool
+    check_fail(error_code& ec)
+    {
+        if(! open_)
+        {
+            ec = boost::asio::error::operation_aborted;
+            return true;
+        }
+        if(ec)
+        {
+            open_ = false;
+            return true;
+        }
+        return false;
+    }
+
     template<class DynamicBuffer>
     bool
     parse_fh(detail::frame_header& fh,
@@ -3424,13 +3465,6 @@ private:
         std::uint16_t code,
         error_code ev,
         error_code& ec);
-
-    template<class Handler>
-    void
-    do_async_fail(
-        std::uint16_t code,
-        error_code ev,
-        Handler&& handler);
 };

 } // websocket
@@ -3439,7 +3473,6 @@ private:
 #include <boost/beast/websocket/impl/accept.ipp>
 #include <boost/beast/websocket/impl/close.ipp>
-#include <boost/beast/websocket/impl/fail.ipp>
 #include <boost/beast/websocket/impl/handshake.ipp>
 #include <boost/beast/websocket/impl/ping.ipp>
 #include <boost/beast/websocket/impl/read.ipp>
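check_fail folds the two bookkeeping fragments that previously appeared at every I/O site, the open-check returning operation_aborted and the `open_ = ! ec; if(! open_) return;` pair, into a single helper. A stand-alone sketch of exactly that logic, with the member state passed in explicitly so it can run on its own:

    #include <boost/asio/error.hpp>
    #include <boost/system/error_code.hpp>
    #include <cassert>

    // Sketch: check_fail's two jobs as a free function.
    // (1) A call on a closed stream fails with operation_aborted.
    // (2) A new error latches the stream into the closed state.
    bool check_fail(bool& open, boost::system::error_code& ec)
    {
        if(! open)
        {
            ec = boost::asio::error::operation_aborted;
            return true;
        }
        if(ec)
        {
            open = false;
            return true;
        }
        return false;
    }

    int main()
    {
        bool open = true;
        boost::system::error_code ec;
        assert(! check_fail(open, ec)); // open, no error: proceed
        ec = boost::asio::error::eof;
        assert(check_fail(open, ec));   // error: latches closed
        ec = {};
        assert(check_fail(open, ec));   // closed: operation_aborted
        assert(ec == boost::asio::error::operation_aborted);
    }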

View File

@@ -20,7 +20,9 @@ add_executable (tests-beast-websocket
     accept.cpp
     close.cpp
     error.cpp
+    frame.cpp
     handshake.cpp
+    mask.cpp
     option.cpp
     ping.cpp
     read.cpp
@@ -28,8 +30,6 @@ add_executable (tests-beast-websocket
     role.cpp
     stream.cpp
     teardown.cpp
-    frame.cpp
-    mask.cpp
     utf8_checker.cpp
     write.cpp
 )

View File

@@ -11,16 +11,16 @@ local SOURCES =
     accept.cpp
     close.cpp
     error.cpp
+    frame.cpp
     handshake.cpp
+    mask.cpp
     option.cpp
     ping.cpp
     read.cpp
     rfc6455.cpp
+    role.cpp
     stream.cpp
     teardown.cpp
-    frame.cpp
-    mask.cpp
-    role.cpp
     utf8_checker.cpp
     write.cpp
     ;

View File

@@ -51,7 +51,7 @@ public:
        }();
        // request in stream
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -95,7 +95,7 @@ public:
        }
        // request in stream, decorator
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -141,7 +141,7 @@ public:
        }
        // request in buffers
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -183,7 +183,7 @@ public:
        }
        // request in buffers, decorator
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -228,7 +228,7 @@ public:
        }
        // request in buffers and stream
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -273,7 +273,7 @@ public:
        }
        // request in buffers and stream, decorator
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -320,7 +320,7 @@ public:
        }
        // request in message
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -337,7 +337,7 @@ public:
        });
        // request in message, decorator
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -357,7 +357,7 @@ public:
        });
        // request in message, close frame in stream
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
@@ -386,7 +386,7 @@ public:
        });
        // failed handshake (missing Sec-WebSocket-Key)
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            stream<test::stream&> ws{ts};
            auto tr = connect(ws.next_layer());
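Note: the doTestLoop helper these call sites used is renamed doStreamLoop in this commit; its definition appears in the test-suite header diff further down. A condensed sketch of the pattern, assuming the Beast test harness types (test::fail_counter, test::stream, BEAST_EXPECT) and a member io_service ios_:

    // Sketch: run the test body repeatedly, injecting a failure on the
    // n-th I/O operation for n = 0, 1, 2, ... until the body completes
    // with no injected failure, so every intermediate error path runs.
    template<class Test>
    void
    doStreamLoop(Test const& f)
    {
        static std::size_t constexpr limit = 1000;
        std::size_t n;
        for(n = 0; n <= limit; ++n)
        {
            test::fail_counter fc{n};   // fails the n-th operation
            test::stream ts{ios_, fc};  // stream wired to the counter
            try
            {
                f(ts);                  // run one test body
                ts.close();
                break;                  // finished without an injection
            }
            catch(system_error const& se)
            {
                // only the injected error is acceptable here
                BEAST_EXPECTS(
                    se.code() == test::error::fail_error,
                    se.code().message());
                ts.close();
            }
        }
        BEAST_EXPECT(n < limit);        // the loop must terminate early
    }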

View File

@@ -144,109 +144,213 @@ public:
        {
            doTestClose(AsyncClient{yield});
        });
-       // suspend on write
-       {
-           echo_server es{log};
-           error_code ec;
-           boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
-           ws.next_layer().connect(es.stream());
-           ws.handshake("localhost", "/", ec);
-           BEAST_EXPECTS(! ec, ec.message());
-           std::size_t count = 0;
-           ws.async_ping("",
-               [&](error_code ec)
-               {
-                   ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
-               });
-           BEAST_EXPECT(ws.wr_block_);
-           ws.async_close({},
-               [&](error_code ec)
-               {
-                   ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
-               });
-           ios.run();
-           BEAST_EXPECT(count == 2);
-       }
-       // suspend on read
-       {
-           echo_server es{log};
-           error_code ec;
-           boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
-           ws.next_layer().connect(es.stream());
-           ws.handshake("localhost", "/", ec);
-           BEAST_EXPECTS(! ec, ec.message());
-           flat_buffer b;
-           std::size_t count = 0;
-           ws.async_read(b,
-               [&](error_code ec, std::size_t)
-               {
-                   ++count;
-                   BEAST_EXPECTS(
-                       ec == error::closed, ec.message());
-               });
-           BEAST_EXPECT(ws.rd_block_);
-           ws.async_close({},
-               [&](error_code ec)
-               {
-                   ++count;
-                   BEAST_EXPECTS(
-                       ec == boost::asio::error::operation_aborted,
-                       ec.message());
-               });
-           BEAST_EXPECT(ws.wr_close_);
-           ios.run();
-           BEAST_EXPECT(count == 2);
-       }
    }
    void
    testCloseSuspend()
    {
-       echo_server es{log, kind::async};
+       // suspend on ping
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
            boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
+           stream<test::stream> ws{ios, fc};
            ws.next_layer().connect(es.stream());
            ws.handshake("localhost", "/");
-           // Cause close to be received
-           es.async_close();
-           multi_buffer b;
            std::size_t count = 0;
+           ws.async_ping("",
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(ws.wr_block_);
+           BEAST_EXPECT(count == 0);
+           ws.async_close({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on write
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           std::size_t count = 0;
+           ws.async_write(sbuf("*"),
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(ws.wr_block_);
+           BEAST_EXPECT(count == 0);
+           ws.async_close({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read ping + message
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // add a ping and message to the input
+           ws.next_layer().append(string_view{
+               "\x89\x00" "\x81\x01*", 5});
+           std::size_t count = 0;
+           multi_buffer b;
            ws.async_read(b,
                [&](error_code ec, std::size_t)
                {
                    ++count;
-                   BEAST_EXPECTS(ec == error::closed,
-                       ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            while(! ws.wr_block_)
+           {
                ios.run_one();
-           // try to close
-           ws.async_close("payload",
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_close({},
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(ec == boost::asio::
-                       error::operation_aborted,
-                       ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
               });
-           static std::size_t constexpr limit = 100;
-           std::size_t n;
-           for(n = 0; n < limit; ++n)
-           {
-               if(count >= 2)
-                   break;
-               ios.run_one();
-           }
-           BEAST_EXPECT(n < limit);
+           BEAST_EXPECT(count == 0);
            ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read bad message
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // add an invalid frame to the input
+           ws.next_layer().append(string_view{
+               "\x09\x00", 2});
+           std::size_t count = 0;
+           multi_buffer b;
+           ws.async_read(b,
+               [&](error_code ec, std::size_t)
+               {
+                   ++count;
+                   if(ec != error::failed)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           while(! ws.wr_block_)
+           {
+               ios.run_one();
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_close({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec != boost::asio::error::operation_aborted)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read close #1
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // add a close frame to the input
+           ws.next_layer().append(string_view{
+               "\x88\x00", 2});
+           std::size_t count = 0;
+           multi_buffer b;
+           ws.async_read(b,
+               [&](error_code ec, std::size_t)
+               {
+                   ++count;
+                   if(ec != error::closed)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           while(! ws.wr_block_)
+           {
+               ios.run_one();
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_close({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec != boost::asio::error::operation_aborted)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+   }
+   void
+   testContHook()
+   {
+       struct handler
+       {
+           void operator()(error_code) {}
+       };
+       stream<test::stream> ws{ios_};
+       stream<test::stream>::close_op<handler> op{
+           handler{}, ws, {}};
+       using boost::asio::asio_handler_is_continuation;
+       asio_handler_is_continuation(&op);
    }
    void
@@ -254,6 +358,7 @@ public:
    {
        testClose();
        testCloseSuspend();
+       testContHook();
    }
};
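Note: the raw byte strings appended to the test stream in these cases are hand-built WebSocket frames. Decoding them per RFC 6455 (first byte: FIN bit plus opcode; second byte: mask bit plus payload length, both zero here):

    // "\x89\x00"      FIN=1, opcode 0x9 (ping), unmasked, length 0
    // "\x81\x01*"     FIN=1, opcode 0x1 (text), length 1, payload '*'
    // "\x88\x00"      FIN=1, opcode 0x8 (close), length 0
    // "\x09\x00"      FIN=0, opcode 0x9 (ping): a fragmented control
    //                 frame, which RFC 6455 forbids, so the read
    //                 completes with error::failed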

View File

@@ -44,7 +44,7 @@ public:
        };
        // handshake
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            echo_server es{log};
            ws_type ws{ts};
@@ -62,7 +62,7 @@ public:
        });
        // handshake, response
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            echo_server es{log};
            ws_type ws{ts};
@@ -82,7 +82,7 @@ public:
        });
        // handshake, decorator
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            echo_server es{log};
            ws_type ws{ts};
@@ -103,7 +103,7 @@ public:
        });
        // handshake, response, decorator
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            echo_server es{log};
            ws_type ws{ts};

View File

@@ -123,6 +123,235 @@ public:
    void
    testPingSuspend()
    {
+       // suspend on write
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           std::size_t count = 0;
+           ws.async_write(sbuf("Hello, world"),
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(ws.wr_block_);
+           BEAST_EXPECT(count == 0);
+           ws.async_ping({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on close
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           std::size_t count = 0;
+           ws.async_close({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(ws.wr_block_);
+           BEAST_EXPECT(count == 0);
+           ws.async_ping({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec != boost::asio::error::operation_aborted)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read ping + message
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // add a ping and message to the input
+           ws.next_layer().append(string_view{
+               "\x89\x00" "\x81\x01*", 5});
+           std::size_t count = 0;
+           multi_buffer b;
+           ws.async_read(b,
+               [&](error_code ec, std::size_t)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           while(! ws.wr_block_)
+           {
+               ios.run_one();
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_ping({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read bad message
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // add an invalid frame to the input
+           ws.next_layer().append(string_view{
+               "\x09\x00", 2});
+           std::size_t count = 0;
+           multi_buffer b;
+           ws.async_read(b,
+               [&](error_code ec, std::size_t)
+               {
+                   ++count;
+                   if(ec != error::failed)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           while(! ws.wr_block_)
+           {
+               ios.run_one();
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_ping({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec != boost::asio::error::operation_aborted)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read close #1
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // add a close frame to the input
+           ws.next_layer().append(string_view{
+               "\x88\x00", 2});
+           std::size_t count = 0;
+           multi_buffer b;
+           ws.async_read(b,
+               [&](error_code ec, std::size_t)
+               {
+                   ++count;
+                   if(ec != error::closed)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           while(! ws.wr_block_)
+           {
+               ios.run_one();
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_ping({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec != boost::asio::error::operation_aborted)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read close #2
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log, kind::async};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // Cause close to be received
+           es.async_close();
+           std::size_t count = 0;
+           multi_buffer b;
+           ws.async_read(b,
+               [&](error_code ec, std::size_t)
+               {
+                   ++count;
+                   if(ec != error::closed)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           while(! ws.wr_block_)
+           {
+               ios.run_one();
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_ping({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec != boost::asio::error::operation_aborted)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
        {
            echo_server es{log, kind::async};
            boost::asio::io_service ios;
@@ -187,12 +416,29 @@ public:
            BEAST_EXPECT(n < limit);
            ios.run();
        }
    }
    void
+   testContHook()
+   {
+       struct handler
+       {
+           void operator()(error_code) {}
+       };
+       stream<test::stream> ws{ios_};
+       stream<test::stream>::ping_op<handler> op{
+           handler{}, ws, detail::opcode::ping, {}};
+       using boost::asio::asio_handler_is_continuation;
+       asio_handler_is_continuation(&op);
+   }
+   void
    run() override
    {
        testPing();
        testPingSuspend();
+       testContHook();
    }
};
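Note: every suspend test above follows one skeleton. A condensed sketch, reusing the harness names from this commit (doFailLoop, echo_server, sbuf) and dropping the error checks inside the callbacks for brevity:

    doFailLoop([&](test::fail_counter& fc)
    {
        echo_server es{log};
        boost::asio::io_service ios;
        stream<test::stream> ws{ios, fc};
        ws.next_layer().connect(es.stream());
        ws.handshake("localhost", "/");
        std::size_t count = 0;
        ws.async_write(sbuf("*"),           // 1. occupy the write block
            [&](error_code ec){ ++count; });
        BEAST_EXPECT(ws.wr_block_);         // 2. the block is now held
        ws.async_ping({},                   // 3. this op must suspend
            [&](error_code ec){ ++count; });
        BEAST_EXPECT(count == 0);           // 4. no completion may have
                                            //    been invoked inline
        ios.run();                          // 5. both resume and finish
        BEAST_EXPECT(count == 2);
    });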

View File

@@ -568,38 +568,83 @@ public:
            check(error::closed,
                "\x88\x06\xfc\x15utf8");
        }
+   }
+   void
+   testReadSuspend()
+   {
+       using boost::asio::buffer;
        // suspend on write
+       doFailLoop([&](test::fail_counter& fc)
        {
            echo_server es{log};
-           error_code ec;
            boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
+           stream<test::stream> ws{ios, fc};
            ws.next_layer().connect(es.stream());
-           ws.handshake("localhost", "/", ec);
-           BEAST_EXPECTS(! ec, ec.message());
+           ws.handshake("localhost", "/");
            // insert a ping
            ws.next_layer().append(string_view(
                "\x89\x00", 2));
            std::size_t count = 0;
-           multi_buffer b;
            std::string const s = "Hello, world";
+           multi_buffer b;
            ws.async_read(b,
                [&](error_code ec, std::size_t)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                    BEAST_EXPECT(to_string(b.data()) == s);
                });
-           BEAST_EXPECT(ws.rd_block_);
            ws.async_write(buffer(s),
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            BEAST_EXPECT(ws.wr_block_);
            ios.run();
            BEAST_EXPECT(count == 2);
-       }
+       });
+   }
+   void
+   testContHook()
+   {
+       {
+           struct handler
+           {
+               void operator()(error_code, std::size_t) {}
+           };
+           char buf[32];
+           stream<test::stream> ws{ios_};
+           stream<test::stream>::read_some_op<
+               boost::asio::mutable_buffers_1,
+               handler> op{handler{}, ws,
+               boost::asio::mutable_buffers_1{
+                   buf, sizeof(buf)}};
+           using boost::asio::asio_handler_is_continuation;
+           asio_handler_is_continuation(&op);
+       }
+       {
+           struct handler
+           {
+               void operator()(error_code, std::size_t) {}
+           };
+           multi_buffer b;
+           stream<test::stream> ws{ios_};
+           stream<test::stream>::read_op<
+               multi_buffer, handler> op{
+               handler{}, ws, b, 32, true};
+           using boost::asio::asio_handler_is_continuation;
+           asio_handler_is_continuation(&op);
+       }
    }
@@ -607,6 +652,8 @@ public:
    run() override
    {
        testRead();
+       testReadSuspend();
+       testContHook();
    }
};
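Note: the testContHook additions in this commit exist only to exercise the asio_handler_is_continuation customization point on each composed-operation type, a branch ordinary runs rarely touch. A standalone sketch of the mechanism, where my_op is a hypothetical stand-in for read_op and friends:

    #include <boost/asio/handler_continuation_hook.hpp>

    struct my_op // hypothetical stand-in for a Beast composed operation
    {
        bool cont = true;

        // Found by argument-dependent lookup; Asio consults it to decide
        // whether the operation continues an existing call chain.
        friend bool asio_handler_is_continuation(my_op* op)
        {
            return op->cont; // Beast ops also defer to the wrapped handler
        }
    };

    int main()
    {
        my_op op;
        using boost::asio::asio_handler_is_continuation;
        asio_handler_is_continuation(&op); // touches the hook, as the tests do
    }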

View File

@@ -58,6 +58,7 @@ public:
    test::stream ts_;
    std::thread t_;
    websocket::stream<test::stream&> ws_;
+   bool close_ = false;
public:
    explicit
@@ -119,12 +120,19 @@ public:
    {
        ios_.post(
            [&]
            {
+               if(ws_.is_open())
+               {
                    ws_.async_close({},
                        std::bind(
                            &echo_server::on_close,
                            this,
                            std::placeholders::_1));
+               }
+               else
+               {
+                   close_ = true;
+               }
            });
    }
@@ -173,6 +181,7 @@ public:
    {
        if(ec)
            return fail(ec);
        do_read();
    }
@@ -181,6 +190,16 @@ public:
    {
        if(ec)
            return fail(ec);
+       if(close_)
+       {
+           return ws_.async_close({},
+               std::bind(
+                   &echo_server::on_close,
+                   this,
+                   std::placeholders::_1));
+       }
        do_read();
    }
@@ -241,21 +260,16 @@ public:
    template<class Test>
    void
-   doTestLoop(Test const& f)
+   doFailLoop(
+       Test const& f, std::size_t limit = 200)
    {
-       // This number has to be high for the
-       // test that writes the large buffer.
-       static std::size_t constexpr limit = 1000;
        std::size_t n;
        for(n = 0; n <= limit; ++n)
        {
            test::fail_counter fc{n};
-           test::stream ts{ios_, fc};
            try
            {
-               f(ts);
-               ts.close();
+               f(fc);
                break;
            }
            catch(system_error const& se)
@@ -264,16 +278,28 @@ public:
                se.code() == test::error::fail_error,
                se.code().message());
            }
-           catch(std::exception const& e)
-           {
-               fail(e.what(), __FILE__, __LINE__);
-           }
-           ts.close();
-           continue;
        }
        BEAST_EXPECT(n < limit);
    }
+   template<class Test>
+   void
+   doStreamLoop(Test const& f)
+   {
+       // This number has to be high for the
+       // test that writes the large buffer.
+       static std::size_t constexpr limit = 1000;
+       doFailLoop(
+           [&](test::fail_counter& fc)
+           {
+               test::stream ts{ios_, fc};
+               f(ts);
+               ts.close();
+           }
+           , limit);
+   }
    template<class Test>
    void
    doTest(
@@ -422,6 +448,14 @@ public:
        return false;
    }
+   template<class Pred>
+   bool
+   run_until(
+       boost::asio::io_service& ios, Pred&& pred)
+   {
+       return run_until(ios, 100, pred);
+   }
    inline
    std::string const&
    random_string()
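Note: the close_ flag added to echo_server above fixes a race: an async_close() posted before the WebSocket handshake finishes would otherwise invoke ws_.async_close on a stream that is not open yet. The condensed logic, assuming the surrounding echo_server members shown in the diff:

    // posted by the server's close() member:
    if(ws_.is_open())
    {
        // handshake already complete: close immediately
        ws_.async_close({}, std::bind(
            &echo_server::on_close, this, std::placeholders::_1));
    }
    else
    {
        close_ = true; // remember the request for later
    }

    // in on_accept(error_code ec), once the handshake completes:
    if(close_)
    {
        // perform the deferred close instead of reading
        return ws_.async_close({}, std::bind(
            &echo_server::on_close, this, std::placeholders::_1));
    }
    do_read();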

View File

@@ -123,7 +123,7 @@ public:
        });
        // nomask
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            echo_server es{log, kind::async_client};
            ws_type ws{ts};
@@ -149,7 +149,7 @@ public:
        });
        // nomask, autofrag
-       doTestLoop([&](test::stream& ts)
+       doStreamLoop([&](test::stream& ts)
        {
            echo_server es{log, kind::async_client};
            ws_type ws{ts};
@@ -230,93 +230,160 @@ public:
        {
            doTestWrite(AsyncClient{yield});
        });
+   }
+   void
+   testWriteSuspend()
+   {
+       using boost::asio::buffer;
-       // suspend on write
+       // suspend on ping
+       doFailLoop([&](test::fail_counter& fc)
        {
            echo_server es{log};
-           error_code ec;
            boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
+           stream<test::stream> ws{ios, fc};
            ws.next_layer().connect(es.stream());
-           ws.handshake("localhost", "/", ec);
-           BEAST_EXPECTS(! ec, ec.message());
+           ws.handshake("localhost", "/");
            std::size_t count = 0;
            ws.async_ping("",
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            BEAST_EXPECT(ws.wr_block_);
+           BEAST_EXPECT(count == 0);
            ws.async_write(sbuf("*"),
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(
-                       ec == boost::asio::error::operation_aborted,
-                       ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
-           ws.async_close({}, [&](error_code){});
+           BEAST_EXPECT(count == 0);
            ios.run();
            BEAST_EXPECT(count == 2);
-       }
+       });
-       // suspend on write, nomask, frag
+       // suspend on close
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           std::size_t count = 0;
+           ws.async_close({},
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(ws.wr_block_);
+           BEAST_EXPECT(count == 0);
+           ws.async_write(sbuf("*"),
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec != boost::asio::error::operation_aborted)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on read ping + message
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           // add a ping and message to the input
+           ws.next_layer().append(string_view{
+               "\x89\x00" "\x81\x01*", 5});
+           std::size_t count = 0;
+           multi_buffer b;
+           ws.async_read(b,
+               [&](error_code ec, std::size_t)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           while(! ws.wr_block_)
+           {
+               ios.run_one();
+               if(! BEAST_EXPECT(! ios.stopped()))
+                   break;
+           }
+           BEAST_EXPECT(count == 0);
+           ws.async_write(sbuf("*"),
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(count == 0);
+           ios.run();
+           BEAST_EXPECT(count == 2);
+       });
+       // suspend on ping: nomask, nofrag
+       doFailLoop([&](test::fail_counter& fc)
        {
            echo_server es{log, kind::async_client};
-           error_code ec;
            boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
+           stream<test::stream> ws{ios, fc};
            ws.next_layer().connect(es.stream());
            es.async_handshake();
-           ws.accept(ec);
-           BEAST_EXPECTS(! ec, ec.message());
+           ws.accept();
            std::size_t count = 0;
            std::string const s(16384, '*');
-           ws.auto_fragment(true);
+           ws.auto_fragment(false);
            ws.async_write(buffer(s),
@@ -324,42 +391,93 @@ public:
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            BEAST_EXPECT(ws.wr_block_);
            ws.async_ping("",
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            ios.run();
-           ios.reset();
            BEAST_EXPECT(count == 2);
-           flat_buffer b;
-           ws.async_read(b,
-               [&](error_code ec, std::size_t)
-               {
-                   ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
-                   BEAST_EXPECT(to_string(b.data()) == s);
-                   ws.async_close({},
-                       [&](error_code ec)
-                       {
-                           ++count;
-                           BEAST_EXPECTS(! ec, ec.message());
-                       });
-               });
-           ios.run();
-           BEAST_EXPECT(count == 4);
-       }
+       });
-       // suspend on write, mask, frag
+       // suspend on ping: nomask, frag
+       doFailLoop([&](test::fail_counter& fc)
        {
-           echo_server es{log, kind::async};
-           error_code ec;
+           echo_server es{log, kind::async_client};
            boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
+           stream<test::stream> ws{ios, fc};
            ws.next_layer().connect(es.stream());
-           ws.handshake("localhost", "/", ec);
-           BEAST_EXPECTS(! ec, ec.message());
+           es.async_handshake();
+           ws.accept();
            std::size_t count = 0;
            std::string const s(16384, '*');
            ws.auto_fragment(true);
            ws.async_write(buffer(s),
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            BEAST_EXPECT(ws.wr_block_);
            ws.async_ping("",
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            ios.run();
-           ios.reset();
            BEAST_EXPECT(count == 2);
-           flat_buffer b;
-           ws.async_read(b,
-               [&](error_code ec, std::size_t)
-               {
-                   ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
-                   BEAST_EXPECT(to_string(b.data()) == s);
-                   ws.async_close({},
-                       [&](error_code ec)
-                       {
-                           ++count;
-                           BEAST_EXPECTS(! ec, ec.message());
-                       });
-               });
-           ios.run();
-           BEAST_EXPECT(count == 4);
-       }
+       });
+       // suspend on ping: mask, nofrag
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           error_code ec;
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           std::size_t count = 0;
+           std::string const s(16384, '*');
+           ws.auto_fragment(false);
+           ws.async_write(buffer(s),
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(ws.wr_block_);
+           ws.async_ping("",
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           ios.run();
+       });
+       // suspend on ping: mask, frag
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log};
+           error_code ec;
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
+           ws.next_layer().connect(es.stream());
+           ws.handshake("localhost", "/");
+           std::size_t count = 0;
+           std::string const s(16384, '*');
+           ws.auto_fragment(true);
+           ws.async_write(buffer(s),
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           BEAST_EXPECT(ws.wr_block_);
+           ws.async_ping("",
+               [&](error_code ec)
+               {
+                   ++count;
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
+               });
+           ios.run();
+       });
+       // suspend on ping: deflate
+       doFailLoop([&](test::fail_counter& fc)
+       {
+           echo_server es{log, kind::async};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios, fc};
            {
                permessage_deflate pmd;
                pmd.client_enable = true;
@@ -367,8 +485,7 @@ public:
                ws.set_option(pmd);
            }
            ws.next_layer().connect(es.stream());
-           ws.handshake("localhost", "/", ec);
-           BEAST_EXPECTS(! ec, ec.message());
+           ws.handshake("localhost", "/");
            std::size_t count = 0;
            auto const& s = random_string();
            ws.binary(true);
@@ -376,130 +493,21 @@ public:
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            BEAST_EXPECT(ws.wr_block_);
            ws.async_ping("",
                [&](error_code ec)
                {
                    ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
+                   if(ec)
+                       BOOST_THROW_EXCEPTION(
+                           system_error{ec});
                });
            ios.run();
-           ios.reset();
-           BEAST_EXPECT(count == 2);
-           flat_buffer b;
-           ws.async_read(b,
-               [&](error_code ec, std::size_t)
-               {
-                   ++count;
-                   BEAST_EXPECTS(! ec, ec.message());
-                   BEAST_EXPECT(to_string(b.data()) == s);
-                   ws.async_close({},
-                       [&](error_code ec)
-                       {
-                           ++count;
-                           BEAST_EXPECTS(! ec, ec.message());
-                       });
-               });
-           ios.run();
-           BEAST_EXPECT(count == 4);
-       }
-   }
-   /*
-       https://github.com/boostorg/beast/issues/300
-       Write a message as two individual frames
-   */
-   void
-   testIssue300()
-   {
-       for(int i = 0; i < 2; ++i )
-       {
-           echo_server es{log, i==1 ?
-               kind::async : kind::sync};
-           boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
-           ws.next_layer().connect(es.stream());
-           error_code ec;
-           ws.handshake("localhost", "/", ec);
-           if(! BEAST_EXPECTS(! ec, ec.message()))
-               return;
-           ws.write_some(false, sbuf("u"));
-           ws.write_some(true, sbuf("v"));
-           multi_buffer b;
-           ws.read(b, ec);
-           BEAST_EXPECTS(! ec, ec.message());
-       }
-   }
-   void
-   testWriteSuspend()
-   {
-       for(int i = 0; i < 2; ++i )
-       {
-           echo_server es{log, i==1 ?
-               kind::async : kind::sync};
-           boost::asio::io_service ios;
-           stream<test::stream> ws{ios};
-           ws.next_layer().connect(es.stream());
-           ws.handshake("localhost", "/");
-           // Make remote send a text message with bad utf8.
-           ws.binary(true);
-           put(ws.next_layer().buffer(), cbuf(
-               0x03, 0xea, 0xf0, 0x28, 0x8c, 0xbc));
-           multi_buffer b;
-           std::size_t count = 0;
-           // Read text message with bad utf8.
-           // Causes a close to be sent, blocking writes.
-           ws.async_read(b,
-               [&](error_code ec, std::size_t)
-               {
-                   // Read should fail with protocol error
-                   ++count;
-                   BEAST_EXPECTS(
-                       ec == error::failed, ec.message());
-                   // Reads after failure are aborted
-                   ws.async_read(b,
-                       [&](error_code ec, std::size_t)
-                       {
-                           ++count;
-                           BEAST_EXPECTS(ec == boost::asio::
-                               error::operation_aborted,
-                               ec.message());
-                       });
-               });
-           // Run until the read_op writes a close frame.
-           while(! ws.wr_block_)
-               ios.run_one();
-           // Write a text message, leaving
-           // the write_op suspended as a pausation.
-           ws.async_write(sbuf("Hello"),
-               [&](error_code ec)
-               {
-                   ++count;
-                   // Send is canceled because close received.
-                   BEAST_EXPECTS(ec == boost::asio::
-                       error::operation_aborted,
-                       ec.message());
-                   // Writes after close are aborted.
-                   ws.async_write(sbuf("World"),
-                       [&](error_code ec)
-                       {
-                           ++count;
-                           BEAST_EXPECTS(ec == boost::asio::
-                               error::operation_aborted,
-                               ec.message());
-                       });
-               });
-           // Run until all completions are delivered.
-           while(! ios.stopped())
-               ios.run_one();
-           BEAST_EXPECT(count == 4);
-       }
+       });
    }
    void
@@ -536,13 +544,61 @@ public:
        }
    }
+   /*
+       https://github.com/boostorg/beast/issues/300
+       Write a message as two individual frames
+   */
+   void
+   testIssue300()
+   {
+       for(int i = 0; i < 2; ++i )
+       {
+           echo_server es{log, i==1 ?
+               kind::async : kind::sync};
+           boost::asio::io_service ios;
+           stream<test::stream> ws{ios};
+           ws.next_layer().connect(es.stream());
+           error_code ec;
+           ws.handshake("localhost", "/", ec);
+           if(! BEAST_EXPECTS(! ec, ec.message()))
+               return;
+           ws.write_some(false, sbuf("u"));
+           ws.write_some(true, sbuf("v"));
+           multi_buffer b;
+           ws.read(b, ec);
+           BEAST_EXPECTS(! ec, ec.message());
+       }
+   }
+   void
+   testContHook()
+   {
+       struct handler
+       {
+           void operator()(error_code) {}
+       };
+       char buf[32];
+       stream<test::stream> ws{ios_};
+       stream<test::stream>::write_some_op<
+           boost::asio::const_buffers_1,
+           handler> op{handler{}, ws, true,
+           boost::asio::const_buffers_1{
+               buf, sizeof(buf)}};
+       using boost::asio::asio_handler_is_continuation;
+       asio_handler_is_continuation(&op);
+   }
    void
    run() override
    {
        testWrite();
        testWriteSuspend();
-       testIssue300();
        testAsyncWriteFrame();
+       testIssue300();
+       testContHook();
    }
};
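Note: testIssue300, now ordered after testAsyncWriteFrame in run(), pins down the regression from boostorg/beast issue 300: a message written as two individual frames must still be received as a single message. The core of the test:

    // write one message as two frames
    ws.write_some(false, sbuf("u"));   // first frame, FIN clear
    ws.write_some(true, sbuf("v"));    // final frame, FIN set
    multi_buffer b;
    ws.read(b, ec);                    // the read must complete
    BEAST_EXPECTS(! ec, ec.message()); // without error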