Refactor websocket stream members

Vinnie Falco
2017-08-26 15:18:02 -07:00
parent 4c335a64cf
commit 1d5b3f488e
10 changed files with 422 additions and 469 deletions

View File

@@ -1,6 +1,7 @@
Version 110:
* Refactor stream open state variable
* Refactor websocket stream members
--------------------------------------------------------------------------------
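Every hunk below applies the same mechanical change: the nested rd_t and wr_t state aggregates are flattened into individually named members of stream, so call sites change spelling but not behavior. Collected from the hunks that follow, the renames are:

// old             =>  new                  old             =>  new
// rd_.buf         =>  rd_buf_              wr_.cont        =>  wr_cont_
// rd_.fh          =>  rd_fh_               wr_.compress    =>  wr_compress_
// rd_.remain      =>  rd_remain_           wr_.autofrag    =>  wr_frag_
// rd_.key         =>  rd_key_              wr_.buf         =>  wr_buf_
// rd_.size        =>  rd_size_             wr_.buf_size    =>  wr_buf_size_
// rd_.op          =>  rd_op_               wr_.fb          =>  wr_fb_
// rd_.cont        =>  rd_cont_             wr_autofrag_    =>  wr_frag_opt_
// rd_.done        =>  rd_done_             wr_buf_size_    =>  wr_buf_opt_  (the option)
// rd_.utf8        =>  rd_utf8_             maskgen_        =>  wr_gen_
// rd_.fb          =>  rd_fb_               t_              =>  tok_
// rd_op_/wr_op_/ping_op_/close_op_/r_rd_op_/r_close_op_ (pausations)
//     =>  paused_rd_/paused_wr_/paused_ping_/paused_close_/paused_r_rd_/paused_r_close_
// wr_begin()      =>  begin_msg()
// Note: the name rd_op_ is reused; it previously held the paused read
// operation and now holds the opcode of the message being read (was rd_.op).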

View File

@@ -225,14 +225,14 @@ run(Buffers const& buffers)
auto const len = buffer_size(buffers);
try
{
mb.emplace(d.ws.rd_.buf.prepare(len));
mb.emplace(d.ws.rd_buf_.prepare(len));
}
catch(std::length_error const&)
{
ec = error::buffer_overflow;
return (*this)(ec);
}
d.ws.rd_.buf.commit(
d.ws.rd_buf_.commit(
buffer_copy(*mb, buffers));
(*this)(ec);
}
@@ -257,7 +257,7 @@ operator()(error_code ec)
{
BOOST_ASIO_CORO_YIELD
http::async_read(
d.ws.next_layer(), d.ws.rd_.buf,
d.ws.next_layer(), d.ws.rd_buf_,
d.p, std::move(*this));
if(ec == http::error::end_of_stream)
ec = error::closed;
@@ -407,7 +407,7 @@ accept(
static_buffer_base::mutable_buffers_type> mb;
try
{
mb.emplace(rd_.buf.prepare(
mb.emplace(rd_buf_.prepare(
buffer_size(buffers)));
}
catch(std::length_error const&)
@@ -415,7 +415,7 @@ accept(
ec = error::buffer_overflow;
return;
}
rd_.buf.commit(
rd_buf_.commit(
buffer_copy(*mb, buffers));
do_accept(&default_decorate_res, ec);
}
@@ -447,7 +447,7 @@ accept_ex(
static_buffer_base::mutable_buffers_type> mb;
try
{
mb.emplace(rd_.buf.prepare(
mb.emplace(rd_buf_.prepare(
buffer_size(buffers)));
}
catch(std::length_error const&)
@@ -455,7 +455,7 @@ accept_ex(
ec = error::buffer_overflow;
return;
}
rd_.buf.commit(buffer_copy(*mb, buffers));
rd_buf_.commit(buffer_copy(*mb, buffers));
do_accept(decorator, ec);
}
@@ -713,7 +713,7 @@ do_accept(
error_code& ec)
{
http::request_parser<http::empty_body> p;
http::read(next_layer(), rd_.buf, p, ec);
http::read(next_layer(), rd_buf_, p, ec);
if(ec == http::error::end_of_stream)
ec = error::closed;
if(ec)

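The accept overloads above all stash caller-supplied bytes with the same idiom: prepare() on the fixed-capacity read buffer (which throws std::length_error when the bytes cannot fit), buffer_copy() into the prepared region, then commit(); the catch clause converts the overflow into error::buffer_overflow. A standalone sketch of that idiom, assuming Boost.Beast's static_buffer and the Boost.Asio buffer helpers; the helper name and the 4096-byte capacity are illustrative, not the stream's actual signature:

#include <boost/asio/buffer.hpp>
#include <boost/beast/core/static_buffer.hpp>
#include <stdexcept>

// Illustrative helper: copy a const buffer sequence into a bounded read
// buffer, reporting overflow instead of letting length_error escape.
template<class ConstBufferSequence>
bool
stash_bytes(
    boost::beast::static_buffer<4096>& rd_buf,
    ConstBufferSequence const& buffers)
{
    using boost::asio::buffer_copy;
    using boost::asio::buffer_size;
    try
    {
        auto const mb = rd_buf.prepare(buffer_size(buffers)); // throws if too big
        rd_buf.commit(buffer_copy(mb, buffers));              // make bytes readable
        return true;
    }
    catch(std::length_error const&)
    {
        return false; // the stream maps this case to error::buffer_overflow
    }
}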
View File

@@ -52,7 +52,7 @@ class stream<NextLayer>::close_op
stream<NextLayer>& ws_,
close_reason const& cr)
: ws(ws_)
, tok(ws.t_.unique())
, tok(ws.tok_.unique())
{
// Serialize the close frame
ws.template write_close<
@@ -148,7 +148,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
// Suspend
BOOST_ASSERT(d.ws.wr_block_ != d.tok);
BOOST_ASIO_CORO_YIELD
d.ws.close_op_.emplace(std::move(*this));
d.ws.paused_close_.emplace(std::move(*this));
// Acquire the write block
BOOST_ASSERT(! d.ws.wr_block_);
@@ -202,7 +202,7 @@ operator()(error_code ec, std::size_t bytes_transferred)
// Suspend
BOOST_ASSERT(d.ws.rd_block_ != d.tok);
BOOST_ASIO_CORO_YIELD
d.ws.r_close_op_.emplace(std::move(*this));
d.ws.paused_r_close_.emplace(std::move(*this));
// Acquire the read block
BOOST_ASSERT(! d.ws.rd_block_);
@@ -222,13 +222,13 @@ operator()(error_code ec, std::size_t bytes_transferred)
}
// Drain
if(d.ws.rd_.remain > 0)
if(d.ws.rd_remain_ > 0)
goto read_payload;
for(;;)
{
// Read frame header
while(! d.ws.parse_fh(
d.ws.rd_.fh, d.ws.rd_.buf, code))
d.ws.rd_fh_, d.ws.rd_buf_, code))
{
if(code != close_code::none)
{
@@ -237,28 +237,28 @@ operator()(error_code ec, std::size_t bytes_transferred)
}
BOOST_ASIO_CORO_YIELD
d.ws.stream_.async_read_some(
d.ws.rd_.buf.prepare(read_size(d.ws.rd_.buf,
d.ws.rd_.buf.max_size())),
d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
d.ws.rd_buf_.max_size())),
std::move(*this));
if(ec)
{
d.ws.open_ = false;
goto upcall;
}
d.ws.rd_.buf.commit(bytes_transferred);
d.ws.rd_buf_.commit(bytes_transferred);
}
if(detail::is_control(d.ws.rd_.fh.op))
if(detail::is_control(d.ws.rd_fh_.op))
{
// Process control frame
if(d.ws.rd_.fh.op == detail::opcode::close)
if(d.ws.rd_fh_.op == detail::opcode::close)
{
BOOST_ASSERT(! d.ws.rd_close_);
d.ws.rd_close_ = true;
auto const mb = buffer_prefix(
clamp(d.ws.rd_.fh.len),
d.ws.rd_.buf.data());
if(d.ws.rd_.fh.len > 0 && d.ws.rd_.fh.mask)
detail::mask_inplace(mb, d.ws.rd_.key);
clamp(d.ws.rd_fh_.len),
d.ws.rd_buf_.data());
if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask)
detail::mask_inplace(mb, d.ws.rd_key_);
detail::read_close(d.ws.cr_, mb, code);
if(code != close_code::none)
{
@@ -266,33 +266,33 @@ operator()(error_code ec, std::size_t bytes_transferred)
d.ev = error::failed;
goto teardown;
}
d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len));
d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
goto teardown;
}
d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len));
d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
}
else
{
read_payload:
while(d.ws.rd_.buf.size() < d.ws.rd_.remain)
while(d.ws.rd_buf_.size() < d.ws.rd_remain_)
{
d.ws.rd_.remain -= d.ws.rd_.buf.size();
d.ws.rd_.buf.consume(d.ws.rd_.buf.size());
d.ws.rd_remain_ -= d.ws.rd_buf_.size();
d.ws.rd_buf_.consume(d.ws.rd_buf_.size());
BOOST_ASIO_CORO_YIELD
d.ws.stream_.async_read_some(
d.ws.rd_.buf.prepare(read_size(d.ws.rd_.buf,
d.ws.rd_.buf.max_size())),
d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
d.ws.rd_buf_.max_size())),
std::move(*this));
if(ec)
{
d.ws.open_ = false;
goto upcall;
}
d.ws.rd_.buf.commit(bytes_transferred);
d.ws.rd_buf_.commit(bytes_transferred);
}
BOOST_ASSERT(d.ws.rd_.buf.size() >= d.ws.rd_.remain);
d.ws.rd_.buf.consume(clamp(d.ws.rd_.remain));
d.ws.rd_.remain = 0;
BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_);
d.ws.rd_buf_.consume(clamp(d.ws.rd_remain_));
d.ws.rd_remain_ = 0;
}
}
@@ -321,11 +321,11 @@ operator()(error_code ec, std::size_t bytes_transferred)
{
BOOST_ASSERT(d.ws.rd_block_ == d.tok);
d.ws.rd_block_.reset();
d.ws.r_rd_op_.maybe_invoke();
d.ws.paused_r_rd_.maybe_invoke();
}
d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke();
d.ws.paused_rd_.maybe_invoke() ||
d.ws.paused_ping_.maybe_invoke() ||
d.ws.paused_wr_.maybe_invoke();
d_.invoke(ec);
}
}
@@ -373,66 +373,66 @@ close(close_reason const& cr, error_code& ec)
return;
// Drain the connection
close_code code{};
if(rd_.remain > 0)
if(rd_remain_ > 0)
goto read_payload;
for(;;)
{
// Read frame header
while(! parse_fh(rd_.fh, rd_.buf, code))
while(! parse_fh(rd_fh_, rd_buf_, code))
{
if(code != close_code::none)
return do_fail(close_code::none,
error::failed, ec);
auto const bytes_transferred =
stream_.read_some(
rd_.buf.prepare(read_size(rd_.buf,
rd_.buf.max_size())), ec);
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec);
open_ = ! ec;
if(! open_)
return;
rd_.buf.commit(bytes_transferred);
rd_buf_.commit(bytes_transferred);
}
if(detail::is_control(rd_.fh.op))
if(detail::is_control(rd_fh_.op))
{
// Process control frame
if(rd_.fh.op == detail::opcode::close)
if(rd_fh_.op == detail::opcode::close)
{
BOOST_ASSERT(! rd_close_);
rd_close_ = true;
auto const mb = buffer_prefix(
clamp(rd_.fh.len),
rd_.buf.data());
if(rd_.fh.len > 0 && rd_.fh.mask)
detail::mask_inplace(mb, rd_.key);
clamp(rd_fh_.len),
rd_buf_.data());
if(rd_fh_.len > 0 && rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
detail::read_close(cr_, mb, code);
if(code != close_code::none)
// Protocol error
return do_fail(close_code::none,
error::failed, ec);
rd_.buf.consume(clamp(rd_.fh.len));
rd_buf_.consume(clamp(rd_fh_.len));
break;
}
rd_.buf.consume(clamp(rd_.fh.len));
rd_buf_.consume(clamp(rd_fh_.len));
}
else
{
read_payload:
while(rd_.buf.size() < rd_.remain)
while(rd_buf_.size() < rd_remain_)
{
rd_.remain -= rd_.buf.size();
rd_.buf.consume(rd_.buf.size());
rd_remain_ -= rd_buf_.size();
rd_buf_.consume(rd_buf_.size());
auto const bytes_transferred =
stream_.read_some(
rd_.buf.prepare(read_size(rd_.buf,
rd_.buf.max_size())), ec);
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec);
open_ = ! ec;
if(! open_)
return;
rd_.buf.commit(bytes_transferred);
rd_buf_.commit(bytes_transferred);
}
BOOST_ASSERT(rd_.buf.size() >= rd_.remain);
rd_.buf.consume(clamp(rd_.remain));
rd_.remain = 0;
BOOST_ASSERT(rd_buf_.size() >= rd_remain_);
rd_buf_.consume(clamp(rd_remain_));
rd_remain_ = 0;
}
}
// _Close the WebSocket Connection_

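close_op above shows the concurrency scheme these renames touch: each composed operation takes a unique token from tok_, must own wr_block_ (or rd_block_) before touching the stream, parks itself in one of the paused_* slots when another operation holds the block, and on completion resets the block and calls maybe_invoke() so the first parked operation in the chain resumes. A deliberately simplified model of that hand-off follows; the types here are hypothetical stand-ins, not Beast's detail::pausation or token:

#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical stand-ins modeling the suspend/resume pattern used above.
struct token
{
    std::size_t id = 0;
    explicit operator bool() const { return id != 0; }
    friend bool operator==(token a, token b) { return a.id == b.id; }
};

struct pausation
{
    std::function<void()> op_;
    void emplace(std::function<void()> op) { op_ = std::move(op); } // park an op
    bool maybe_invoke()                                             // wake it, if any
    {
        if(! op_)
            return false;
        auto op = std::move(op_);
        op_ = nullptr;
        op();
        return true;
    }
};

struct write_gate
{
    token wr_block_;          // token of the op currently writing
    pausation paused_close_;  // a close op waiting for the write block
    pausation paused_wr_;     // a write op waiting for the write block

    // Acquire the write block, or park `resume` until the owner releases it.
    bool acquire_or_park(token tok, pausation& slot, std::function<void()> resume)
    {
        if(wr_block_ && ! (wr_block_ == tok))
        {
            slot.emplace(std::move(resume)); // suspend
            return false;
        }
        wr_block_ = tok;                     // acquired
        return true;
    }

    void release()
    {
        wr_block_ = token{};
        paused_close_.maybe_invoke() ||      // the || chain stops at the
            paused_wr_.maybe_invoke();       // first parked op that resumes
    }
};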
View File

@@ -55,7 +55,7 @@ class stream<NextLayer>::fail_op
: ws(ws_)
, code(code_)
, ev(ev_)
, tok(ws.t_.unique())
, tok(ws.tok_.unique())
{
}
};
@@ -150,7 +150,7 @@ operator()(error_code ec, std::size_t)
// Suspend
BOOST_ASSERT(d.ws.wr_block_ != d.tok);
BOOST_ASIO_CORO_YIELD
d.ws.rd_op_.emplace(std::move(*this)); // VFALCO emplace to rd_op_
d.ws.paused_rd_.emplace(std::move(*this)); // VFALCO emplace to paused_rd_
// Acquire the write block
BOOST_ASSERT(! d.ws.wr_block_);

View File

@@ -140,7 +140,7 @@ operator()(error_code ec)
// Read HTTP response
BOOST_ASIO_CORO_YIELD
http::async_read(d.ws.next_layer(),
d.ws.rd_.buf, d.res,
d.ws.rd_buf_, d.res,
std::move(*this));
if(ec)
goto upcall;
@@ -398,7 +398,7 @@ do_handshake(
}
if(ec)
return;
http::read(next_layer(), rd_.buf, res, ec);
http::read(next_layer(), rd_buf_, res, ec);
if(ec)
return;
on_response(res, key, ec);

View File

@@ -48,7 +48,7 @@ class stream<NextLayer>::ping_op
detail::opcode op,
ping_data const& payload)
: ws(ws_)
, tok(ws.t_.unique())
, tok(ws.tok_.unique())
{
// Serialize the control frame
ws.template write_ping<
@@ -145,7 +145,7 @@ operator()(error_code ec, std::size_t)
// Suspend
BOOST_ASSERT(d.ws.wr_block_ != d.tok);
BOOST_ASIO_CORO_YIELD
d.ws.ping_op_.emplace(std::move(*this));
d.ws.paused_ping_.emplace(std::move(*this));
// Acquire the write block
BOOST_ASSERT(! d.ws.wr_block_);
@@ -174,9 +174,9 @@ operator()(error_code ec, std::size_t)
upcall:
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.wr_block_.reset();
d.ws.close_op_.maybe_invoke() ||
d.ws.rd_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke();
d.ws.paused_close_.maybe_invoke() ||
d.ws.paused_rd_.maybe_invoke() ||
d.ws.paused_wr_.maybe_invoke();
d_.invoke(ec);
}
}

View File

@@ -66,7 +66,7 @@ public:
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, cb_(bs)
, tok_(ws_.t_.unique())
, tok_(ws_.tok_.unique())
{
}
@@ -154,7 +154,7 @@ operator()(
// Suspend
BOOST_ASSERT(ws_.rd_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.r_rd_op_.save(std::move(*this));
ws_.paused_r_rd_.save(std::move(*this));
// Acquire the read block
BOOST_ASSERT(! ws_.rd_block_);
@@ -178,12 +178,12 @@ operator()(
// condition is structured to give the decompressor
// a chance to emit the final empty deflate block
//
if(ws_.rd_.remain == 0 &&
(! ws_.rd_.fh.fin || ws_.rd_.done))
if(ws_.rd_remain_ == 0 &&
(! ws_.rd_fh_.fin || ws_.rd_done_))
{
// Read frame header
while(! ws_.parse_fh(
ws_.rd_.fh, ws_.rd_.buf, code))
ws_.rd_fh_, ws_.rd_buf_, code))
{
if(code != close_code::none)
{
@@ -193,48 +193,48 @@ operator()(
}
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
dispatched_ = true;
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
ws_.rd_.buf.commit(bytes_transferred);
ws_.rd_buf_.commit(bytes_transferred);
}
// Immediately apply the mask to the portion
// of the buffer holding payload data.
if(ws_.rd_.fh.len > 0 && ws_.rd_.fh.mask)
if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
detail::mask_inplace(buffer_prefix(
clamp(ws_.rd_.fh.len),
ws_.rd_.buf.data()),
ws_.rd_.key);
if(detail::is_control(ws_.rd_.fh.op))
clamp(ws_.rd_fh_.len),
ws_.rd_buf_.data()),
ws_.rd_key_);
if(detail::is_control(ws_.rd_fh_.op))
{
// Clear this otherwise the next
// frame will be considered final.
ws_.rd_.fh.fin = false;
ws_.rd_fh_.fin = false;
// Handle ping frame
if(ws_.rd_.fh.op == detail::opcode::ping)
if(ws_.rd_fh_.op == detail::opcode::ping)
{
{
auto const b = buffer_prefix(
clamp(ws_.rd_.fh.len),
ws_.rd_.buf.data());
clamp(ws_.rd_fh_.len),
ws_.rd_buf_.data());
auto const len = buffer_size(b);
BOOST_ASSERT(len == ws_.rd_.fh.len);
BOOST_ASSERT(len == ws_.rd_fh_.len);
ping_data payload;
detail::read_ping(payload, b);
ws_.rd_.buf.consume(len);
ws_.rd_buf_.consume(len);
// Ignore ping when closing
if(ws_.wr_close_)
goto loop;
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::ping, payload);
ws_.rd_.fb.reset();
ws_.rd_fb_.reset();
ws_.template write_ping<
flat_static_buffer_base>(ws_.rd_.fb,
flat_static_buffer_base>(ws_.rd_fb_,
detail::opcode::pong, payload);
}
// Maybe suspend
@@ -248,7 +248,7 @@ operator()(
// Suspend
BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.rd_op_.save(std::move(*this));
ws_.paused_rd_.save(std::move(*this));
// Acquire the write block
BOOST_ASSERT(! ws_.wr_block_);
@@ -280,7 +280,7 @@ operator()(
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
ws_.rd_.fb.data(), std::move(*this));
ws_.rd_fb_.data(), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
ws_.wr_block_.reset();
@@ -290,28 +290,28 @@ operator()(
goto loop;
}
// Handle pong frame
if(ws_.rd_.fh.op == detail::opcode::pong)
if(ws_.rd_fh_.op == detail::opcode::pong)
{
auto const cb = buffer_prefix(clamp(
ws_.rd_.fh.len), ws_.rd_.buf.data());
ws_.rd_fh_.len), ws_.rd_buf_.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_.fh.len);
BOOST_ASSERT(len == ws_.rd_fh_.len);
code = close_code::none;
ping_data payload;
detail::read_ping(payload, cb);
ws_.rd_.buf.consume(len);
ws_.rd_buf_.consume(len);
// Ignore pong when closing
if(! ws_.wr_close_ && ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::pong, payload);
goto loop;
}
// Handle close frame
BOOST_ASSERT(ws_.rd_.fh.op == detail::opcode::close);
BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
{
auto const cb = buffer_prefix(clamp(
ws_.rd_.fh.len), ws_.rd_.buf.data());
ws_.rd_fh_.len), ws_.rd_buf_.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_.fh.len);
BOOST_ASSERT(len == ws_.rd_fh_.len);
BOOST_ASSERT(! ws_.rd_close_);
ws_.rd_close_ = true;
close_reason cr;
@@ -323,7 +323,7 @@ operator()(
goto close;
}
ws_.cr_ = cr;
ws_.rd_.buf.consume(len);
ws_.rd_buf_.consume(len);
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::close,
ws_.cr_.reason);
@@ -342,52 +342,52 @@ operator()(
goto close;
}
}
if(ws_.rd_.fh.len == 0 && ! ws_.rd_.fh.fin)
if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
{
// Empty non-final frame
goto loop;
}
ws_.rd_.done = false;
ws_.rd_done_ = false;
}
if(! ws_.pmd_ || ! ws_.pmd_->rd_set)
{
if(ws_.rd_.remain > 0)
if(ws_.rd_remain_ > 0)
{
if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() >
(std::min)(clamp(ws_.rd_.remain),
if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
(std::min)(clamp(ws_.rd_remain_),
buffer_size(cb_)))
{
// Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O.
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
dispatched_ = true;
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask)
ws_.rd_buf_.commit(bytes_transferred);
if(ws_.rd_fh_.mask)
detail::mask_inplace(buffer_prefix(clamp(
ws_.rd_.remain), ws_.rd_.buf.data()),
ws_.rd_.key);
ws_.rd_remain_), ws_.rd_buf_.data()),
ws_.rd_key_);
}
if(ws_.rd_.buf.size() > 0)
if(ws_.rd_buf_.size() > 0)
{
// Copy from the read buffer.
// The mask was already applied.
bytes_transferred = buffer_copy(cb_,
ws_.rd_.buf.data(), clamp(ws_.rd_.remain));
ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
auto const mb = buffer_prefix(
bytes_transferred, cb_);
ws_.rd_.remain -= bytes_transferred;
if(ws_.rd_.op == detail::opcode::text)
ws_.rd_remain_ -= bytes_transferred;
if(ws_.rd_op_ == detail::opcode::text)
{
if(! ws_.rd_.utf8.write(mb) ||
(ws_.rd_.remain == 0 && ws_.rd_.fh.fin &&
! ws_.rd_.utf8.finish()))
if(! ws_.rd_utf8_.write(mb) ||
(ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
! ws_.rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
code = close_code::bad_payload;
@@ -396,17 +396,17 @@ operator()(
}
}
bytes_written_ += bytes_transferred;
ws_.rd_.size += bytes_transferred;
ws_.rd_.buf.consume(bytes_transferred);
ws_.rd_size_ += bytes_transferred;
ws_.rd_buf_.consume(bytes_transferred);
}
else
{
// Read into caller's buffer
BOOST_ASSERT(ws_.rd_.remain > 0);
BOOST_ASSERT(ws_.rd_remain_ > 0);
BOOST_ASSERT(buffer_size(cb_) > 0);
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(buffer_prefix(
clamp(ws_.rd_.remain), cb_), std::move(*this));
clamp(ws_.rd_remain_), cb_), std::move(*this));
dispatched_ = true;
ws_.open_ = ! ec;
if(! ws_.open_)
@@ -414,14 +414,14 @@ operator()(
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
bytes_transferred, cb_);
ws_.rd_.remain -= bytes_transferred;
if(ws_.rd_.fh.mask)
detail::mask_inplace(mb, ws_.rd_.key);
if(ws_.rd_.op == detail::opcode::text)
ws_.rd_remain_ -= bytes_transferred;
if(ws_.rd_fh_.mask)
detail::mask_inplace(mb, ws_.rd_key_);
if(ws_.rd_op_ == detail::opcode::text)
{
if(! ws_.rd_.utf8.write(mb) ||
(ws_.rd_.remain == 0 && ws_.rd_.fh.fin &&
! ws_.rd_.utf8.finish()))
if(! ws_.rd_utf8_.write(mb) ||
(ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
! ws_.rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
code = close_code::bad_payload;
@@ -430,38 +430,38 @@ operator()(
}
}
bytes_written_ += bytes_transferred;
ws_.rd_.size += bytes_transferred;
ws_.rd_size_ += bytes_transferred;
}
}
ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin;
ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
goto upcall;
}
else
{
// Read compressed message frame payload:
// inflate even if rd_.fh.len == 0, otherwise we
// inflate even if rd_fh_.len == 0, otherwise we
// never emit the end-of-stream deflate block.
while(buffer_size(cb_) > 0)
{
if( ws_.rd_.remain > 0 &&
ws_.rd_.buf.size() == 0 &&
if( ws_.rd_remain_ > 0 &&
ws_.rd_buf_.size() == 0 &&
! did_read_)
{
// read new
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
ws_.open_ = ! ec;
if(! ws_.open_)
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask)
ws_.rd_buf_.commit(bytes_transferred);
if(ws_.rd_fh_.mask)
detail::mask_inplace(
buffer_prefix(clamp(ws_.rd_.remain),
ws_.rd_.buf.data()), ws_.rd_.key);
buffer_prefix(clamp(ws_.rd_remain_),
ws_.rd_buf_.data()), ws_.rd_key_);
did_read_ = true;
}
zlib::z_params zs;
@@ -471,14 +471,14 @@ operator()(
zs.avail_out = buffer_size(out);
BOOST_ASSERT(zs.avail_out > 0);
}
if(ws_.rd_.remain > 0)
if(ws_.rd_remain_ > 0)
{
if(ws_.rd_.buf.size() > 0)
if(ws_.rd_buf_.size() > 0)
{
// use what's there
auto const in = buffer_prefix(
clamp(ws_.rd_.remain), buffer_front(
ws_.rd_.buf.data()));
clamp(ws_.rd_remain_), buffer_front(
ws_.rd_buf_.data()));
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
}
@@ -487,7 +487,7 @@ operator()(
break;
}
}
else if(ws_.rd_.fh.fin)
else if(ws_.rd_fh_.fin)
{
// append the empty block codes
static std::uint8_t constexpr
@@ -504,7 +504,7 @@ operator()(
// https://github.com/madler/zlib/issues/280
BOOST_ASSERT(zs.total_out == 0);
cb_.consume(zs.total_out);
ws_.rd_.size += zs.total_out;
ws_.rd_size_ += zs.total_out;
bytes_written_ += zs.total_out;
if(
(ws_.role_ == role_type::client &&
@@ -512,7 +512,7 @@ operator()(
(ws_.role_ == role_type::server &&
ws_.pmd_config_.client_no_context_takeover))
ws_.pmd_->zi.reset();
ws_.rd_.done = true;
ws_.rd_done_ = true;
break;
}
else
@@ -525,7 +525,7 @@ operator()(
if(! ws_.open_)
break;
if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
ws_.rd_.size, zs.total_out, ws_.rd_msg_max_))
ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
{
// _Fail the WebSocket Connection_
code = close_code::too_big;
@@ -533,17 +533,17 @@ operator()(
goto close;
}
cb_.consume(zs.total_out);
ws_.rd_.size += zs.total_out;
ws_.rd_.remain -= zs.total_in;
ws_.rd_.buf.consume(zs.total_in);
ws_.rd_size_ += zs.total_out;
ws_.rd_remain_ -= zs.total_in;
ws_.rd_buf_.consume(zs.total_in);
bytes_written_ += zs.total_out;
}
if(ws_.rd_.op == detail::opcode::text)
if(ws_.rd_op_ == detail::opcode::text)
{
// check utf8
if(! ws_.rd_.utf8.write(
if(! ws_.rd_utf8_.write(
buffer_prefix(bytes_written_, cb_.get())) || (
ws_.rd_.done && ! ws_.rd_.utf8.finish()))
ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
code = close_code::bad_payload;
@@ -564,10 +564,10 @@ operator()(
upcall:
BOOST_ASSERT(ws_.rd_block_ == tok_);
ws_.rd_block_.reset();
ws_.r_close_op_.maybe_invoke();
ws_.close_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke() ||
ws_.wr_op_.maybe_invoke();
ws_.paused_r_close_.maybe_invoke();
ws_.paused_close_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
if(! dispatched_)
{
ws_.stream_.get_io_service().post(
@@ -903,10 +903,10 @@ loop:
// condition is structured to give the decompressor
// a chance to emit the final empty deflate block
//
if(rd_.remain == 0 && (! rd_.fh.fin || rd_.done))
if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
{
// Read frame header
while(! parse_fh(rd_.fh, rd_.buf, code))
while(! parse_fh(rd_fh_, rd_buf_, code))
{
if(code != close_code::none)
{
@@ -916,38 +916,38 @@ loop:
}
auto const bytes_transferred =
stream_.read_some(
rd_.buf.prepare(read_size(
rd_.buf, rd_.buf.max_size())),
rd_buf_.prepare(read_size(
rd_buf_, rd_buf_.max_size())),
ec);
open_ = ! ec;
if(! open_)
return bytes_written;
rd_.buf.commit(bytes_transferred);
rd_buf_.commit(bytes_transferred);
}
// Immediately apply the mask to the portion
// of the buffer holding payload data.
if(rd_.fh.len > 0 && rd_.fh.mask)
if(rd_fh_.len > 0 && rd_fh_.mask)
detail::mask_inplace(buffer_prefix(
clamp(rd_.fh.len), rd_.buf.data()),
rd_.key);
if(detail::is_control(rd_.fh.op))
clamp(rd_fh_.len), rd_buf_.data()),
rd_key_);
if(detail::is_control(rd_fh_.op))
{
// Get control frame payload
auto const b = buffer_prefix(
clamp(rd_.fh.len), rd_.buf.data());
clamp(rd_fh_.len), rd_buf_.data());
auto const len = buffer_size(b);
BOOST_ASSERT(len == rd_.fh.len);
BOOST_ASSERT(len == rd_fh_.len);
// Clear this otherwise the next
// frame will be considered final.
rd_.fh.fin = false;
rd_fh_.fin = false;
// Handle ping frame
if(rd_.fh.op == detail::opcode::ping)
if(rd_fh_.op == detail::opcode::ping)
{
ping_data payload;
detail::read_ping(payload, b);
rd_.buf.consume(len);
rd_buf_.consume(len);
if(wr_close_)
{
// Ignore ping when closing
@@ -965,17 +965,17 @@ loop:
goto loop;
}
// Handle pong frame
if(rd_.fh.op == detail::opcode::pong)
if(rd_fh_.op == detail::opcode::pong)
{
ping_data payload;
detail::read_ping(payload, b);
rd_.buf.consume(len);
rd_buf_.consume(len);
if(ctrl_cb_)
ctrl_cb_(frame_type::pong, payload);
goto loop;
}
// Handle close frame
BOOST_ASSERT(rd_.fh.op == detail::opcode::close);
BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
{
BOOST_ASSERT(! rd_close_);
rd_close_ = true;
@@ -988,7 +988,7 @@ loop:
return bytes_written;
}
cr_ = cr;
rd_.buf.consume(len);
rd_buf_.consume(len);
if(ctrl_cb_)
ctrl_cb_(frame_type::close, cr_.reason);
BOOST_ASSERT(! wr_close_);
@@ -1000,12 +1000,12 @@ loop:
return bytes_written;
}
}
if(rd_.fh.len == 0 && ! rd_.fh.fin)
if(rd_fh_.len == 0 && ! rd_fh_.fin)
{
// Empty non-final frame
goto loop;
}
rd_.done = false;
rd_done_ = false;
}
else
{
@@ -1013,40 +1013,40 @@ loop:
}
if(! pmd_ || ! pmd_->rd_set)
{
if(rd_.remain > 0)
if(rd_remain_ > 0)
{
if(rd_.buf.size() == 0 && rd_.buf.max_size() >
(std::min)(clamp(rd_.remain),
if(rd_buf_.size() == 0 && rd_buf_.max_size() >
(std::min)(clamp(rd_remain_),
buffer_size(buffers)))
{
// Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O.
rd_.buf.commit(stream_.read_some(
rd_.buf.prepare(read_size(rd_.buf,
rd_.buf.max_size())), ec));
rd_buf_.commit(stream_.read_some(
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec));
open_ = ! ec;
if(! open_)
return bytes_written;
if(rd_.fh.mask)
if(rd_fh_.mask)
detail::mask_inplace(
buffer_prefix(clamp(rd_.remain),
rd_.buf.data()), rd_.key);
buffer_prefix(clamp(rd_remain_),
rd_buf_.data()), rd_key_);
}
if(rd_.buf.size() > 0)
if(rd_buf_.size() > 0)
{
// Copy from the read buffer.
// The mask was already applied.
auto const bytes_transferred =
buffer_copy(buffers, rd_.buf.data(),
clamp(rd_.remain));
buffer_copy(buffers, rd_buf_.data(),
clamp(rd_remain_));
auto const mb = buffer_prefix(
bytes_transferred, buffers);
rd_.remain -= bytes_transferred;
if(rd_.op == detail::opcode::text)
rd_remain_ -= bytes_transferred;
if(rd_op_ == detail::opcode::text)
{
if(! rd_.utf8.write(mb) ||
(rd_.remain == 0 && rd_.fh.fin &&
! rd_.utf8.finish()))
if(! rd_utf8_.write(mb) ||
(rd_remain_ == 0 && rd_fh_.fin &&
! rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
do_fail(
@@ -1057,31 +1057,31 @@ loop:
}
}
bytes_written += bytes_transferred;
rd_.size += bytes_transferred;
rd_.buf.consume(bytes_transferred);
rd_size_ += bytes_transferred;
rd_buf_.consume(bytes_transferred);
}
else
{
// Read into caller's buffer
BOOST_ASSERT(rd_.remain > 0);
BOOST_ASSERT(rd_remain_ > 0);
BOOST_ASSERT(buffer_size(buffers) > 0);
auto const bytes_transferred =
stream_.read_some(buffer_prefix(
clamp(rd_.remain), buffers), ec);
clamp(rd_remain_), buffers), ec);
open_ = ! ec;
if(! open_)
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
bytes_transferred, buffers);
rd_.remain -= bytes_transferred;
if(rd_.fh.mask)
detail::mask_inplace(mb, rd_.key);
if(rd_.op == detail::opcode::text)
rd_remain_ -= bytes_transferred;
if(rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
if(rd_op_ == detail::opcode::text)
{
if(! rd_.utf8.write(mb) ||
(rd_.remain == 0 && rd_.fh.fin &&
! rd_.utf8.finish()))
if(! rd_utf8_.write(mb) ||
(rd_remain_ == 0 && rd_fh_.fin &&
! rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
do_fail(close_code::bad_payload,
@@ -1090,15 +1090,15 @@ loop:
}
}
bytes_written += bytes_transferred;
rd_.size += bytes_transferred;
rd_size_ += bytes_transferred;
}
}
rd_.done = rd_.remain == 0 && rd_.fh.fin;
rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
}
else
{
// Read compressed message frame payload:
// inflate even if rd_.fh.len == 0, otherwise we
// inflate even if rd_fh_.len == 0, otherwise we
// never emit the end-of-stream deflate block.
//
bool did_read = false;
@@ -1112,14 +1112,14 @@ loop:
zs.avail_out = buffer_size(out);
BOOST_ASSERT(zs.avail_out > 0);
}
if(rd_.remain > 0)
if(rd_remain_ > 0)
{
if(rd_.buf.size() > 0)
if(rd_buf_.size() > 0)
{
// use what's there
auto const in = buffer_prefix(
clamp(rd_.remain), buffer_front(
rd_.buf.data()));
clamp(rd_remain_), buffer_front(
rd_buf_.data()));
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
}
@@ -1128,21 +1128,21 @@ loop:
// read new
auto const bytes_transferred =
stream_.read_some(
rd_.buf.prepare(read_size(
rd_.buf, rd_.buf.max_size())),
rd_buf_.prepare(read_size(
rd_buf_, rd_buf_.max_size())),
ec);
open_ = ! ec;
if(! open_)
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
rd_.buf.commit(bytes_transferred);
if(rd_.fh.mask)
rd_buf_.commit(bytes_transferred);
if(rd_fh_.mask)
detail::mask_inplace(
buffer_prefix(clamp(rd_.remain),
rd_.buf.data()), rd_.key);
buffer_prefix(clamp(rd_remain_),
rd_buf_.data()), rd_key_);
auto const in = buffer_prefix(
clamp(rd_.remain), buffer_front(
rd_.buf.data()));
clamp(rd_remain_), buffer_front(
rd_buf_.data()));
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
did_read = true;
@@ -1152,7 +1152,7 @@ loop:
break;
}
}
else if(rd_.fh.fin)
else if(rd_fh_.fin)
{
// append the empty block codes
static std::uint8_t constexpr
@@ -1169,7 +1169,7 @@ loop:
// https://github.com/madler/zlib/issues/280
BOOST_ASSERT(zs.total_out == 0);
cb.consume(zs.total_out);
rd_.size += zs.total_out;
rd_size_ += zs.total_out;
bytes_written += zs.total_out;
if(
(role_ == role_type::client &&
@@ -1177,7 +1177,7 @@ loop:
(role_ == role_type::server &&
pmd_config_.client_no_context_takeover))
pmd_->zi.reset();
rd_.done = true;
rd_done_ = true;
break;
}
else
@@ -1190,24 +1190,24 @@ loop:
if(! open_)
return bytes_written;
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_.size, zs.total_out, rd_msg_max_))
rd_size_, zs.total_out, rd_msg_max_))
{
do_fail(close_code::too_big,
error::failed, ec);
return bytes_written;
}
cb.consume(zs.total_out);
rd_.size += zs.total_out;
rd_.remain -= zs.total_in;
rd_.buf.consume(zs.total_in);
rd_size_ += zs.total_out;
rd_remain_ -= zs.total_in;
rd_buf_.consume(zs.total_in);
bytes_written += zs.total_out;
}
if(rd_.op == detail::opcode::text)
if(rd_op_ == detail::opcode::text)
{
// check utf8
if(! rd_.utf8.write(
if(! rd_utf8_.write(
buffer_prefix(bytes_written, buffers)) || (
rd_.done && ! rd_.utf8.finish()))
rd_done_ && ! rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
do_fail(close_code::bad_payload,

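In both the synchronous and asynchronous read loops above, text payloads are validated incrementally: each decoded chunk is fed to rd_utf8_.write(), and once the final frame of the message has been consumed rd_utf8_.finish() must also succeed, otherwise the connection is failed with close_code::bad_payload. A minimal sketch of that chunk-then-finalize shape, using a hypothetical checker with the same write()/finish() surface (not Beast's detail::utf8_checker):

#include <boost/asio/buffer.hpp>
#include <vector>

// Hypothetical checker mirroring the write()/finish() calls in the loops above.
struct utf8_checker_like
{
    bool write(boost::asio::const_buffer) { return true; } // validate one chunk
    bool finish() { return true; }                          // end-of-message check
};

// True if a text message, delivered as chunks, is valid UTF-8. The last
// chunk must also pass finish(), exactly as the read loops require.
inline bool
valid_text_message(std::vector<boost::asio::const_buffer> const& chunks)
{
    utf8_checker_like utf8;
    for(std::size_t i = 0; i < chunks.size(); ++i)
    {
        if(! utf8.write(chunks[i]))
            return false;            // would fail with close_code::bad_payload
        if(i + 1 == chunks.size() && ! utf8.finish())
            return false;            // truncated sequence at the message end
    }
    return true;
}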
View File

@@ -45,9 +45,9 @@ template<class... Args>
stream<NextLayer>::
stream(Args&&... args)
: stream_(std::forward<Args>(args)...)
, t_(1)
, tok_(1)
{
BOOST_ASSERT(rd_.buf.max_size() >=
BOOST_ASSERT(rd_buf_.max_size() >=
max_control_frame_size);
}
@@ -60,26 +60,26 @@ read_size_hint(
using beast::detail::clamp;
std::size_t result;
BOOST_ASSERT(initial_size > 0);
if(! pmd_ || (! rd_.done && ! pmd_->rd_set))
if(! pmd_ || (! rd_done_ && ! pmd_->rd_set))
{
// current message is uncompressed
if(rd_.done)
if(rd_done_)
{
// first message frame
result = initial_size;
goto done;
}
else if(rd_.fh.fin)
else if(rd_fh_.fin)
{
// last message frame
BOOST_ASSERT(rd_.remain > 0);
result = clamp(rd_.remain);
BOOST_ASSERT(rd_remain_ > 0);
result = clamp(rd_remain_);
goto done;
}
}
result = (std::max)(
initial_size, clamp(rd_.remain));
initial_size, clamp(rd_remain_));
done:
BOOST_ASSERT(result != 0);
return result;
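Worked example of the branches above (uncompressed stream, initial_size == 4096; the concrete values are illustrative):

// rd_done_ == true                        -> hint = 4096 (start of a new message)
// rd_fh_.fin == true, rd_remain_ == 1000  -> hint = clamp(1000) = 1000
// mid-message, rd_remain_ == 70000        -> hint = max(4096, clamp(70000)) = 70000 on a 64-bit build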
@@ -135,21 +135,20 @@ open(role_type role)
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
open_ = true;
rd_.remain = 0;
rd_.cont = false;
rd_.done = true;
rd_remain_ = 0;
rd_cont_ = false;
rd_done_ = true;
// Can't clear this because accept uses it
//rd_.buf.reset();
rd_.fh.fin = false;
//rd_buf_.reset();
rd_fh_.fin = false;
rd_close_ = false;
wr_close_ = false;
wr_block_.reset();
rd_block_.reset();
cr_.code = close_code::none;
ping_data_ = nullptr; // should be nullptr on close anyway
wr_.cont = false;
wr_.buf_size = 0;
wr_cont_ = false;
wr_buf_size_ = 0;
if(((role_ == role_type::client && pmd_opts_.client_enable) ||
(role_ == role_type::server && pmd_opts_.server_enable)) &&
@@ -185,7 +184,7 @@ void
stream<NextLayer>::
close()
{
wr_.buf.reset();
wr_buf_.reset();
pmd_.reset();
}
@@ -196,44 +195,43 @@ reset()
{
BOOST_ASSERT(! open_);
open_ = false; // VFALCO is this needed?
rd_.remain = 0;
rd_.cont = false;
rd_.done = true;
rd_.buf.consume(rd_.buf.size());
rd_.fh.fin = false;
rd_remain_ = 0;
rd_cont_ = false;
rd_done_ = true;
rd_buf_.consume(rd_buf_.size());
rd_fh_.fin = false;
rd_close_ = false;
wr_close_ = false;
wr_.cont = false;
wr_cont_ = false;
wr_block_.reset();
rd_block_.reset();
cr_.code = close_code::none;
ping_data_ = nullptr; // should be nullptr on close anyway
}
// Called before each write frame
template<class NextLayer>
void
stream<NextLayer>::
wr_begin()
begin_msg()
{
wr_.autofrag = wr_autofrag_;
wr_.compress = static_cast<bool>(pmd_);
wr_frag_ = wr_frag_opt_;
wr_compress_ = static_cast<bool>(pmd_);
// Maintain the write buffer
if( wr_.compress ||
if( wr_compress_ ||
role_ == role_type::client)
{
if(! wr_.buf || wr_.buf_size != wr_buf_size_)
if(! wr_buf_ || wr_buf_size_ != wr_buf_opt_)
{
wr_.buf_size = wr_buf_size_;
wr_.buf = boost::make_unique_noinit<
std::uint8_t[]>(wr_.buf_size);
wr_buf_size_ = wr_buf_opt_;
wr_buf_ = boost::make_unique_noinit<
std::uint8_t[]>(wr_buf_size_);
}
}
else
{
wr_.buf_size = wr_buf_size_;
wr_.buf.reset();
wr_buf_size_ = wr_buf_opt_;
wr_buf_.reset();
}
}
@@ -298,7 +296,7 @@ parse_fh(
{
case detail::opcode::binary:
case detail::opcode::text:
if(rd_.cont)
if(rd_cont_)
{
// new data frame when continuation expected
return err(close_code::protocol_error);
@@ -314,7 +312,7 @@ parse_fh(
break;
case detail::opcode::cont:
if(! rd_.cont)
if(! rd_cont_)
{
// continuation without an active message
return err(close_code::protocol_error);
@@ -393,7 +391,7 @@ parse_fh(
BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp));
cb.consume(buffer_copy(buffer(tmp), cb));
fh.key = detail::little_uint32_to_native(&tmp[0]);
detail::prepare_key(rd_.key, fh.key);
detail::prepare_key(rd_key_, fh.key);
}
else
{
@@ -404,23 +402,23 @@ parse_fh(
{
if(fh.op != detail::opcode::cont)
{
rd_.size = 0;
rd_.op = fh.op;
rd_size_ = 0;
rd_op_ = fh.op;
}
else
{
if(rd_.size > (std::numeric_limits<
if(rd_size_ > (std::numeric_limits<
std::uint64_t>::max)() - fh.len)
return err(close_code::too_big);
}
if(! pmd_ || ! pmd_->rd_set)
{
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_.size, fh.len, rd_msg_max_))
rd_size_, fh.len, rd_msg_max_))
return err(close_code::too_big);
}
rd_.cont = ! fh.fin;
rd_.remain = fh.len;
rd_cont_ = ! fh.fin;
rd_remain_ = fh.len;
}
b.consume(b.size() - buffer_size(cb));
code = close_code::none;
@@ -444,7 +442,7 @@ write_close(DynamicBuffer& db, close_reason const& cr)
0 : 2 + cr.reason.size();
fh.mask = role_ == role_type::client;
if(fh.mask)
fh.key = maskgen_();
fh.key = wr_gen_();
detail::write(db, fh);
if(cr.code != close_code::none)
{
@@ -491,7 +489,7 @@ write_ping(DynamicBuffer& db,
fh.len = data.size();
fh.mask = role_ == role_type::client;
if(fh.mask)
fh.key = maskgen_();
fh.key = wr_gen_();
detail::write(db, fh);
if(data.empty())
return;
@@ -525,7 +523,7 @@ build_request(detail::sec_ws_key_type& key,
req.set(http::field::host, host);
req.set(http::field::upgrade, "websocket");
req.set(http::field::connection, "upgrade");
detail::make_sec_ws_key(key, maskgen_);
detail::make_sec_ws_key(key, wr_gen_);
req.set(http::field::sec_websocket_key, key);
req.set(http::field::sec_websocket_version, "13");
if(pmd_opts_.client_enable)

View File

@@ -63,7 +63,7 @@ public:
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, cb_(bs)
, tok_(ws_.t_.unique())
, tok_(ws_.tok_.unique())
, fin_(fin)
{
}
@@ -149,10 +149,10 @@ operator()(error_code ec,
BOOST_ASIO_CORO_REENTER(*this)
{
// Set up the outgoing frame header
if(! ws_.wr_.cont)
if(! ws_.wr_cont_)
{
ws_.wr_begin();
fh_.rsv1 = ws_.wr_.compress;
ws_.begin_msg();
fh_.rsv1 = ws_.wr_compress_;
}
else
{
@@ -160,27 +160,27 @@ operator()(error_code ec,
}
fh_.rsv2 = false;
fh_.rsv3 = false;
fh_.op = ws_.wr_.cont ?
fh_.op = ws_.wr_cont_ ?
detail::opcode::cont : ws_.wr_opcode_;
fh_.mask =
ws_.role_ == role_type::client;
// Choose a write algorithm
if(ws_.wr_.compress)
if(ws_.wr_compress_)
{
how_ = do_deflate;
}
else if(! fh_.mask)
{
if(! ws_.wr_.autofrag)
if(! ws_.wr_frag_)
{
how_ = do_nomask_nofrag;
}
else
{
BOOST_ASSERT(ws_.wr_.buf_size != 0);
BOOST_ASSERT(ws_.wr_buf_size_ != 0);
remain_ = buffer_size(cb_);
if(remain_ > ws_.wr_.buf_size)
if(remain_ > ws_.wr_buf_size_)
how_ = do_nomask_frag;
else
how_ = do_nomask_nofrag;
@@ -188,15 +188,15 @@ operator()(error_code ec,
}
else
{
if(! ws_.wr_.autofrag)
if(! ws_.wr_frag_)
{
how_ = do_mask_nofrag;
}
else
{
BOOST_ASSERT(ws_.wr_.buf_size != 0);
BOOST_ASSERT(ws_.wr_buf_size_ != 0);
remain_ = buffer_size(cb_);
if(remain_ > ws_.wr_.buf_size)
if(remain_ > ws_.wr_buf_size_)
how_ = do_mask_frag;
else
how_ = do_mask_nofrag;
@@ -225,7 +225,7 @@ operator()(error_code ec,
// Suspend
BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.wr_op_.save(std::move(*this));
ws_.paused_wr_.save(std::move(*this));
// Acquire the write block
BOOST_ASSERT(! ws_.wr_block_);
@@ -250,15 +250,15 @@ operator()(error_code ec,
{
fh_.fin = fin_;
fh_.len = buffer_size(cb_);
ws_.wr_.fb.reset();
ws_.wr_fb_.reset();
detail::write<flat_static_buffer_base>(
ws_.wr_.fb, fh_);
ws_.wr_.cont = ! fin_;
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer_cat(ws_.wr_.fb.data(), cb_),
buffer_cat(ws_.wr_fb_.data(), cb_),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
@@ -272,19 +272,19 @@ operator()(error_code ec,
{
for(;;)
{
fh_.len = clamp(remain_, ws_.wr_.buf_size);
fh_.len = clamp(remain_, ws_.wr_buf_size_);
remain_ -= clamp(fh_.len);
fh_.fin = fin_ ? remain_ == 0 : false;
ws_.wr_.fb.reset();
ws_.wr_fb_.reset();
detail::write<flat_static_buffer_base>(
ws_.wr_.fb, fh_);
ws_.wr_.cont = ! fin_;
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(
ws_.stream_, buffer_cat(
ws_.wr_.fb.data(), buffer_prefix(
ws_.wr_fb_.data(), buffer_prefix(
clamp(fh_.len), cb_)),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
@@ -296,14 +296,14 @@ operator()(error_code ec,
if(remain_ == 0)
goto upcall;
cb_.consume(
bytes_transferred - ws_.wr_.fb.size());
bytes_transferred - ws_.wr_fb_.size());
fh_.op = detail::opcode::cont;
// Allow outgoing control frames to
// be sent in between message frames
ws_.wr_block_.reset();
if( ws_.close_op_.maybe_invoke() ||
ws_.rd_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke())
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
{
BOOST_ASIO_CORO_YIELD
ws_.get_io_service().post(
@@ -321,24 +321,24 @@ operator()(error_code ec,
remain_ = buffer_size(cb_);
fh_.fin = fin_;
fh_.len = remain_;
fh_.key = ws_.maskgen_();
fh_.key = ws_.wr_gen_();
detail::prepare_key(key_, fh_.key);
ws_.wr_.fb.reset();
ws_.wr_fb_.reset();
detail::write<flat_static_buffer_base>(
ws_.wr_.fb, fh_);
n = clamp(remain_, ws_.wr_.buf_size);
ws_.wr_fb_, fh_);
n = clamp(remain_, ws_.wr_buf_size_);
buffer_copy(buffer(
ws_.wr_.buf.get(), n), cb_);
ws_.wr_buf_.get(), n), cb_);
detail::mask_inplace(buffer(
ws_.wr_.buf.get(), n), key_);
ws_.wr_buf_.get(), n), key_);
remain_ -= n;
ws_.wr_.cont = ! fin_;
ws_.wr_cont_ = ! fin_;
// Send frame header and partial payload
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(
ws_.stream_, buffer_cat(ws_.wr_.fb.data(),
buffer(ws_.wr_.buf.get(), n)),
ws_.stream_, buffer_cat(ws_.wr_fb_.data(),
buffer(ws_.wr_buf_.get(), n)),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
@@ -348,18 +348,18 @@ operator()(error_code ec,
}
while(remain_ > 0)
{
cb_.consume(ws_.wr_.buf_size);
n = clamp(remain_, ws_.wr_.buf_size);
cb_.consume(ws_.wr_buf_size_);
n = clamp(remain_, ws_.wr_buf_size_);
buffer_copy(buffer(
ws_.wr_.buf.get(), n), cb_);
ws_.wr_buf_.get(), n), cb_);
detail::mask_inplace(buffer(
ws_.wr_.buf.get(), n), key_);
ws_.wr_buf_.get(), n), key_);
remain_ -= n;
// Send partial payload
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer(ws_.wr_.buf.get(), n),
buffer(ws_.wr_buf_.get(), n),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
@@ -377,26 +377,26 @@ operator()(error_code ec,
{
for(;;)
{
n = clamp(remain_, ws_.wr_.buf_size);
n = clamp(remain_, ws_.wr_buf_size_);
remain_ -= n;
fh_.len = n;
fh_.key = ws_.maskgen_();
fh_.key = ws_.wr_gen_();
fh_.fin = fin_ ? remain_ == 0 : false;
detail::prepare_key(key_, fh_.key);
buffer_copy(buffer(
ws_.wr_.buf.get(), n), cb_);
ws_.wr_buf_.get(), n), cb_);
detail::mask_inplace(buffer(
ws_.wr_.buf.get(), n), key_);
ws_.wr_.fb.reset();
ws_.wr_buf_.get(), n), key_);
ws_.wr_fb_.reset();
detail::write<flat_static_buffer_base>(
ws_.wr_.fb, fh_);
ws_.wr_.cont = ! fin_;
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer_cat(ws_.wr_.fb.data(),
buffer(ws_.wr_.buf.get(), n)),
buffer_cat(ws_.wr_fb_.data(),
buffer(ws_.wr_buf_.get(), n)),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
@@ -407,14 +407,14 @@ operator()(error_code ec,
if(remain_ == 0)
goto upcall;
cb_.consume(
bytes_transferred - ws_.wr_.fb.size());
bytes_transferred - ws_.wr_fb_.size());
fh_.op = detail::opcode::cont;
// Allow outgoing control frames to
// be sent in between message frames:
ws_.wr_block_.reset();
if( ws_.close_op_.maybe_invoke() ||
ws_.rd_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke())
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
{
BOOST_ASIO_CORO_YIELD
ws_.get_io_service().post(
@@ -431,8 +431,8 @@ operator()(error_code ec,
{
for(;;)
{
b = buffer(ws_.wr_.buf.get(),
ws_.wr_.buf_size);
b = buffer(ws_.wr_buf_.get(),
ws_.wr_buf_size_);
more_ = detail::deflate(
ws_.pmd_->zo, b, cb_, fin_, ec);
ws_.open_ = ! ec;
@@ -464,22 +464,22 @@ operator()(error_code ec,
}
if(fh_.mask)
{
fh_.key = ws_.maskgen_();
fh_.key = ws_.wr_gen_();
detail::prepared_key key;
detail::prepare_key(key, fh_.key);
detail::mask_inplace(b, key);
}
fh_.fin = ! more_;
fh_.len = n;
ws_.wr_.fb.reset();
ws_.wr_fb_.reset();
detail::write<
flat_static_buffer_base>(ws_.wr_.fb, fh_);
ws_.wr_.cont = ! fin_;
flat_static_buffer_base>(ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer_cat(ws_.wr_.fb.data(),
buffer_cat(ws_.wr_fb_.data(),
mutable_buffers_1{b}), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec)
@@ -494,9 +494,9 @@ operator()(error_code ec,
// Allow outgoing control frames to
// be sent in between message frames:
ws_.wr_block_.reset();
if( ws_.close_op_.maybe_invoke() ||
ws_.rd_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke())
if( ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke())
{
BOOST_ASIO_CORO_YIELD
ws_.get_io_service().post(
@@ -524,9 +524,9 @@ operator()(error_code ec,
upcall:
BOOST_ASSERT(ws_.wr_block_ == tok_);
ws_.wr_block_.reset();
ws_.close_op_.maybe_invoke() ||
ws_.rd_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke();
ws_.paused_close_.maybe_invoke() ||
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke();
h_(ec);
}
}
@@ -573,10 +573,10 @@ write_some(bool fin,
return;
}
detail::frame_header fh;
if(! wr_.cont)
if(! wr_cont_)
{
wr_begin();
fh.rsv1 = wr_.compress;
begin_msg();
fh.rsv1 = wr_compress_;
}
else
{
@@ -584,18 +584,18 @@ write_some(bool fin,
}
fh.rsv2 = false;
fh.rsv3 = false;
fh.op = wr_.cont ?
fh.op = wr_cont_ ?
detail::opcode::cont : wr_opcode_;
fh.mask = role_ == role_type::client;
auto remain = buffer_size(buffers);
if(wr_.compress)
if(wr_compress_)
{
consuming_buffers<
ConstBufferSequence> cb{buffers};
for(;;)
{
auto b = buffer(
wr_.buf.get(), wr_.buf_size);
wr_buf_.get(), wr_buf_size_);
auto const more = detail::deflate(
pmd_->zo, b, cb, fin, ec);
open_ = ! ec;
@@ -614,7 +614,7 @@ write_some(bool fin,
}
if(fh.mask)
{
fh.key = maskgen_();
fh.key = wr_gen_();
detail::prepared_key key;
detail::prepare_key(key, fh.key);
detail::mask_inplace(b, key);
@@ -624,7 +624,7 @@ write_some(bool fin,
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
wr_.cont = ! fin;
wr_cont_ = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
open_ = ! ec;
@@ -645,7 +645,7 @@ write_some(bool fin,
}
if(! fh.mask)
{
if(! wr_.autofrag)
if(! wr_frag_)
{
// no mask, no autofrag
fh.fin = fin;
@@ -653,7 +653,7 @@ write_some(bool fin,
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
wr_.cont = ! fin;
wr_cont_ = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), buffers), ec);
open_ = ! ec;
@@ -663,19 +663,19 @@ write_some(bool fin,
else
{
// no mask, autofrag
BOOST_ASSERT(wr_.buf_size != 0);
BOOST_ASSERT(wr_buf_size_ != 0);
consuming_buffers<
ConstBufferSequence> cb{buffers};
for(;;)
{
auto const n = clamp(remain, wr_.buf_size);
auto const n = clamp(remain, wr_buf_size_);
remain -= n;
fh.len = n;
fh.fin = fin ? remain == 0 : false;
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);
wr_.cont = ! fin;
wr_cont_ = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(),
buffer_prefix(n, cb)), ec);
@@ -690,12 +690,12 @@ write_some(bool fin,
}
return;
}
if(! wr_.autofrag)
if(! wr_frag_)
{
// mask, no autofrag
fh.fin = fin;
fh.len = remain;
fh.key = maskgen_();
fh.key = wr_gen_();
detail::prepared_key key;
detail::prepare_key(key, fh.key);
detail::fh_buffer fh_buf;
@@ -704,13 +704,13 @@ write_some(bool fin,
consuming_buffers<
ConstBufferSequence> cb{buffers};
{
auto const n = clamp(remain, wr_.buf_size);
auto const b = buffer(wr_.buf.get(), n);
auto const n = clamp(remain, wr_buf_size_);
auto const b = buffer(wr_buf_.get(), n);
buffer_copy(b, cb);
cb.consume(n);
remain -= n;
detail::mask_inplace(b, key);
wr_.cont = ! fin;
wr_cont_ = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
open_ = ! ec;
@@ -719,8 +719,8 @@ write_some(bool fin,
}
while(remain > 0)
{
auto const n = clamp(remain, wr_.buf_size);
auto const b = buffer(wr_.buf.get(), n);
auto const n = clamp(remain, wr_buf_size_);
auto const b = buffer(wr_buf_.get(), n);
buffer_copy(b, cb);
cb.consume(n);
remain -= n;
@@ -734,22 +734,22 @@ write_some(bool fin,
}
{
// mask, autofrag
BOOST_ASSERT(wr_.buf_size != 0);
BOOST_ASSERT(wr_buf_size_ != 0);
consuming_buffers<
ConstBufferSequence> cb{buffers};
for(;;)
{
fh.key = maskgen_();
fh.key = wr_gen_();
detail::prepared_key key;
detail::prepare_key(key, fh.key);
auto const n = clamp(remain, wr_.buf_size);
auto const b = buffer(wr_.buf.get(), n);
auto const n = clamp(remain, wr_buf_size_);
auto const b = buffer(wr_buf_.get(), n);
buffer_copy(b, cb);
detail::mask_inplace(b, key);
fh.len = n;
remain -= n;
fh.fin = fin ? remain == 0 : false;
wr_.cont = ! fh.fin;
wr_cont_ = ! fh.fin;
detail::fh_buffer fh_buf;
detail::write<
flat_static_buffer_base>(fh_buf, fh);

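Every masked write path above repeats the same per-frame sequence with the renamed members; condensed as a schematic (abbreviated, not literal code):

// fh.key = wr_gen_();                                   // fresh mask key per frame
// detail::prepare_key(key, fh.key);                     // expand into a stateful key
// n = clamp(remain, wr_buf_size_);                      // at most one write buffer's worth
// buffer_copy(buffer(wr_buf_.get(), n), cb);            // stage the caller's bytes
// detail::mask_inplace(buffer(wr_buf_.get(), n), key);  // XOR-mask in place
// write the serialized header (wr_fb_ / fh_buf) and the masked chunk together
// remain -= n; fh.fin = fin && remain == 0; wr_cont_ = ! fh.fin;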
View File

@@ -146,67 +146,6 @@ class stream
void reset() { id_ = 0; }
};
using control_cb_type =
std::function<void(frame_type, string_view)>;
// State information for the message being received
//
struct rd_t
{
detail::frame_header fh; // current frame header
detail::prepared_key key; // current stateful mask key
std::uint64_t size; // total size of current message so far
std::uint64_t remain; // message frame bytes left in current frame
detail::frame_buffer fb; // to write control frames (during reads)
detail::utf8_checker utf8; // to validate utf8
// A small, circular buffer to read frame headers.
// This improves performance by avoiding small reads.
static_buffer<+tcp_frame_size> buf;
// opcode of current message being read
detail::opcode op;
// `true` if the next frame is a continuation.
bool cont;
bool done; // set when a message is done
};
// State information for the message being sent
//
struct wr_t
{
// `true` if next frame is a continuation,
// `false` if next frame starts a new message
bool cont;
// `true` if this message should be auto-fragmented
// This gets set to the auto-fragment option at the beginning
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool autofrag;
// `true` if this message should be compressed.
// This gets set to the compress option at the beginning
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool compress;
// Size of the write buffer.
// This gets set to the write buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t buf_size;
// The write buffer. Used for compression and masking.
// The buffer is allocated or reallocated at the beginning of
// sending a message.
std::unique_ptr<std::uint8_t[]> buf;
detail::fh_buffer fb;
};
// State information for the permessage-deflate extension
struct pmd_t
{
@@ -217,46 +156,61 @@ class stream
zlib::inflate_stream zi;
};
NextLayer stream_; // the wrapped stream
detail::maskgen maskgen_; // source of mask keys
std::size_t rd_msg_max_ =
16 * 1024 * 1024; // max message size
bool wr_autofrag_ = true; // auto fragment
std::size_t wr_buf_size_ = 4096; // write buffer size
std::size_t rd_buf_size_ = 4096; // read buffer size
detail::opcode wr_opcode_ =
detail::opcode::text; // outgoing message type
control_cb_type ctrl_cb_; // control callback
role_type role_; // server or client
bool open_ = false; // `true` if established
using control_cb_type =
std::function<void(frame_type, string_view)>;
bool rd_close_; // read close frame
bool wr_close_; // sent close frame
token wr_block_; // op currently writing
token rd_block_; // op currently reading
NextLayer stream_; // the wrapped stream
close_reason cr_; // set from received close frame
control_cb_type ctrl_cb_; // control callback
ping_data* ping_data_; // where to put the payload
detail::pausation rd_op_; // paused read op
detail::pausation wr_op_; // paused write op
detail::pausation ping_op_; // paused ping op
detail::pausation close_op_; // paused close op
detail::pausation r_rd_op_; // paused read op (read)
detail::pausation r_close_op_; // paused close op (read)
close_reason cr_; // set from received close frame
rd_t rd_; // read state
wr_t wr_; // write state
std::size_t rd_msg_max_ // max message size
= 16 * 1024 * 1024;
std::uint64_t rd_size_; // total size of current message so far
std::uint64_t rd_remain_; // message frame bytes left in current frame
detail::frame_header rd_fh_; // current frame header
detail::prepared_key rd_key_; // current stateful mask key
detail::frame_buffer rd_fb_; // to write control frames (during reads)
detail::utf8_checker rd_utf8_; // to validate utf8
static_buffer<
+tcp_frame_size> rd_buf_; // buffer for reads
detail::opcode rd_op_; // current message binary or text
bool rd_cont_; // `true` if the next frame is a continuation
bool rd_done_; // set when a message is done
bool rd_close_; // did we read a close frame?
token rd_block_; // op currently reading
// If not engaged, then permessage-deflate is not
// enabled for the currently active session.
std::unique_ptr<pmd_t> pmd_;
token tok_; // used to order asynchronous ops
role_type role_; // server or client
bool open_ // `true` if connected
= false;
// Local options for permessage-deflate
permessage_deflate pmd_opts_;
token wr_block_; // op currently writing
bool wr_close_; // did we write a close frame?
bool wr_cont_; // next write is a continuation
bool wr_frag_; // autofrag the current message
bool wr_frag_opt_ // autofrag option setting
= true;
bool wr_compress_; // compress current message
detail::opcode wr_opcode_ // message type
= detail::opcode::text;
std::unique_ptr<
std::uint8_t[]> wr_buf_; // write buffer
std::size_t wr_buf_size_; // write buffer size (current message)
std::size_t wr_buf_opt_ // write buffer size option setting
= 4096;
detail::fh_buffer wr_fb_; // header buffer used for writes
detail::maskgen wr_gen_; // source of mask keys
// Offer for clients, negotiated result for servers
detail::pmd_offer pmd_config_;
detail::pausation paused_rd_; // paused read op
detail::pausation paused_wr_; // paused write op
detail::pausation paused_ping_; // paused ping op
detail::pausation paused_close_; // paused close op
detail::pausation paused_r_rd_; // paused read op (read)
detail::pausation paused_r_close_;// paused close op (read)
token t_;
std::unique_ptr<pmd_t> pmd_; // pmd settings or nullptr
permessage_deflate pmd_opts_; // local pmd options
detail::pmd_offer pmd_config_; // offer (client) or negotiation (server)
public:
/// The type of the next layer.
@@ -402,7 +356,7 @@ public:
bool
got_binary() const
{
return rd_.op == detail::opcode::binary;
return rd_op_ == detail::opcode::binary;
}
/** Returns `true` if the latest message data indicates text.
@@ -424,7 +378,7 @@ public:
bool
is_message_done() const
{
return rd_.done;
return rd_done_;
}
/** Returns the close reason received from the peer.
@@ -516,14 +470,14 @@ public:
void
auto_fragment(bool value)
{
wr_autofrag_ = value;
wr_frag_opt_ = value;
}
/// Returns `true` if the automatic fragmentation option is set.
bool
auto_fragment() const
{
return wr_autofrag_;
return wr_frag_opt_;
}
/** Set the binary message option.
@@ -682,14 +636,14 @@ public:
if(amount < 8)
BOOST_THROW_EXCEPTION(std::invalid_argument{
"write buffer size underflow"});
wr_buf_size_ = amount;
wr_buf_opt_ = amount;
};
/// Returns the size of the write buffer.
std::size_t
write_buffer_size() const
{
return wr_buf_size_;
return wr_buf_opt_;
}
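Because begin_msg() (renamed from wr_begin()) copies these option values into per-message state when a new message starts, changing them mid-message leaves the message in flight untouched. A brief usage sketch, assuming ws is an already-established stream and part1/part2 are placeholder buffers:

ws.auto_fragment(true);                            // stored in wr_frag_opt_
ws.write_buffer_size(8192);                        // stored in wr_buf_opt_ (minimum 8)
ws.write_some(false, boost::asio::buffer(part1));  // begin_msg() snapshots the options
ws.auto_fragment(false);                           // affects the next message only
ws.write_some(true, boost::asio::buffer(part2));   // current message still auto-fragments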
/** Set the text message option.
@@ -3412,7 +3366,7 @@ private:
void open(role_type role);
void close();
void reset();
void wr_begin();
void begin_msg();
template<class DynamicBuffer>
bool