Refactor read_op + fail_op

This commit is contained in:
Vinnie Falco
2017-08-13 10:59:23 -07:00
parent 3652137718
commit fa087e19f1
12 changed files with 456 additions and 570 deletions

View File

@@ -12,6 +12,7 @@ WebSocket:
* Refactor fail_op * Refactor fail_op
* Refactor read_op * Refactor read_op
* Refactor close_op * Refactor close_op
* Refactor read_op + fail_op
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------

View File

@@ -54,12 +54,6 @@ public:
*/ */
using const_buffers_type = boost::asio::mutable_buffers_1; using const_buffers_type = boost::asio::mutable_buffers_1;
/** The type used to represent the mutable input sequence as a list of buffers.
This buffer sequence is guaranteed to have length 1.
*/
using mutable_data_type = boost::asio::mutable_buffers_1;
/** The type used to represent the output sequence as a list of buffers. /** The type used to represent the output sequence as a list of buffers.
This buffer sequence is guaranteed to have length 1. This buffer sequence is guaranteed to have length 1.
@@ -107,13 +101,6 @@ public:
const_buffers_type const_buffers_type
data() const; data() const;
/** Get a list of mutable buffers that represent the input sequence.
@note These buffers remain valid across subsequent calls to `prepare`.
*/
mutable_data_type
mutable_data();
/// Set the input and output sequences to size 0 /// Set the input and output sequences to size 0
void void
reset(); reset();

View File

@@ -35,15 +35,6 @@ data() const ->
return {in_, dist(in_, out_)}; return {in_, dist(in_, out_)};
} }
inline
auto
flat_static_buffer_base::
mutable_data() ->
mutable_data_type
{
return {in_, dist(in_, out_)};
}
inline inline
void void
flat_static_buffer_base:: flat_static_buffer_base::

View File

@@ -50,27 +50,6 @@ data() const ->
return result; return result;
} }
inline
auto
static_buffer_base::
mutable_data() ->
mutable_data_type
{
using boost::asio::mutable_buffer;
mutable_data_type result;
if(in_off_ + in_size_ <= capacity_)
{
result[0] = mutable_buffer{begin_ + in_off_, in_size_};
result[1] = mutable_buffer{begin_, 0};
}
else
{
result[0] = mutable_buffer{begin_ + in_off_, capacity_ - in_off_};
result[1] = mutable_buffer{begin_, in_size_ - (capacity_ - in_off_)};
}
return result;
}
inline inline
auto auto
static_buffer_base:: static_buffer_base::

View File

@@ -54,10 +54,6 @@ public:
using const_buffers_type = using const_buffers_type =
std::array<boost::asio::mutable_buffer, 2>; std::array<boost::asio::mutable_buffer, 2>;
/// The type used to represent the mutable input sequence as a list of buffers.
using mutable_data_type =
std::array<boost::asio::mutable_buffer, 2>;
/// The type used to represent the output sequence as a list of buffers. /// The type used to represent the output sequence as a list of buffers.
using mutable_buffers_type = using mutable_buffers_type =
std::array<boost::asio::mutable_buffer, 2>; std::array<boost::asio::mutable_buffer, 2>;
@@ -98,11 +94,6 @@ public:
const_buffers_type const_buffers_type
data() const; data() const;
/** Get a list of mutable buffers that represent the input sequence.
*/
mutable_data_type
mutable_data();
/** Get a list of buffers that represent the output sequence, with the given size. /** Get a list of buffers that represent the output sequence, with the given size.
@param size The number of bytes to request. @param size The number of bytes to request.

View File

@@ -251,7 +251,7 @@ close(close_reason const& cr, error_code& ec)
rd_close_ = true; rd_close_ = true;
auto const mb = buffer_prefix( auto const mb = buffer_prefix(
clamp(rd_.fh.len), clamp(rd_.fh.len),
rd_.buf.mutable_data()); rd_.buf.data());
if(rd_.fh.len > 0 && rd_.fh.mask) if(rd_.fh.len > 0 && rd_.fh.mask)
detail::mask_inplace(mb, rd_.key); detail::mask_inplace(mb, rd_.key);
detail::read_close(cr_, mb, code); detail::read_close(cr_, mb, code);

View File

@@ -202,10 +202,7 @@ operator()(error_code ec, std::size_t)
upcall: upcall:
BOOST_ASSERT(d.ws.wr_block_ == d.tok); BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.wr_block_.reset(); d.ws.wr_block_.reset();
d.ws.close_op_.maybe_invoke() || d_.invoke(ec);
d.ws.ping_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke();
d_.invoke(ec, 0);
} }
} }

View File

@@ -18,6 +18,7 @@
#include <boost/beast/core/type_traits.hpp> #include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/clamp.hpp> #include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/handler_alloc_hook.hpp> #include <boost/asio/handler_alloc_hook.hpp>
#include <boost/asio/handler_continuation_hook.hpp> #include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp> #include <boost/asio/handler_invoke_hook.hpp>
@@ -33,299 +34,22 @@ namespace boost {
namespace beast { namespace beast {
namespace websocket { namespace websocket {
//------------------------------------------------------------------------------ /* Read some message frame data.
// read a frame header, process control frames Also reads and handles control frames.
template<class NextLayer> */
template<class Handler>
class stream<NextLayer>::read_fh_op
{
Handler h_;
stream<NextLayer>& ws_;
int step_ = 0;
bool dispatched_ = false;
token tok_;
public:
read_fh_op(read_fh_op&&) = default;
read_fh_op(read_fh_op const&) = default;
template<class DeducedHandler>
read_fh_op(
DeducedHandler&& h,
stream<NextLayer>& ws)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, tok_(ws_.t_.unique())
{
}
Handler&
handler()
{
return h_;
}
void operator()(error_code ec = {},
std::size_t bytes_transferred = 0);
friend
void* asio_handler_allocate(
std::size_t size, read_fh_op* op)
{
using boost::asio::asio_handler_allocate;
return asio_handler_allocate(
size, std::addressof(op->h_));
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, read_fh_op* op)
{
using boost::asio::asio_handler_deallocate;
asio_handler_deallocate(
p, size, std::addressof(op->h_));
}
friend
bool asio_handler_is_continuation(read_fh_op* op)
{
using boost::asio::asio_handler_is_continuation;
return op->dispatched_ ||
asio_handler_is_continuation(
std::addressof(op->h_));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, read_fh_op* op)
{
using boost::asio::asio_handler_invoke;
asio_handler_invoke(f, std::addressof(op->h_));
}
};
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::
read_fh_op<Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred)
{
using beast::detail::clamp;
enum
{
do_loop = 0,
do_pong = 10
};
switch(step_)
{
case do_loop:
go_loop:
{
BOOST_ASSERT(
ws_.rd_.remain == 0 &&
(! ws_.rd_.fh.fin || ws_.rd_.done));
if(ws_.failed_)
{
// Reads after failure are aborted
ec = boost::asio::error::operation_aborted;
break;
}
close_code code{};
// Read frame header
if(! ws_.parse_fh(ws_.rd_.fh, ws_.rd_.buf, code))
{
if(code != close_code::none)
// _Fail the WebSocket Connection_
return ws_.do_async_fail(
code, error::failed, std::move(h_));
step_ = do_loop + 1;
return ws_.stream_.async_read_some(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
}
// Immediately apply the mask to the portion
// of the buffer holding payload data.
if(ws_.rd_.fh.len > 0 && ws_.rd_.fh.mask)
detail::mask_inplace(buffer_prefix(
clamp(ws_.rd_.fh.len),
ws_.rd_.buf.mutable_data()),
ws_.rd_.key);
if(detail::is_control(ws_.rd_.fh.op))
{
// Get control frame payload
auto const cb = buffer_prefix(clamp(
ws_.rd_.fh.len), ws_.rd_.buf.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_.fh.len);
// Process control frame
if(ws_.rd_.fh.op == detail::opcode::ping)
{
ping_data payload;
detail::read_ping(payload, cb);
ws_.rd_.buf.consume(len);
if(ws_.wr_close_)
{
// Ignore ping when closing
goto go_loop;
}
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::ping, payload);
ws_.rd_.fb.consume(ws_.rd_.fb.size());
ws_.template write_ping<
flat_static_buffer_base>(ws_.rd_.fb,
detail::opcode::pong, payload);
goto go_pong;
}
if(ws_.rd_.fh.op == detail::opcode::pong)
{
code = close_code::none;
ping_data payload;
detail::read_ping(payload, cb);
ws_.rd_.buf.consume(len);
// Ignore pong when closing
if(! ws_.wr_close_ && ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::pong, payload);
goto go_loop;
}
BOOST_ASSERT(ws_.rd_.fh.op == detail::opcode::close);
{
BOOST_ASSERT(! ws_.rd_close_);
ws_.rd_close_ = true;
close_reason cr;
detail::read_close(cr, cb, code);
if(code != close_code::none)
{
// _Fail the WebSocket Connection_
return ws_.do_async_fail(
code, error::failed, std::move(h_));
}
ws_.cr_ = cr;
ws_.rd_.buf.consume(len);
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::close,
ws_.cr_.reason);
if(! ws_.wr_close_)
// _Start the WebSocket Closing Handshake_
return ws_.do_async_fail(
cr.code == close_code::none ?
close_code::normal : cr.code,
error::closed, std::move(h_));
// _Close the WebSocket Connection_
return ws_.do_async_fail(close_code::none,
error::closed, std::move(h_));
}
}
if(ws_.rd_.fh.len == 0 && ! ws_.rd_.fh.fin)
{
// Empty non-final frame
goto go_loop;
}
ws_.rd_.done =
ws_.rd_.remain == 0 && ws_.rd_.fh.fin;
break;
}
case do_loop + 1:
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
break;
ws_.rd_.buf.commit(bytes_transferred);
goto go_loop;
go_pong:
if(ws_.wr_block_)
{
// suspend
BOOST_ASSERT(ws_.wr_block_ != tok_);
step_ = do_pong;
ws_.rd_op_.save(std::move(*this));
return;
}
ws_.wr_block_ = tok_;
goto go_pong_send;
case do_pong:
BOOST_ASSERT(! ws_.wr_block_);
ws_.wr_block_ = tok_;
step_ = do_pong + 1;
// The current context is safe but might not be
// the same as the one for this operation (since
// we are being called from a write operation).
// Call post to make sure we are invoked the same
// way as the final handler for this operation.
ws_.get_io_service().post(bind_handler(
std::move(*this), ec, 0));
return;
case do_pong + 1:
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
if(ws_.failed_)
{
// call handler
ws_.wr_block_.reset();
ec = boost::asio::error::operation_aborted;
break;
}
if(ws_.wr_close_)
{
// ignore ping when closing
ws_.wr_block_.reset();
ws_.rd_.fb.consume(ws_.rd_.fb.size());
goto go_loop;
}
go_pong_send:
// send pong
BOOST_ASSERT(ws_.wr_block_ == tok_);
step_ = do_pong + 2;
boost::asio::async_write(ws_.stream_,
ws_.rd_.fb.data(), std::move(*this));
return;
case do_pong + 2:
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
ws_.wr_block_.reset();
ws_.failed_ = !!ec;
if(ws_.failed_)
break;
ws_.rd_.fb.consume(ws_.rd_.fb.size());
goto go_loop;
}
// upcall
BOOST_ASSERT(ws_.wr_block_ != tok_);
ws_.close_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke() ||
ws_.wr_op_.maybe_invoke();
if(! dispatched_)
ws_.stream_.get_io_service().post(
bind_handler(std::move(h_), ec));
else
h_(ec);
}
//------------------------------------------------------------------------------
// Reads a single message frame,
// processes any received control frames.
//
template<class NextLayer> template<class NextLayer>
template< template<
class MutableBufferSequence, class MutableBufferSequence,
class Handler> class Handler>
class stream<NextLayer>::read_some_op class stream<NextLayer>::read_some_op
: public boost::asio::coroutine
{ {
Handler h_; Handler h_;
stream<NextLayer>& ws_; stream<NextLayer>& ws_;
consuming_buffers<MutableBufferSequence> cb_; consuming_buffers<MutableBufferSequence> cb_;
std::size_t bytes_written_ = 0; std::size_t bytes_written_ = 0;
int step_ = 0; token tok_;
bool did_read_ = false; bool did_read_ = false;
bool dispatched_ = false; bool dispatched_ = false;
@@ -341,10 +65,18 @@ public:
: h_(std::forward<DeducedHandler>(h)) : h_(std::forward<DeducedHandler>(h))
, ws_(ws) , ws_(ws)
, cb_(bs) , cb_(bs)
, tok_(ws_.t_.unique())
{ {
} }
void operator()(error_code ec = {}, Handler&
handler()
{
return h_;
}
void operator()(
error_code ec = {},
std::size_t bytes_transferred = 0); std::size_t bytes_transferred = 0);
friend friend
@@ -393,26 +125,54 @@ operator()(
error_code ec, error_code ec,
std::size_t bytes_transferred) std::size_t bytes_transferred)
{ {
enum
{
do_start = 0,
do_maybe_fill = 10,
do_read = 20,
do_inflate = 30
};
using beast::detail::clamp; using beast::detail::clamp;
using boost::asio::buffer; using boost::asio::buffer;
using boost::asio::buffer_cast; using boost::asio::buffer_cast;
using boost::asio::buffer_size; using boost::asio::buffer_size;
switch(step_) close_code code{};
BOOST_ASIO_CORO_REENTER(*this)
{ {
case do_start: // Maybe suspend
if(! ws_.rd_block_)
{
// Acquire the read block
ws_.rd_block_ = tok_;
// Make sure the stream is open
if(ws_.failed_) if(ws_.failed_)
{ {
// Reads after failure are aborted BOOST_ASIO_CORO_YIELD
ec = boost::asio::error::operation_aborted; ws_.get_io_service().post(
break; bind_handler(std::move(*this),
boost::asio::error::operation_aborted));
goto upcall;
} }
}
else
{
// Suspend
BOOST_ASSERT(ws_.rd_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.r_rd_op_.save(std::move(*this));
// Acquire the read block
BOOST_ASSERT(! ws_.rd_block_);
ws_.rd_block_ = tok_;
// Resume
BOOST_ASIO_CORO_YIELD
ws_.get_io_service().post(std::move(*this));
BOOST_ASSERT(ws_.rd_block_ == tok_);
dispatched_ = true;
// Handle the stream closing while suspended
if(ws_.failed_)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
}
}
loop:
// See if we need to read a frame header. This // See if we need to read a frame header. This
// condition is structured to give the decompressor // condition is structured to give the decompressor
// a chance to emit the final empty deflate block // a chance to emit the final empty deflate block
@@ -420,49 +180,197 @@ operator()(
if(ws_.rd_.remain == 0 && if(ws_.rd_.remain == 0 &&
(! ws_.rd_.fh.fin || ws_.rd_.done)) (! ws_.rd_.fh.fin || ws_.rd_.done))
{ {
step_ = do_maybe_fill; // Read frame header
return read_fh_op<read_some_op>{ while(! ws_.parse_fh(
std::move(*this), ws_}({}, 0); ws_.rd_.fh, ws_.rd_.buf, code))
{
if(code != close_code::none)
{
// _Fail the WebSocket Connection_
ec = error::failed;
goto close;
} }
goto go_maybe_fill; BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
case do_maybe_fill: ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
dispatched_ = true; dispatched_ = true;
if(ec) ws_.failed_ = !!ec;
break; if(ws_.failed_)
if(ws_.rd_.done) goto upcall;
break; ws_.rd_.buf.commit(bytes_transferred);
}
// Immediately apply the mask to the portion
// of the buffer holding payload data.
if(ws_.rd_.fh.len > 0 && ws_.rd_.fh.mask)
detail::mask_inplace(buffer_prefix(
clamp(ws_.rd_.fh.len),
ws_.rd_.buf.data()),
ws_.rd_.key);
if(detail::is_control(ws_.rd_.fh.op))
{
// Handle ping frame
if(ws_.rd_.fh.op == detail::opcode::ping)
{
{
auto const cb = buffer_prefix(
clamp(ws_.rd_.fh.len),
ws_.rd_.buf.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_.fh.len);
ping_data payload;
detail::read_ping(payload, cb);
ws_.rd_.buf.consume(len);
// Ignore ping when closing
if(ws_.wr_close_)
goto loop;
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::ping, payload);
ws_.rd_.fb.reset();
ws_.template write_ping<
flat_static_buffer_base>(ws_.rd_.fb,
detail::opcode::pong, payload);
}
// Maybe suspend
if(! ws_.wr_block_)
{
// Acquire the write block
ws_.wr_block_ = tok_;
}
else
{
// Suspend
BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.rd_op_.save(std::move(*this));
go_maybe_fill: // Acquire the write block
if(ws_.pmd_ && ws_.pmd_->rd_set) BOOST_ASSERT(! ws_.wr_block_);
goto go_inflate; ws_.wr_block_ = tok_;
// Resume
BOOST_ASIO_CORO_YIELD
ws_.get_io_service().post(std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
// Make sure the stream is open
if(ws_.failed_)
{
ws_.wr_block_.reset();
ec = boost::asio::error::operation_aborted;
goto upcall;
}
// Ignore ping when closing
if(ws_.wr_close_)
{
ws_.wr_block_.reset();
goto loop;
}
}
// Send pong
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
ws_.rd_.fb.data(), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
ws_.wr_block_.reset();
ws_.failed_ = !!ec;
if(ws_.failed_)
goto upcall;
goto loop;
}
// Handle pong frame
if(ws_.rd_.fh.op == detail::opcode::pong)
{
auto const cb = buffer_prefix(clamp(
ws_.rd_.fh.len), ws_.rd_.buf.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_.fh.len);
code = close_code::none;
ping_data payload;
detail::read_ping(payload, cb);
ws_.rd_.buf.consume(len);
// Ignore pong when closing
if(! ws_.wr_close_ && ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::pong, payload);
goto loop;
}
// Handle close frame
BOOST_ASSERT(ws_.rd_.fh.op == detail::opcode::close);
{
auto const cb = buffer_prefix(clamp(
ws_.rd_.fh.len), ws_.rd_.buf.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_.fh.len);
BOOST_ASSERT(! ws_.rd_close_);
ws_.rd_close_ = true;
close_reason cr;
detail::read_close(cr, cb, code);
if(code != close_code::none)
{
// _Fail the WebSocket Connection_
ec = error::failed;
goto close;
}
ws_.cr_ = cr;
ws_.rd_.buf.consume(len);
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::close,
ws_.cr_.reason);
if(! ws_.wr_close_)
{
// _Start the WebSocket Closing Handshake_
code = cr.code == close_code::none ?
close_code::normal :
static_cast<close_code>(cr.code);
ec = error::closed;
goto close;
}
// _Close the WebSocket Connection_
code = close_code::none;
ec = error::closed;
goto close;
}
}
if(ws_.rd_.fh.len == 0 && ! ws_.rd_.fh.fin)
{
// Empty non-final frame
goto loop;
}
ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin;
if(ws_.rd_.done)
goto upcall; // Empty final frame
}
if(! ws_.pmd_ || ! ws_.pmd_->rd_set)
{
if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() >
(std::min)(clamp(ws_.rd_.remain), (std::min)(clamp(ws_.rd_.remain),
buffer_size(cb_))) buffer_size(cb_)))
{ {
// Fill the read buffer first, otherwise we // Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O. // get fewer bytes at the cost of one I/O.
auto const mb = ws_.rd_.buf.prepare( BOOST_ASIO_CORO_YIELD
read_size(ws_.rd_.buf, ws_.stream_.async_read_some(
ws_.rd_.buf.max_size())); ws_.rd_.buf.prepare(
step_ = do_maybe_fill + 1; read_size(
return ws_.stream_.async_read_some( ws_.rd_.buf,
mb, std::move(*this)); ws_.rd_.buf.max_size())),
} std::move(*this));
goto go_rd_buf;
case do_maybe_fill + 1:
dispatched_ = true; dispatched_ = true;
ws_.failed_ = !!ec; ws_.failed_ = !!ec;
if(ws_.failed_) if(ws_.failed_)
break; goto upcall;
ws_.rd_.buf.commit(bytes_transferred); ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask) if(ws_.rd_.fh.mask)
detail::mask_inplace(buffer_prefix(clamp( detail::mask_inplace(buffer_prefix(clamp(
ws_.rd_.remain), ws_.rd_.buf.mutable_data()), ws_.rd_.remain), ws_.rd_.buf.data()),
ws_.rd_.key); ws_.rd_.key);
}
go_rd_buf:
if(ws_.rd_.buf.size() > 0) if(ws_.rd_.buf.size() > 0)
{ {
// Copy from the read buffer. // Copy from the read buffer.
@@ -477,28 +385,29 @@ operator()(
if(! ws_.rd_.utf8.write(mb) || if(! ws_.rd_.utf8.write(mb) ||
(ws_.rd_.remain == 0 && ws_.rd_.fh.fin && (ws_.rd_.remain == 0 && ws_.rd_.fh.fin &&
! ws_.rd_.utf8.finish())) ! ws_.rd_.utf8.finish()))
{
// _Fail the WebSocket Connection_ // _Fail the WebSocket Connection_
return ws_.do_async_fail(close_code::bad_payload, code = close_code::bad_payload;
error::failed, std::move(h_)); ec = error::failed;
goto close;
}
} }
bytes_written_ += bytes_transferred; bytes_written_ += bytes_transferred;
ws_.rd_.size += bytes_transferred; ws_.rd_.size += bytes_transferred;
ws_.rd_.buf.consume(bytes_transferred); ws_.rd_.buf.consume(bytes_transferred);
goto go_done;
} }
else
{
// Read into caller's buffer // Read into caller's buffer
step_ = do_read;
BOOST_ASSERT(ws_.rd_.remain > 0); BOOST_ASSERT(ws_.rd_.remain > 0);
BOOST_ASSERT(buffer_size(cb_) > 0); BOOST_ASSERT(buffer_size(cb_) > 0);
return ws_.stream_.async_read_some(buffer_prefix( BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(buffer_prefix(
clamp(ws_.rd_.remain), cb_), std::move(*this)); clamp(ws_.rd_.remain), cb_), std::move(*this));
case do_read:
{
dispatched_ = true; dispatched_ = true;
ws_.failed_ = !!ec; ws_.failed_ = !!ec;
if(ws_.failed_) if(ws_.failed_)
break; goto upcall;
BOOST_ASSERT(bytes_transferred > 0); BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix( auto const mb = buffer_prefix(
bytes_transferred, cb_); bytes_transferred, cb_);
@@ -510,27 +419,48 @@ operator()(
if(! ws_.rd_.utf8.write(mb) || if(! ws_.rd_.utf8.write(mb) ||
(ws_.rd_.remain == 0 && ws_.rd_.fh.fin && (ws_.rd_.remain == 0 && ws_.rd_.fh.fin &&
! ws_.rd_.utf8.finish())) ! ws_.rd_.utf8.finish()))
{
// _Fail the WebSocket Connection_ // _Fail the WebSocket Connection_
return ws_.do_async_fail(close_code::bad_payload, code = close_code::bad_payload;
error::failed, std::move(h_)); ec = error::failed;
goto close;
}
} }
bytes_written_ += bytes_transferred; bytes_written_ += bytes_transferred;
ws_.rd_.size += bytes_transferred; ws_.rd_.size += bytes_transferred;
} }
go_done:
ws_.rd_.done = ws_.rd_.done =
ws_.rd_.remain == 0 && ws_.rd_.fh.fin; ws_.rd_.remain == 0 && ws_.rd_.fh.fin;
break; goto upcall;
}
case do_inflate: else
go_inflate:
{ {
// Read compressed message frame payload: // Read compressed message frame payload:
// inflate even if rd_.fh.len == 0, otherwise we // inflate even if rd_.fh.len == 0, otherwise we
// never emit the end-of-stream deflate block. // never emit the end-of-stream deflate block.
while(buffer_size(cb_) > 0) while(buffer_size(cb_) > 0)
{ {
if( ws_.rd_.remain > 0 &&
ws_.rd_.buf.size() == 0 &&
! did_read_)
{
// read new
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
ws_.failed_ = !!ec;
if(ws_.failed_)
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask)
detail::mask_inplace(
buffer_prefix(clamp(ws_.rd_.remain),
ws_.rd_.buf.data()), ws_.rd_.key);
did_read_ = true;
}
zlib::z_params zs; zlib::z_params zs;
{ {
auto const out = buffer_front(cb_); auto const out = buffer_front(cb_);
@@ -549,15 +479,6 @@ operator()(
zs.avail_in = buffer_size(in); zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in); zs.next_in = buffer_cast<void const*>(in);
} }
else if(! did_read_)
{
// read new
step_ = do_inflate + 1;
return ws_.stream_.async_read_some(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
}
else else
{ {
break; break;
@@ -602,9 +523,12 @@ operator()(
break; break;
if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
ws_.rd_.size, zs.total_out, ws_.rd_msg_max_)) ws_.rd_.size, zs.total_out, ws_.rd_msg_max_))
{
// _Fail the WebSocket Connection_ // _Fail the WebSocket Connection_
return ws_.do_async_fail(close_code::too_big, code = close_code::too_big;
error::failed, std::move(h_)); ec = error::failed;
goto close;
}
cb_.consume(zs.total_out); cb_.consume(zs.total_out);
ws_.rd_.size += zs.total_out; ws_.rd_.size += zs.total_out;
ws_.rd_.remain -= zs.total_in; ws_.rd_.remain -= zs.total_in;
@@ -618,29 +542,27 @@ operator()(
buffer_prefix(bytes_written_, cb_.get())) || ( buffer_prefix(bytes_written_, cb_.get())) || (
ws_.rd_.remain == 0 && ws_.rd_.fh.fin && ws_.rd_.remain == 0 && ws_.rd_.fh.fin &&
! ws_.rd_.utf8.finish())) ! ws_.rd_.utf8.finish()))
// _Fail the WebSocket Connection_
return ws_.do_async_fail(close_code::bad_payload,
error::failed, std::move(h_));
}
break;
}
case do_inflate + 1:
{ {
ws_.failed_ = !!ec; // _Fail the WebSocket Connection_
if(ws_.failed_) code = close_code::bad_payload;
break; ec = error::failed;
BOOST_ASSERT(bytes_transferred > 0); goto close;
ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask)
detail::mask_inplace(
buffer_prefix(clamp(ws_.rd_.remain),
ws_.rd_.buf.mutable_data()), ws_.rd_.key);
did_read_ = true;
goto go_inflate;
} }
} }
// upcall goto upcall;
}
close:
// Maybe send close frame, then teardown
BOOST_ASIO_CORO_YIELD
ws_.do_async_fail(code, ec, std::move(*this));
BOOST_ASSERT(! ws_.wr_block_);
upcall:
BOOST_ASSERT(ws_.rd_block_ == tok_);
ws_.rd_block_.reset();
ws_.close_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke() ||
ws_.wr_op_.maybe_invoke();
if(! dispatched_) if(! dispatched_)
{ {
ws_.stream_.get_io_service().post( ws_.stream_.get_io_service().post(
@@ -652,6 +574,7 @@ operator()(
h_(ec, bytes_written_); h_(ec, bytes_written_);
} }
} }
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -753,9 +676,8 @@ operator()(
do_read: do_read:
using buffers_type = typename using buffers_type = typename
DynamicBuffer::mutable_buffers_type; DynamicBuffer::mutable_buffers_type;
auto const rsh = ws_.read_size_hint(b_);
auto const size = clamp( auto const size = clamp(
rsh, limit_); ws_.read_size_hint(b_), limit_);
boost::optional<buffers_type> mb; boost::optional<buffers_type> mb;
try try
{ {
@@ -819,6 +741,12 @@ read(DynamicBuffer& buffer, error_code& ec)
"SyncStream requirements not met"); "SyncStream requirements not met");
static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value, static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met"); "DynamicBuffer requirements not met");
// Make sure the stream is open
if(failed_)
{
ec = boost::asio::error::operation_aborted;
return 0;
}
std::size_t bytes_written = 0; std::size_t bytes_written = 0;
do do
{ {
@@ -888,6 +816,12 @@ read_some(
"SyncStream requirements not met"); "SyncStream requirements not met");
static_assert(is_dynamic_buffer<DynamicBuffer>::value, static_assert(is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met"); "DynamicBuffer requirements not met");
// Make sure the stream is open
if(failed_)
{
ec = boost::asio::error::operation_aborted;
return 0;
}
using beast::detail::clamp; using beast::detail::clamp;
if(! limit) if(! limit)
limit = (std::numeric_limits<std::size_t>::max)(); limit = (std::numeric_limits<std::size_t>::max)();
@@ -972,16 +906,16 @@ read_some(
static_assert(is_mutable_buffer_sequence< static_assert(is_mutable_buffer_sequence<
MutableBufferSequence>::value, MutableBufferSequence>::value,
"MutableBufferSequence requirements not met"); "MutableBufferSequence requirements not met");
using beast::detail::clamp;
using boost::asio::buffer;
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
// Make sure the stream is open // Make sure the stream is open
if(failed_) if(failed_)
{ {
ec = boost::asio::error::operation_aborted; ec = boost::asio::error::operation_aborted;
return 0; return 0;
} }
using beast::detail::clamp;
using boost::asio::buffer;
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
close_code code{}; close_code code{};
std::size_t bytes_written = 0; std::size_t bytes_written = 0;
loop: loop:
@@ -1014,7 +948,7 @@ loop:
// of the buffer holding payload data. // of the buffer holding payload data.
if(rd_.fh.len > 0 && rd_.fh.mask) if(rd_.fh.len > 0 && rd_.fh.mask)
detail::mask_inplace(buffer_prefix( detail::mask_inplace(buffer_prefix(
clamp(rd_.fh.len), rd_.buf.mutable_data()), clamp(rd_.fh.len), rd_.buf.data()),
rd_.key); rd_.key);
if(detail::is_control(rd_.fh.op)) if(detail::is_control(rd_.fh.op))
{ {
@@ -1115,6 +1049,7 @@ loop:
if(rd_.fh.mask) if(rd_.fh.mask)
detail::mask_inplace(buffer_prefix( detail::mask_inplace(buffer_prefix(
clamp(rd_.remain), mb), rd_.key); clamp(rd_.remain), mb), rd_.key);
// VFALCO Do this before masking for symmetry with the async version
rd_.buf.commit(bytes_transferred); rd_.buf.commit(bytes_transferred);
} }
if(rd_.buf.size() > 0) if(rd_.buf.size() > 0)
@@ -1225,7 +1160,7 @@ loop:
if(rd_.fh.mask) if(rd_.fh.mask)
detail::mask_inplace( detail::mask_inplace(
buffer_prefix(clamp(rd_.remain), buffer_prefix(clamp(rd_.remain),
rd_.buf.mutable_data()), rd_.key); rd_.buf.data()), rd_.key);
auto const in = buffer_prefix( auto const in = buffer_prefix(
clamp(rd_.remain), buffer_front( clamp(rd_.remain), buffer_front(
rd_.buf.data())); rd_.buf.data()));

View File

@@ -177,11 +177,13 @@ open(role_type role)
rd_.cont = false; rd_.cont = false;
rd_.done = true; rd_.done = true;
// Can't clear this because accept uses it // Can't clear this because accept uses it
//rd_.buf.consume(rd_.buf.size()); //rd_.buf.reset();
rd_.fh.fin = false; rd_.fh.fin = false;
rd_close_ = false; rd_close_ = false;
wr_close_ = false; wr_close_ = false;
wr_block_.reset(); wr_block_.reset();
rd_block_.reset();
cr_.code = close_code::none;
ping_data_ = nullptr; // should be nullptr on close anyway ping_data_ = nullptr; // should be nullptr on close anyway
wr_.cont = false; wr_.cont = false;
@@ -240,6 +242,8 @@ reset()
wr_close_ = false; wr_close_ = false;
wr_.cont = false; wr_.cont = false;
wr_block_.reset(); wr_block_.reset();
rd_block_.reset();
cr_.code = close_code::none;
ping_data_ = nullptr; // should be nullptr on close anyway ping_data_ = nullptr; // should be nullptr on close anyway
} }

View File

@@ -230,12 +230,15 @@ class stream
bool rd_close_; // read close frame bool rd_close_; // read close frame
bool wr_close_; // sent close frame bool wr_close_; // sent close frame
token wr_block_; // op currenly writing token wr_block_; // op currenly writing
token rd_block_; // op currenly reading
ping_data* ping_data_; // where to put the payload ping_data* ping_data_; // where to put the payload
detail::pausation rd_op_; // paused read op detail::pausation rd_op_; // paused read op
detail::pausation wr_op_; // paused write op detail::pausation wr_op_; // paused write op
detail::pausation ping_op_; // paused ping op detail::pausation ping_op_; // paused ping op
detail::pausation close_op_; // paused close op detail::pausation close_op_; // paused close op
detail::pausation r_rd_op_; // paused read op (read)
detail::pausation r_close_op_; // paused close op (read)
close_reason cr_; // set from received close frame close_reason cr_; // set from received close frame
rd_t rd_; // read state rd_t rd_; // read state
wr_t wr_; // write state wr_t wr_; // write state

View File

@@ -105,7 +105,6 @@ public:
ba.commit(buffer_copy(d, buffer(s.data()+x+y, z))); ba.commit(buffer_copy(d, buffer(s.data()+x+y, z)));
} }
ba.commit(2); ba.commit(2);
BEAST_EXPECT(buffer_size(ba.data()) == buffer_size(ba.mutable_data()));
BEAST_EXPECT(ba.size() == x + y + z); BEAST_EXPECT(ba.size() == x + y + z);
BEAST_EXPECT(buffer_size(ba.data()) == ba.size()); BEAST_EXPECT(buffer_size(ba.data()) == ba.size());
BEAST_EXPECT(to_string(ba.data()) == s); BEAST_EXPECT(to_string(ba.data()) == s);

View File

@@ -105,7 +105,6 @@ public:
ba.commit(buffer_copy(d, buffer(s.data()+x+y, z))); ba.commit(buffer_copy(d, buffer(s.data()+x+y, z)));
} }
ba.commit(2); ba.commit(2);
BEAST_EXPECT(buffer_size(ba.data()) == buffer_size(ba.mutable_data()));
BEAST_EXPECT(ba.size() == x + y + z); BEAST_EXPECT(ba.size() == x + y + z);
BEAST_EXPECT(buffer_size(ba.data()) == ba.size()); BEAST_EXPECT(buffer_size(ba.data()) == ba.size());
BEAST_EXPECT(to_string(ba.data()) == s); BEAST_EXPECT(to_string(ba.data()) == s);