WebSocket optimizations (API Change):

The websocket stream is optimized to contain a small
circular static buffer, reducing the number of I/O calls when
reading data. The size of the buffer is tuned for maximum
performance with TCP/IP and no long needs configuration:

* read_some replaces read_frame
* write_some replaces write_Frame
* async_read_some replaces async_read_frame
* async_write_some replaces async_write_frame

* websocket::stream::read_buffer_size is removed

Actions Required:

* Remove calls websocket::stream::read_buffer_size

* Use read_some and write_some instead of read_frame and write_frame
This commit is contained in:
Vinnie Falco
2017-07-15 17:05:24 -07:00
parent 28b5275c73
commit cb501a07c8
21 changed files with 2350 additions and 1241 deletions

View File

@@ -6,6 +6,19 @@ Version 84:
* bind_handler allows placeholders
* Add consuming_buffers::get
WebSocket:
* WebSocket read optimization
API Changes:
* websocket::stream::read_buffer_size is removed
Actions Required:
* Remove calls websocket::stream::read_buffer_size
* Use read_some and write_some instead of read_frame and write_frame
--------------------------------------------------------------------------------
Version 83:

View File

@@ -328,7 +328,7 @@ without bringing it all into memory.
[```
template<class ConstBufferSequence>
void
write_frame(bool fin,
write_some(bool fin,
ConstBufferSequence const& buffers);
```]
[

View File

@@ -22,6 +22,7 @@
#include "file_service.hpp"
#include "ws_upgrade_service.hpp"
#include <boost/asio/signal_set.hpp>
#include <boost/program_options.hpp>
#include <iostream>

View File

@@ -1,10 +1,12 @@
# Part of Beast
GroupSources(include/beast beast)
GroupSources(example/common common)
GroupSources(example/websocket-server-async "/")
add_executable (websocket-server-async
${BEAST_INCLUDES}
${COMMON_INCLUDES}
websocket_server_async.cpp
)

View File

@@ -96,14 +96,14 @@ enum class opcode : std::uint8_t
// Contents of a WebSocket frame header
struct frame_header
{
opcode op;
bool fin;
bool mask;
bool rsv1;
bool rsv2;
bool rsv3;
std::uint64_t len;
std::uint32_t key;
opcode op;
bool fin : 1;
bool mask : 1;
bool rsv1 : 1;
bool rsv2 : 1;
bool rsv3 : 1;
};
// holds the largest possible frame header
@@ -226,7 +226,7 @@ write(DynamicBuffer& db, frame_header const& fh)
//
template<class Buffers>
void
read(ping_data& data, Buffers const& bs)
read_ping(ping_data& data, Buffers const& bs)
{
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
@@ -242,7 +242,7 @@ read(ping_data& data, Buffers const& bs)
//
template<class Buffers>
void
read(close_reason& cr,
read_close(close_reason& cr,
Buffers const& bs, close_code& code)
{
using boost::asio::buffer;
@@ -279,7 +279,7 @@ read(close_reason& cr,
{
cr.reason.resize(n);
buffer_copy(buffer(&cr.reason[0], n), cb);
if(! detail::check_utf8(
if(! check_utf8(
cr.reason.data(), cr.reason.size()))
{
code = close_code::protocol_error;

View File

@@ -82,6 +82,60 @@ class pausation
void operator()();
};
template<class Op>
class saved_op
{
Op* op_ = nullptr;
public:
~saved_op()
{
using boost::asio::asio_handler_deallocate;
if(op_)
{
auto h = std::move(op_->handler());
op_->~Op();
asio_handler_deallocate(op_,
sizeof(*op_), std::addressof(h));
}
}
saved_op(saved_op&& other)
: op_(other.op_)
{
other.op_ = nullptr;
}
saved_op& operator=(saved_op&& other)
{
BOOST_ASSERT(! op_);
op_ = other.op_;
other.op_ = 0;
return *this;
}
explicit
saved_op(Op&& op)
{
using boost::asio::asio_handler_allocate;
new(asio_handler_allocate(sizeof(Op),
std::addressof(op.handler()))) Op{
std::move(op)};
}
void
operator()()
{
BOOST_ASSERT(op_);
Op op{std::move(*op_)};
using boost::asio::asio_handler_deallocate;
asio_handler_deallocate(op_,
sizeof(*op_), std::addressof(op_->handler()));
op_ = nullptr;
op();
}
};
using buf_type = char[sizeof(holder<exemplar>)];
base* base_ = nullptr;
@@ -125,6 +179,10 @@ public:
void
emplace(F&& f);
template<class F>
void
save(F&& f);
bool
maybe_invoke()
{
@@ -150,6 +208,13 @@ pausation::emplace(F&& f)
base_ = ::new(buf_) type{std::forward<F>(f)};
}
template<class F>
void
pausation::save(F&& f)
{
emplace(saved_op<F>{std::move(f)});
}
} // detail
} // websocket
} // beast

View File

@@ -24,7 +24,10 @@ enum class error
failed,
/// Upgrade handshake failed
handshake_failed
handshake_failed,
/// buffer overflow
buffer_overflow
};
} // websocket

View File

@@ -28,8 +28,6 @@
namespace beast {
namespace websocket {
//------------------------------------------------------------------------------
// Respond to an upgrade HTTP request
template<class NextLayer>
template<class Handler>

View File

@@ -35,11 +35,13 @@ class stream<NextLayer>::close_op
close_reason cr;
detail::frame_streambuf fb;
int state = 0;
token tok;
data(Handler&, stream<NextLayer>& ws_,
close_reason const& cr_)
: ws(ws_)
, cr(cr_)
, tok(ws.t_.unique())
{
ws.template write_close<
flat_static_buffer_base>(fb, cr);
@@ -114,7 +116,7 @@ operator()(error_code ec, std::size_t)
auto& d = *d_;
if(ec)
{
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.failed_ = true;
goto upcall;
}
@@ -128,7 +130,7 @@ operator()(error_code ec, std::size_t)
d.ws.close_op_.emplace(std::move(*this));
return;
}
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
@@ -140,7 +142,7 @@ operator()(error_code ec, std::size_t)
do_write:
// send close frame
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.state = 3;
d.ws.wr_close_ = true;
boost::asio::async_write(d.ws.stream_,
@@ -149,7 +151,7 @@ operator()(error_code ec, std::size_t)
case 1:
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
d.state = 2;
// The current context is safe but might not be
// the same as the one for this operation (since
@@ -161,7 +163,7 @@ operator()(error_code ec, std::size_t)
return;
case 2:
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
@@ -174,30 +176,15 @@ operator()(error_code ec, std::size_t)
break;
}
upcall:
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.wr_block_.reset();
d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke();
d_.invoke(ec);
}
template<class NextLayer>
template<class CloseHandler>
async_return_type<
CloseHandler, void(error_code)>
stream<NextLayer>::
async_close(close_reason const& cr, CloseHandler&& handler)
{
static_assert(is_async_stream<next_layer_type>::value,
"AsyncStream requirements not met");
async_completion<CloseHandler,
void(error_code)> init{handler};
close_op<handler_type<
CloseHandler, void(error_code)>>{
init.completion_handler, *this, cr}({});
return init.result.get();
}
//------------------------------------------------------------------------------
template<class NextLayer>
void
@@ -219,9 +206,12 @@ close(close_reason const& cr, error_code& ec)
{
static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met");
BOOST_ASSERT(! wr_close_);
// If rd_close_ is set then we already sent a close
BOOST_ASSERT(! rd_close_);
if(wr_close_)
{
// Can't call close twice, abort operation
BOOST_ASSERT(! wr_close_);
ec = boost::asio::error::operation_aborted;
return;
}
@@ -230,9 +220,26 @@ close(close_reason const& cr, error_code& ec)
write_close<flat_static_buffer_base>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
failed_ = !!ec;
if(failed_)
return;
}
//------------------------------------------------------------------------------
template<class NextLayer>
template<class CloseHandler>
async_return_type<
CloseHandler, void(error_code)>
stream<NextLayer>::
async_close(close_reason const& cr, CloseHandler&& handler)
{
static_assert(is_async_stream<next_layer_type>::value,
"AsyncStream requirements not met");
async_completion<CloseHandler,
void(error_code)> init{handler};
close_op<handler_type<
CloseHandler, void(error_code)>>{
init.completion_handler, *this, cr}({});
return init.result.get();
}
} // websocket
} // beast

View File

@@ -39,6 +39,7 @@ public:
case error::closed: return "WebSocket connection closed normally";
case error::failed: return "WebSocket connection failed due to a protocol violation";
case error::handshake_failed: return "WebSocket Upgrade handshake failed";
case error::buffer_overflow: return "buffer overflow";
default:
return "beast.websocket error";

View File

@@ -0,0 +1,233 @@
//
// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BEAST_WEBSOCKET_IMPL_FAIL_IPP
#define BEAST_WEBSOCKET_IMPL_FAIL_IPP
#include <beast/websocket/teardown.hpp>
#include <beast/core/bind_handler.hpp>
#include <beast/core/handler_ptr.hpp>
#include <beast/core/flat_static_buffer.hpp>
#include <beast/core/detail/config.hpp>
#include <boost/asio/handler_alloc_hook.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/optional.hpp>
#include <memory>
#include <type_traits>
#include <utility>
namespace beast {
namespace websocket {
// _Fail the WebSocket Connection_
//
template<class NextLayer>
template<class Handler>
class stream<NextLayer>::fail_op
{
Handler h_;
stream<NextLayer>& ws_;
int step_ = 0;
bool dispatched_ = false;
fail_how how_;
token tok_;
public:
fail_op(fail_op&&) = default;
fail_op(fail_op const&) = default;
// send close code, then teardown
template<class DeducedHandler>
fail_op(
DeducedHandler&& h,
stream<NextLayer>& ws,
close_code code)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, how_(fail_how::code)
, tok_(ws_.t_.unique())
{
ws_.rd_.fb.consume(ws_.rd_.fb.size());
ws_.template write_close<
flat_static_buffer_base>(
ws_.rd_.fb, code);
}
// maybe send frame in fb, then teardown
template<class DeducedHandler>
fail_op(
DeducedHandler&& h,
stream<NextLayer>& ws,
fail_how how)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, how_(how)
, tok_(ws_.t_.unique())
{
}
Handler&
handler()
{
return h_;
}
void operator()(error_code ec = {},
std::size_t bytes_transferred = 0);
friend
void* asio_handler_allocate(
std::size_t size, fail_op* op)
{
using boost::asio::asio_handler_allocate;
return asio_handler_allocate(
size, std::addressof(op->h_));
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, fail_op* op)
{
using boost::asio::asio_handler_deallocate;
asio_handler_deallocate(
p, size, std::addressof(op->h_));
}
friend
bool asio_handler_is_continuation(fail_op* op)
{
using boost::asio::asio_handler_is_continuation;
return op->dispatched_ ||
asio_handler_is_continuation(
std::addressof(op->h_));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, fail_op* op)
{
using boost::asio::asio_handler_invoke;
asio_handler_invoke(f,
std::addressof(op->h_));
}
};
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::
fail_op<Handler>::
operator()(error_code ec, std::size_t)
{
enum
{
do_start = 0,
do_resume = 20,
do_teardown = 40
};
switch(step_)
{
case do_start:
// Acquire write block
if(ws_.wr_block_)
{
// suspend
BOOST_ASSERT(ws_.wr_block_ != tok_);
step_ = do_resume;
ws_.rd_op_.save(std::move(*this));
return;
}
ws_.wr_block_ = tok_;
goto go_write;
case do_resume:
BOOST_ASSERT(! ws_.wr_block_);
ws_.wr_block_ = tok_;
step_ = do_resume + 1;
// We were invoked from a foreign context, so post
return ws_.get_io_service().post(std::move(*this));
case do_resume + 1:
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
go_write:
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.failed_)
{
ws_.wr_block_.reset();
ec = boost::asio::error::operation_aborted;
break;
}
if(how_ == fail_how::teardown)
goto go_teardown;
if(ws_.wr_close_)
goto go_teardown;
// send close frame
step_ = do_teardown;
ws_.wr_close_ = true;
return boost::asio::async_write(
ws_.stream_, ws_.rd_.fb.data(),
std::move(*this));
case do_teardown:
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
{
ws_.wr_block_.reset();
break;
}
go_teardown:
BOOST_ASSERT(ws_.wr_block_ == tok_);
step_ = do_teardown + 1;
websocket_helpers::call_async_teardown(
ws_.next_layer(), std::move(*this));
return;
case do_teardown + 1:
BOOST_ASSERT(ws_.wr_block_ == tok_);
dispatched_ = true;
ws_.failed_ = true;
ws_.wr_block_.reset();
if(ec == boost::asio::error::eof)
{
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
if(! ec)
{
switch(how_)
{
default:
case fail_how::code:
case fail_how::teardown: ec = error::failed; break;
case fail_how::close: ec = error::closed; break;
}
}
break;
}
// upcall
BOOST_ASSERT(ws_.wr_block_ != tok_);
ws_.close_op_.maybe_invoke() ||
ws_.ping_op_.maybe_invoke() ||
ws_.wr_op_.maybe_invoke();
if(! dispatched_)
ws_.stream_.get_io_service().post(
bind_handler(std::move(h_), ec));
else
h_(ec);
}
} // websocket
} // beast
#endif

View File

@@ -35,10 +35,12 @@ class stream<NextLayer>::ping_op
stream<NextLayer>& ws;
detail::frame_streambuf fb;
int state = 0;
token tok;
data(Handler&, stream<NextLayer>& ws_,
detail::opcode op_, ping_data const& payload)
: ws(ws_)
, tok(ws.t_.unique())
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
@@ -115,7 +117,7 @@ operator()(error_code ec, std::size_t)
auto& d = *d_;
if(ec)
{
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.failed_ = true;
goto upcall;
}
@@ -129,7 +131,7 @@ operator()(error_code ec, std::size_t)
d.ws.ping_op_.emplace(std::move(*this));
return;
}
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
@@ -140,7 +142,7 @@ operator()(error_code ec, std::size_t)
do_write:
// send ping frame
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.state = 3;
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
@@ -148,7 +150,7 @@ operator()(error_code ec, std::size_t)
case 1:
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
d.state = 2;
// The current context is safe but might not be
// the same as the one for this operation (since
@@ -160,7 +162,7 @@ operator()(error_code ec, std::size_t)
return;
case 2:
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
@@ -173,8 +175,8 @@ operator()(error_code ec, std::size_t)
break;
}
upcall:
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.wr_block_.reset();
d.ws.close_op_.maybe_invoke() ||
d.ws.rd_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke();

File diff suppressed because it is too large Load Diff

View File

@@ -21,6 +21,7 @@
#include <beast/core/consuming_buffers.hpp>
#include <beast/core/flat_static_buffer.hpp>
#include <beast/core/type_traits.hpp>
#include <beast/core/detail/clamp.hpp>
#include <beast/core/detail/type_traits.hpp>
#include <boost/assert.hpp>
#include <boost/endian/buffers.hpp>
@@ -42,6 +43,61 @@ stream<NextLayer>::
stream(Args&&... args)
: stream_(std::forward<Args>(args)...)
{
BOOST_ASSERT(rd_.buf.max_size() >=
max_control_frame_size);
}
template<class NextLayer>
std::size_t
stream<NextLayer>::
read_size_hint(
std::size_t initial_size) const
{
using beast::detail::clamp;
// no permessage-deflate
if(! pmd_ || (! rd_.done && ! pmd_->rd_set))
{
// fresh message
if(rd_.done)
return initial_size;
if(rd_.fh.fin)
return clamp(rd_.remain);
}
return (std::max)(
initial_size, clamp(rd_.remain));
}
template<class NextLayer>
template<class DynamicBuffer, class>
std::size_t
stream<NextLayer>::
read_size_hint(
DynamicBuffer& buffer) const
{
static_assert(is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
using beast::detail::clamp;
// no permessage-deflate
if(! pmd_ || (! rd_.done && ! pmd_->rd_set))
{
// fresh message
if(rd_.done)
return (std::min)(
buffer.max_size(),
(std::max)(+tcp_frame_size,
buffer.capacity() - buffer.size()));
if(rd_.fh.fin)
{
BOOST_ASSERT(rd_.remain != 0);
return (std::min)(
buffer.max_size(), clamp(rd_.remain));
}
}
return (std::min)(buffer.max_size(), (std::max)(
(std::max)(+tcp_frame_size, clamp(rd_.remain)),
buffer.capacity() - buffer.size()));
}
template<class NextLayer>
@@ -78,10 +134,13 @@ open(role_type role)
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
failed_ = false;
rd_.remain = 0;
rd_.cont = false;
rd_.done = true;
rd_.buf.consume(rd_.buf.size());
rd_close_ = false;
wr_close_ = false;
wr_block_ = nullptr; // should be nullptr on close anyway
wr_block_.reset();
ping_data_ = nullptr; // should be nullptr on close anyway
wr_.cont = false;
@@ -121,7 +180,6 @@ void
stream<NextLayer>::
close()
{
rd_.buf.reset();
wr_.buf.reset();
pmd_.reset();
}
@@ -132,35 +190,20 @@ stream<NextLayer>::
reset()
{
failed_ = false;
rd_.remain = 0;
rd_.cont = false;
rd_.done = true;
rd_.buf.consume(rd_.buf.size());
rd_close_ = false;
wr_close_ = false;
wr_.cont = false;
wr_block_ = nullptr; // should be nullptr on close anyway
wr_block_.reset();
ping_data_ = nullptr; // should be nullptr on close anyway
stream_.buffer().consume(
stream_.buffer().size());
}
// Called before each read frame
template<class NextLayer>
void
stream<NextLayer>::
rd_begin()
{
// Maintain the read buffer
if(pmd_)
{
if(! rd_.buf || rd_.buf_size != rd_buf_size_)
{
rd_.buf_size = rd_buf_size_;
rd_.buf = boost::make_unique_noinit<
std::uint8_t[]>(rd_.buf_size);
}
}
}
// Called before each write frame
template<class NextLayer>
void
@@ -190,6 +233,194 @@ wr_begin()
//------------------------------------------------------------------------------
// Attempt to read a complete frame header.
// Returns `false` if more bytes are needed
template<class NextLayer>
template<class DynamicBuffer>
bool
stream<NextLayer>::
parse_fh(
detail::frame_header& fh,
DynamicBuffer& b,
close_code& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
auto const err =
[&](close_code cv)
{
code = cv;
return false;
};
if(buffer_size(b.data()) < 2)
{
code = close_code::none;
return false;
}
consuming_buffers<typename
DynamicBuffer::const_buffers_type> cb{
b.data()};
{
std::uint8_t tmp[2];
cb.consume(buffer_copy(buffer(tmp), cb));
std::size_t need;
fh.len = tmp[1] & 0x7f;
switch(fh.len)
{
case 126: need = 2; break;
case 127: need = 8; break;
default:
need = 0;
}
fh.mask = (tmp[1] & 0x80) != 0;
if(fh.mask)
need += 4;
if(buffer_size(cb) < need)
{
code = close_code::none;
return false;
}
fh.op = static_cast<
detail::opcode>(tmp[0] & 0x0f);
fh.fin = (tmp[0] & 0x80) != 0;
fh.rsv1 = (tmp[0] & 0x40) != 0;
fh.rsv2 = (tmp[0] & 0x20) != 0;
fh.rsv3 = (tmp[0] & 0x10) != 0;
}
switch(fh.op)
{
case detail::opcode::binary:
case detail::opcode::text:
if(rd_.cont)
{
// new data frame when continuation expected
return err(close_code::protocol_error);
}
if((fh.rsv1 && ! pmd_) ||
fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
if(pmd_)
pmd_->rd_set = fh.rsv1;
break;
case detail::opcode::cont:
if(! rd_.cont)
{
// continuation without an active message
return err(close_code::protocol_error);
}
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
default:
if(detail::is_reserved(fh.op))
{
// reserved opcode
return err(close_code::protocol_error);
}
if(! fh.fin)
{
// fragmented control message
return err(close_code::protocol_error);
}
if(fh.len > 125)
{
// invalid length for control message
return err(close_code::protocol_error);
}
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
break;
}
// unmasked frame from client
if(role_ == role_type::server && ! fh.mask)
return err(close_code::protocol_error);
// masked frame from server
if(role_ == role_type::client && fh.mask)
return err(close_code::protocol_error);
if(detail::is_control(fh.op) &&
buffer_size(cb) < fh.len)
{
// Make the entire control frame payload
// get read in before we return `true`
return false;
}
switch(fh.len)
{
case 126:
{
std::uint8_t tmp[2];
BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp));
cb.consume(buffer_copy(buffer(tmp), cb));
fh.len = detail::big_uint16_to_native(&tmp[0]);
// length not canonical
if(fh.len < 126)
return err(close_code::protocol_error);
break;
}
case 127:
{
std::uint8_t tmp[8];
BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp));
cb.consume(buffer_copy(buffer(tmp), cb));
fh.len = detail::big_uint64_to_native(&tmp[0]);
// length not canonical
if(fh.len < 65536)
return err(close_code::protocol_error);
break;
}
}
if(fh.mask)
{
std::uint8_t tmp[4];
BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp));
cb.consume(buffer_copy(buffer(tmp), cb));
fh.key = detail::little_uint32_to_native(&tmp[0]);
detail::prepare_key(rd_.key, fh.key);
}
else
{
// initialize this otherwise operator== breaks
fh.key = 0;
}
if(! detail::is_control(fh.op))
{
if(fh.op != detail::opcode::cont)
{
rd_.size = 0;
rd_.op = fh.op;
}
else
{
if(rd_.size > (std::numeric_limits<
std::uint64_t>::max)() - fh.len)
return err(close_code::too_big);
}
if(! pmd_ || ! pmd_->rd_set)
{
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_.size, fh.len, rd_msg_max_))
return err(close_code::too_big);
}
rd_.cont = ! fh.fin;
rd_.remain = fh.len;
}
b.consume(b.size() - buffer_size(cb));
code = close_code::none;
return true;
}
// Read fixed frame header from buffer
// Requires at least 2 bytes
//
@@ -357,7 +588,7 @@ read_fh2(detail::frame_header& fh,
// initialize this otherwise operator== breaks
fh.key = 0;
}
if(! is_control(fh.op))
if(! detail::is_control(fh.op))
{
if(fh.op != detail::opcode::cont)
{

View File

@@ -32,7 +32,7 @@ namespace websocket {
template<class NextLayer>
template<class Buffers, class Handler>
class stream<NextLayer>::write_frame_op
class stream<NextLayer>::write_some_op
{
struct data : op
{
@@ -46,12 +46,14 @@ class stream<NextLayer>::write_frame_op
std::uint64_t remain;
int step = 0;
int entry_state;
token tok;
data(Handler& handler, stream<NextLayer>& ws_,
bool fin_, Buffers const& bs)
: ws(ws_)
, cb(bs)
, fin(fin_)
, tok(ws.t_.unique())
{
using boost::asio::asio_handler_is_continuation;
cont = asio_handler_is_continuation(std::addressof(handler));
@@ -61,11 +63,11 @@ class stream<NextLayer>::write_frame_op
handler_ptr<data, Handler> d_;
public:
write_frame_op(write_frame_op&&) = default;
write_frame_op(write_frame_op const&) = default;
write_some_op(write_some_op&&) = default;
write_some_op(write_some_op const&) = default;
template<class DeducedHandler, class... Args>
write_frame_op(DeducedHandler&& h,
write_some_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
@@ -83,7 +85,7 @@ public:
friend
void* asio_handler_allocate(
std::size_t size, write_frame_op* op)
std::size_t size, write_some_op* op)
{
using boost::asio::asio_handler_allocate;
return asio_handler_allocate(
@@ -92,7 +94,7 @@ public:
friend
void asio_handler_deallocate(
void* p, std::size_t size, write_frame_op* op)
void* p, std::size_t size, write_some_op* op)
{
using boost::asio::asio_handler_deallocate;
asio_handler_deallocate(
@@ -100,14 +102,14 @@ public:
}
friend
bool asio_handler_is_continuation(write_frame_op* op)
bool asio_handler_is_continuation(write_some_op* op)
{
return op->d_->cont;
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, write_frame_op* op)
void asio_handler_invoke(Function&& f, write_some_op* op)
{
using boost::asio::asio_handler_invoke;
asio_handler_invoke(
@@ -119,7 +121,7 @@ template<class NextLayer>
template<class Buffers, class Handler>
void
stream<NextLayer>::
write_frame_op<Buffers, Handler>::
write_some_op<Buffers, Handler>::
operator()(error_code ec,
std::size_t bytes_transferred, bool again)
{
@@ -142,7 +144,7 @@ operator()(error_code ec,
d.cont = d.cont || again;
if(ec)
{
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.failed_ = true;
goto upcall;
}
@@ -212,7 +214,7 @@ loop:
//----------------------------------------------------------------------
case do_nomask_nofrag:
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.fh.fin = d.fin;
d.fh.len = buffer_size(d.cb);
detail::write<flat_static_buffer_base>(
@@ -229,7 +231,7 @@ loop:
go_nomask_frag:
case do_nomask_frag:
{
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
auto const n = clamp(
d.remain, d.ws.wr_.buf_size);
d.remain -= n;
@@ -248,8 +250,8 @@ loop:
}
case do_nomask_frag + 1:
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.wr_block_.reset();
d.cb.consume(
bytes_transferred - d.fh_buf.size());
d.fh_buf.consume(d.fh_buf.size());
@@ -264,14 +266,14 @@ loop:
return d.ws.get_io_service().post(
std::move(*this));
}
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
goto go_nomask_frag;
//----------------------------------------------------------------------
case do_mask_nofrag:
{
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.remain = buffer_size(d.cb);
d.fh.fin = d.fin;
d.fh.len = d.remain;
@@ -317,7 +319,7 @@ loop:
go_mask_frag:
case do_mask_frag:
{
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
auto const n = clamp(
d.remain, d.ws.wr_.buf_size);
d.remain -= n;
@@ -342,8 +344,8 @@ loop:
}
case do_mask_frag + 1:
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.ws.wr_block_.reset();
d.cb.consume(
bytes_transferred - d.fh_buf.size());
d.fh_buf.consume(d.fh_buf.size());
@@ -359,7 +361,7 @@ loop:
std::move(*this));
return;
}
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
goto go_mask_frag;
//----------------------------------------------------------------------
@@ -367,7 +369,7 @@ loop:
go_deflate:
case do_deflate:
{
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
auto b = buffer(d.ws.wr_.buf.get(),
d.ws.wr_.buf_size);
auto const more = detail::deflate(
@@ -414,9 +416,9 @@ loop:
}
case do_deflate + 1:
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
d.fh_buf.consume(d.fh_buf.size());
d.ws.wr_block_ = nullptr;
d.ws.wr_block_.reset();
d.fh.op = detail::opcode::cont;
d.fh.rsv1 = false;
// Allow outgoing control frames to
@@ -430,11 +432,11 @@ loop:
std::move(*this));
return;
}
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
goto go_deflate;
case do_deflate + 2:
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
if(d.fh.fin && (
(d.ws.role_ == role_type::client &&
d.ws.pmd_config_.client_no_context_takeover) ||
@@ -449,12 +451,12 @@ loop:
if(d.ws.wr_block_)
{
// suspend
BOOST_ASSERT(d.ws.wr_block_ != &d);
BOOST_ASSERT(d.ws.wr_block_ != d.tok);
d.step = do_maybe_suspend + 1;
d.ws.wr_op_.emplace(std::move(*this));
return;
}
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
@@ -467,7 +469,7 @@ loop:
case do_maybe_suspend + 1:
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
d.ws.wr_block_ = d.tok;
d.step = do_maybe_suspend + 2;
// The current context is safe but might not be
// the same as the one for this operation (since
@@ -479,7 +481,7 @@ loop:
return;
case do_maybe_suspend + 2:
BOOST_ASSERT(d.ws.wr_block_ == &d);
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
@@ -495,8 +497,8 @@ loop:
goto upcall;
}
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
if(d.ws.wr_block_ == d.tok)
d.ws.wr_block_.reset();
d.ws.close_op_.maybe_invoke() ||
d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke();
@@ -515,12 +517,14 @@ class stream<NextLayer>::write_op
stream<NextLayer>& ws;
consuming_buffers<Buffers> cb;
std::size_t remain;
token tok;
data(Handler&, stream<NextLayer>& ws_,
Buffers const& bs)
: ws(ws_)
, cb(bs)
, remain(boost::asio::buffer_size(cb))
, tok(ws.t_.unique())
{
}
};
@@ -604,7 +608,7 @@ operator()(error_code ec)
d.step = d.step ? 3 : 2;
auto const pb = buffer_prefix(n, d.cb);
d.cb.consume(n);
return d.ws.async_write_frame(
return d.ws.async_write_some(
fin, pb, std::move(*this));
}
@@ -621,7 +625,7 @@ template<class NextLayer>
template<class ConstBufferSequence>
void
stream<NextLayer>::
write_frame(bool fin, ConstBufferSequence const& buffers)
write_some(bool fin, ConstBufferSequence const& buffers)
{
static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met");
@@ -629,7 +633,7 @@ write_frame(bool fin, ConstBufferSequence const& buffers)
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
error_code ec;
write_frame(fin, buffers, ec);
write_some(fin, buffers, ec);
if(ec)
BOOST_THROW_EXCEPTION(system_error{ec});
}
@@ -638,7 +642,7 @@ template<class NextLayer>
template<class ConstBufferSequence>
void
stream<NextLayer>::
write_frame(bool fin,
write_some(bool fin,
ConstBufferSequence const& buffers, error_code& ec)
{
static_assert(is_sync_stream<next_layer_type>::value,
@@ -845,7 +849,7 @@ template<class ConstBufferSequence, class WriteHandler>
async_return_type<
WriteHandler, void(error_code)>
stream<NextLayer>::
async_write_frame(bool fin,
async_write_some(bool fin,
ConstBufferSequence const& bs, WriteHandler&& handler)
{
static_assert(is_async_stream<next_layer_type>::value,
@@ -855,7 +859,7 @@ async_write_frame(bool fin,
"ConstBufferSequence requirements not met");
async_completion<WriteHandler,
void(error_code)> init{handler};
write_frame_op<ConstBufferSequence, handler_type<
write_some_op<ConstBufferSequence, handler_type<
WriteHandler, void(error_code)>>{init.completion_handler,
*this, fin, bs}({}, 0, false);
return init.result.get();
@@ -891,7 +895,7 @@ write(ConstBufferSequence const& buffers, error_code& ec)
static_assert(beast::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
write_frame(true, buffers, ec);
write_some(true, buffers, ec);
}
template<class NextLayer>

View File

@@ -21,6 +21,7 @@
#include <beast/core/async_result.hpp>
#include <beast/core/buffered_read_stream.hpp>
#include <beast/core/flat_buffer.hpp>
#include <beast/core/static_buffer.hpp>
#include <beast/core/string.hpp>
#include <beast/core/detail/type_traits.hpp>
#include <beast/http/empty_body.hpp>
@@ -29,7 +30,6 @@
#include <beast/http/detail/type_traits.hpp>
#include <beast/zlib/deflate_stream.hpp>
#include <beast/zlib/inflate_stream.hpp>
#include <boost/asio.hpp>
#include <algorithm>
#include <cstdint>
#include <functional>
@@ -117,8 +117,30 @@ class stream
friend class stream_test;
friend class frame_test;
/* The read buffer has to be at least as large
as the largest possible control frame including
the frame header.
*/
static std::size_t constexpr max_control_frame_size = 2 + 8 + 4 + 125;
static std::size_t constexpr tcp_frame_size = 1536;
struct op {};
// tokens are used to order reads and writes
class token
{
unsigned char id_ = 1;
explicit token(unsigned char id) : id_(id) {}
public:
token() = default;
token(token const&) = default;
operator bool() const { return id_ != 0; }
bool operator==(token const& t) { return id_ == t.id_; }
bool operator!=(token const& t) { return id_ != t.id_; }
token unique() { token t{id_++}; if(id_ == 0) ++id_; return t; }
void reset() { id_ = 0; }
};
using control_cb_type =
std::function<void(frame_type, string_view)>;
@@ -136,26 +158,24 @@ class stream
//
struct rd_t
{
detail::frame_header fh; // current frame header
detail::prepared_key key; // current stateful mask key
std::uint64_t size; // total size of current message so far
std::uint64_t remain; // message frame bytes left in current frame
detail::frame_streambuf fb; // to write control frames
detail::utf8_checker utf8; // to validate utf8
// A small, circular buffer to read frame headers.
// This improves performance by avoiding small reads.
static_buffer<+tcp_frame_size> buf;
// opcode of current message being read
detail::opcode op;
// `true` if the next frame is a continuation.
bool cont;
// Checks that test messages are valid utf8
detail::utf8_checker utf8;
// Size of the current message so far.
std::uint64_t size;
// Size of the read buffer.
// This gets set to the read buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t buf_size;
// The read buffer. Used for compression and masking.
std::unique_ptr<std::uint8_t[]> buf;
bool done; // set when a message is done
};
// State information for the message being sent
@@ -216,7 +236,7 @@ class stream
bool rd_close_; // read close frame
bool wr_close_; // sent close frame
op* wr_block_; // op currenly writing
token wr_block_; // op currenly writing
ping_data* ping_data_; // where to put the payload
detail::pausation rd_op_; // paused read op
@@ -237,6 +257,8 @@ class stream
// Offer for clients, negotiated result for servers
detail::pmd_offer pmd_config_;
token t_;
public:
/// The type of the next layer.
using next_layer_type =
@@ -290,6 +312,8 @@ public:
explicit
stream(Args&&... args);
//--------------------------------------------------------------------------
/** Return the `io_service` associated with the stream
This function may be used to obtain the `io_service` object
@@ -361,6 +385,105 @@ public:
return stream_.lowest_layer();
}
//--------------------------------------------------------------------------
//
// Observers
//
//--------------------------------------------------------------------------
/** Returns `true` if the latest message data indicates binary.
This function informs the caller of whether the last
received message frame represents a message with the
binary opcode.
If there is no last message frame, the return value is
undefined.
*/
bool
got_binary() const
{
return rd_.op == detail::opcode::binary;
}
/** Returns `true` if the latest message data indicates text.
This function informs the caller of whether the last
received message frame represents a message with the
text opcode.
If there is no last message frame, the return value is
undefined.
*/
bool
got_text() const
{
return ! got_binary();
}
/// Returns `true` if the last completed read finished the current message.
bool
is_message_done() const
{
return rd_.done;
}
/** Returns the close reason received from the peer.
This is only valid after a read completes with error::closed.
*/
close_reason const&
reason() const
{
return cr_;
}
/** Returns a suggested maximum buffer size for the next call to read.
This function returns a reasonable upper limit on the number
of bytes for the size of the buffer passed in the next call
to read. The number is determined by the state of the current
frame and whether or not the permessage-deflate extension is
enabled.
@param initial_size A size representing the caller's desired
buffer size for when there is no information which may be used
to calculate a more specific value. For example, when reading
the first frame header of a message.
*/
std::size_t
read_size_hint(
std::size_t initial_size = +tcp_frame_size) const;
/** Returns a suggested maximum buffer size for the next call to read.
This function returns a reasonable upper limit on the number
of bytes for the size of the buffer passed in the next call
to read. The number is determined by the state of the current
frame and whether or not the permessage-deflate extension is
enabled.
@param buffer The buffer which will be used for reading. The
implementation will query the buffer to obtain the optimum
size of a subsequent call to `buffer.prepare` based on the
state of the current frame, if any.
*/
template<class DynamicBuffer
#if ! BEAST_DOXYGEN
, class = typename std::enable_if<
! std::is_integral<DynamicBuffer>::value>::type
#endif
>
std::size_t
read_size_hint(
DynamicBuffer& buffer) const;
//--------------------------------------------------------------------------
//
// Settings
//
//--------------------------------------------------------------------------
/// Set the permessage-deflate extension options
void
set_option(permessage_deflate const& o);
@@ -445,9 +568,9 @@ public:
of the following functions:
@li @ref beast::websocket::stream::read
@li @ref beast::websocket::stream::read_frame
@li @ref beast::websocket::stream::read_some
@li @ref beast::websocket::stream::async_read
@li @ref beast::websocket::stream::async_read_frame
@li @ref beast::websocket::stream::async_read_some
Unlike completion handlers, the callback will be invoked
for each control frame during a call to any synchronous
@@ -484,44 +607,6 @@ public:
ctrl_cb_ = std::move(cb);
}
/** Set the read buffer size option.
Sets the size of the read buffer used by the implementation to
receive frames. The read buffer is needed when permessage-deflate
is used.
Lowering the size of the buffer can decrease the memory requirements
for each connection, while increasing the size of the buffer can reduce
the number of calls made to the next layer to read data.
The default setting is 4096. The minimum value is 8.
@param amount The size of the read buffer.
@throws std::invalid_argument If the buffer size is less than 8.
@par Example
Setting the read buffer size.
@code
ws.read_buffer_size(16 * 1024);
@endcode
*/
void
read_buffer_size(std::size_t amount)
{
if(amount < 8)
BOOST_THROW_EXCEPTION(std::invalid_argument{
"read buffer size underflow"});
rd_buf_size_ = amount;
}
/// Returns the read buffer size setting.
std::size_t
read_buffer_size() const
{
return rd_buf_size_;
}
/** Set the maximum incoming message size option.
Sets the largest permissible incoming message size. Message
@@ -627,45 +712,11 @@ public:
return wr_opcode_ == detail::opcode::text;
}
/** Returns the close reason received from the peer.
This is only valid after a read completes with error::closed.
*/
close_reason const&
reason() const
{
return cr_;
}
/** Returns `true` if the latest message data indicates binary.
This function informs the caller of whether the last
received message frame represents a message with the
binary opcode.
If there is no last message frame, the return value is
undefined.
*/
bool
got_binary()
{
return rd_.op == detail::opcode::binary;
}
/** Returns `true` if the latest message data indicates text.
This function informs the caller of whether the last
received message frame represents a message with the
text opcode.
If there is no last message frame, the return value is
undefined.
*/
bool
got_text()
{
return ! got_binary();
}
//--------------------------------------------------------------------------
//
// Handshaking
//
//--------------------------------------------------------------------------
/** Read and respond to a WebSocket HTTP Upgrade request.
@@ -2480,6 +2531,12 @@ public:
RequestDecorator const& decorator,
HandshakeHandler&& handler);
//--------------------------------------------------------------------------
//
// Control Frames
//
//--------------------------------------------------------------------------
/** Send a WebSocket close frame.
This function is used to synchronously send a close frame on
@@ -2553,7 +2610,7 @@ public:
next layer's `async_write_some` functions, and is known as a
<em>composed operation</em>. The program must ensure that the
stream performs no other write operations (such as @ref async_ping,
@ref stream::async_write, @ref stream::async_write_frame, or
@ref stream::async_write, @ref stream::async_write_some, or
@ref stream::async_close) until this operation completes.
If the close reason specifies a close code other than
@@ -2769,6 +2826,75 @@ public:
#endif
async_pong(ping_data const& payload, WriteHandler&& handler);
//--------------------------------------------------------------------------
//
// Reading
//
//--------------------------------------------------------------------------
/** Start an asynchronous operation to read a message frame from the stream.
This function is used to asynchronously read a single message
frame from the websocket. The function call always returns
immediately. The asynchronous operation will continue until
one of the following conditions is true:
@li A complete frame is received.
@li An error occurs on the stream.
This operation is implemented in terms of one or more calls to the
next layer's `async_read_some` and `async_write_some` functions,
and is known as a <em>composed operation</em>. The program must
ensure that the stream performs no other reads until this operation
completes.
During reads, the implementation handles control frames as
follows:
@li The @ref control_callback is invoked when a ping frame
or pong frame is received.
@li A pong frame is sent when a ping frame is received.
@li The WebSocket close procedure is started if a close frame
is received. In this case, the operation will eventually
complete with the error set to @ref error::closed.
Because of the need to handle control frames, read operations
can cause writes to take place. These writes are managed
transparently; callers can still have one active asynchronous
read and asynchronous write operation pending simultaneously
(a user initiated call to @ref async_close counts as a write).
@param buffer A dynamic buffer to hold the message data after
any masking or decompression has been applied. This object must
remain valid until the handler is called.
@param handler The handler to be called when the read operation
completes. Copies will be made of the handler as required. The
function signature of the handler must be:
@code
void handler(
error_code const& ec, // Result of operation
bool fin // `true` if this is the last frame
);
@endcode
Regardless of whether the asynchronous operation completes
immediately or not, the handler will not be invoked from within
this function. Invocation of the handler will be performed in a
manner equivalent to using boost::asio::io_service::post().
*/
template<class DynamicBuffer, class ReadHandler>
#if BEAST_DOXYGEN
void_or_deduced
#else
async_return_type<ReadHandler, void(error_code, bool)>
#endif
async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler);
//--------------------------------------------------------------------------
/** Read a message from the stream.
This function is used to synchronously read a message from
@@ -2897,7 +3023,7 @@ public:
function signature of the handler must be:
@code
void handler(
error_code const& ec // Result of operation
error_code const& ec; // Result of operation
);
@endcode
Regardless of whether the asynchronous operation completes
@@ -2914,24 +3040,30 @@ public:
#endif
async_read(DynamicBuffer& buffer, ReadHandler&& handler);
/** Read a message frame from the stream.
//--------------------------------------------------------------------------
This function is used to synchronously read a single message
frame from the stream. The call blocks until one of the following
/** Read some message data from the stream.
This function is used to synchronously read some message
data from the stream. The call blocks until one of the following
is true:
@li A complete frame is received.
@li One or more message octets are placed into the provided buffers.
@li A final message frame is received.
@li A close frame is received and processed.
@li An error occurs on the stream.
This call is implemented in terms of one or more calls to the
stream's `read_some` and `write_some` operations.
This function is implemented in terms of one or more calls to the
stream's `read_some` operation.
During reads, the implementation handles control frames as
follows:
@li The @ref control_callback is invoked when a ping frame
or pong frame is received.
@li The @ref control_callback is invoked when any control
frame is received.
@li A pong frame is sent when a ping frame is received.
@@ -2939,35 +3071,44 @@ public:
is received. In this case, the operation will eventually
complete with the error set to @ref error::closed.
@param buffer A dynamic buffer to hold the message data after
any masking or decompression has been applied.
@param buffer A dynamic buffer for holding the result
@return `true` if this is the last frame of the message.
@param limit An upper limit on the number of bytes this
function will write. If this value is zero, then a reasonable
size will be chosen automatically.
@throws system_error Thrown on failure.
@return The number of bytes written to the buffers
*/
template<class DynamicBuffer>
bool
read_frame(DynamicBuffer& buffer);
std::size_t
read_some(
DynamicBuffer& buffer,
std::size_t limit);
/** Read a message frame from the stream.
/** Read some message data from the stream.
This function is used to synchronously read a single message
frame from the stream. The call blocks until one of the following
This function is used to synchronously read some message
data from the stream. The call blocks until one of the following
is true:
@li A complete frame is received.
@li One or more message octets are placed into the provided buffers.
@li A final message frame is received.
@li A close frame is received and processed.
@li An error occurs on the stream.
This call is implemented in terms of one or more calls to the
stream's `read_some` and `write_some` operations.
This function is implemented in terms of one or more calls to the
stream's `read_some` operation.
During reads, the implementation handles control frames as
follows:
@li The @ref control_callback is invoked when a ping frame
or pong frame is received.
@li The @ref control_callback is invoked when any control
frame is received.
@li A pong frame is sent when a ping frame is received.
@@ -2975,39 +3116,54 @@ public:
is received. In this case, the operation will eventually
complete with the error set to @ref error::closed.
@param buffer A dynamic buffer to hold the message data after
any masking or decompression has been applied.
@param buffer A dynamic buffer for holding the result
@param limit An upper limit on the number of bytes this
function will write. If this value is zero, then a reasonable
size will be chosen automatically.
@param ec Set to indicate what error occurred, if any.
@return `true` if this is the last frame of the message.
@return The number of bytes written to the buffer
*/
template<class DynamicBuffer>
bool
read_frame(DynamicBuffer& buffer, error_code& ec);
std::size_t
read_some(
DynamicBuffer& buffer,
std::size_t limit,
error_code& ec);
/** Start an asynchronous operation to read a message frame from the stream.
/** Start an asynchronous operation to read some message data from the stream.
This function is used to asynchronously read a single message
frame from the websocket. The function call always returns
immediately. The asynchronous operation will continue until
one of the following conditions is true:
This function is used to asynchronously read some message
data from the stream. The function call always returns immediately.
The asynchronous operation will continue until one of the following
is true:
@li A complete frame is received.
@li One or more message octets are placed into the provided buffers.
@li A final message frame is received.
@li A close frame is received and processed.
@li An error occurs on the stream.
This operation is implemented in terms of one or more calls to the
next layer's `async_read_some` and `async_write_some` functions,
and is known as a <em>composed operation</em>. The program must
ensure that the stream performs no other reads until this operation
completes.
next layer's `async_read_some` function, and is known as a
<em>composed operation</em>. The program must ensure that the
stream performs no other reads until this operation completes.
Upon a success, the input area of the stream buffer will
hold the received message payload bytes (which may be zero
in length). The functions @ref got_binary and @ref got_text
may be used to query the stream and determine the type
of the last received message.
During reads, the implementation handles control frames as
follows:
@li The @ref control_callback is invoked when a ping frame
or pong frame is received.
@li The @ref control_callback is invoked when any control
frame is received.
@li A pong frame is sent when a ping frame is received.
@@ -3021,31 +3177,216 @@ public:
read and asynchronous write operation pending simultaneously
(a user initiated call to @ref async_close counts as a write).
@param buffer A dynamic buffer to hold the message data after
any masking or decompression has been applied. This object must
remain valid until the handler is called.
@param buffer A dynamic buffer for holding the result
@param handler The handler to be called when the read operation
completes. Copies will be made of the handler as required. The
function signature of the handler must be:
@code
void handler(
error_code const& ec, // Result of operation
bool fin // `true` if this is the last frame
error_code const& ec, // Result of operation
std::size_t bytes_transferred // The number of bytes written to the buffer
);
@endcode
Regardless of whether the asynchronous operation completes
immediately or not, the handler will not be invoked from within
this function. Invocation of the handler will be performed in a
manner equivalent to using boost::asio::io_service::post().
manner equivalent to using `boost::asio::io_service::post`.
*/
template<class DynamicBuffer, class ReadHandler>
#if BEAST_DOXYGEN
void_or_deduced
#else
async_return_type<ReadHandler, void(error_code, bool)>
async_return_type<
ReadHandler, void(error_code, std::size_t)>
#endif
async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler);
async_read_some(
DynamicBuffer& buffer,
std::size_t limit,
ReadHandler&& handler);
//--------------------------------------------------------------------------
/** Read some message data from the stream.
This function is used to synchronously read some message
data from the stream. The call blocks until one of the following
is true:
@li One or more message octets are placed into the provided buffers.
@li A final message frame is received.
@li A close frame is received and processed.
@li An error occurs on the stream.
This function is implemented in terms of one or more calls to the
stream's `read_some` operation.
During reads, the implementation handles control frames as
follows:
@li The @ref control_callback is invoked when any control
frame is received.
@li A pong frame is sent when a ping frame is received.
@li The WebSocket close procedure is started if a close frame
is received. In this case, the operation will eventually
complete with the error set to @ref error::closed.
@param buffers A mutable buffer to hold the message data after
any masking or decompression has been applied.
@param buffers The buffers into which message data will be
placed after any masking or decompresison has been applied.
The implementation will make copies of this object as needed,
but ownership of the underlying memory is not transferred.
The caller is responsible for ensuring that the memory
locations pointed to by the buffers remains valid until the
completion handler is called.
@throws system_error Thrown on failure.
@return The number of bytes written to the buffers
*/
template<class MutableBufferSequence>
std::size_t
read_some(
MutableBufferSequence const& buffers);
/** Read some message data from the stream.
This function is used to synchronously read some message
data from the stream. The call blocks until one of the
following is true:
@li One or more message octets are placed into the provided buffers.
@li A final message frame is received.
@li A close frame is received and processed.
@li An error occurs on the stream.
This operation is implemented in terms of one or more calls to the
stream's `read_some` function.
During reads, the implementation handles control frames as
follows:
@li The @ref control_callback is invoked when any control
frame is received.
@li A pong frame is sent when a ping frame is received.
@li The WebSocket close procedure is started if a close frame
is received. In this case, the operation will eventually
complete with the error set to @ref error::closed.
@param buffers A mutable buffer to hold the message data after
any masking or decompression has been applied.
@param buffers The buffers into which message data will be
placed after any masking or decompresison has been applied.
The implementation will make copies of this object as needed,
but ownership of the underlying memory is not transferred.
The caller is responsible for ensuring that the memory
locations pointed to by the buffers remains valid until the
completion handler is called.
@param ec Set to indicate what error occurred, if any.
@return The number of bytes written to the buffers
*/
template<class MutableBufferSequence>
std::size_t
read_some(
MutableBufferSequence const& buffers,
error_code& ec);
/** Start an asynchronous operation to read some message data from the stream.
This function is used to asynchronously read some message
data from the stream. The function call always returns immediately.
The asynchronous operation will continue until one of the following
is true:
@li One or more message octets are placed into the provided buffers.
@li A final message frame is received.
@li A close frame is received and processed.
@li An error occurs on the stream.
This operation is implemented in terms of one or more calls to the
next layer's `async_read_some` function, and is known as a
<em>composed operation</em>. The program must ensure that the
stream performs no other reads until this operation completes.
Upon a success, the input area of the stream buffer will
hold the received message payload bytes (which may be zero
in length). The functions @ref got_binary and @ref got_text
may be used to query the stream and determine the type
of the last received message.
During reads, the implementation handles control frames as
follows:
@li The @ref control_callback is invoked when any control
frame is received.
@li A pong frame is sent when a ping frame is received.
@li The WebSocket close procedure is started if a close frame
is received. In this case, the operation will eventually
complete with the error set to @ref error::closed.
Because of the need to handle control frames, read operations
can cause writes to take place. These writes are managed
transparently; callers can still have one active asynchronous
read and asynchronous write operation pending simultaneously
(a user initiated call to @ref async_close counts as a write).
@param buffers A mutable buffer to hold the message data after
any masking or decompression has been applied.
@param buffers The buffers into which message data will be
placed after any masking or decompresison has been applied.
The implementation will make copies of this object as needed,
but ownership of the underlying memory is not transferred.
The caller is responsible for ensuring that the memory
locations pointed to by the buffers remains valid until the
completion handler is called.
@param handler The handler to be called when the read operation
completes. Copies will be made of the handler as required. The
function signature of the handler must be:
@code
void handler(
error_code const& ec, // Result of operation
std::size_t bytes_transferred // The number of bytes written to buffers
);
@endcode
Regardless of whether the asynchronous operation completes
immediately or not, the handler will not be invoked from within
this function. Invocation of the handler will be performed in a
manner equivalent to using `boost::asio::io_service::post`.
*/
template<class MutableBufferSequence, class ReadHandler>
#if BEAST_DOXYGEN
void_or_deduced
#else
async_return_type<ReadHandler, void(error_code, std::size_t)>
#endif
async_read_some(
MutableBufferSequence const& buffers,
ReadHandler&& handler);
//--------------------------------------------------------------------------
//
// Writing
//
//--------------------------------------------------------------------------
/** Write a message to the stream.
@@ -3076,7 +3417,7 @@ public:
@throws system_error Thrown on failure.
@note This function always sends an entire message. To
send a message in fragments, use @ref write_frame.
send a message in fragments, use @ref write_some.
*/
template<class ConstBufferSequence>
void
@@ -3113,7 +3454,7 @@ public:
@throws system_error Thrown on failure.
@note This function always sends an entire message. To
send a message in fragments, use @ref write_frame.
send a message in fragments, use @ref write_some.
*/
template<class ConstBufferSequence>
void
@@ -3134,7 +3475,7 @@ public:
to the next layer's `async_write_some` functions, and is known
as a <em>composed operation</em>. The program must ensure that
the stream performs no other write operations (such as
stream::async_write, stream::async_write_frame, or
stream::async_write, stream::async_write_some, or
stream::async_close).
The current setting of the @ref binary option controls
@@ -3203,7 +3544,7 @@ public:
*/
template<class ConstBufferSequence>
void
write_frame(bool fin, ConstBufferSequence const& buffers);
write_some(bool fin, ConstBufferSequence const& buffers);
/** Write partial message data on the stream.
@@ -3235,7 +3576,7 @@ public:
*/
template<class ConstBufferSequence>
void
write_frame(bool fin,
write_some(bool fin,
ConstBufferSequence const& buffers, error_code& ec);
/** Start an asynchronous operation to send a message frame on the stream.
@@ -3254,7 +3595,7 @@ public:
as a <em>composed operation</em>. The actual payload sent
may be transformed as per the WebSocket protocol settings. The
program must ensure that the stream performs no other write
operations (such as stream::async_write, stream::async_write_frame,
operations (such as stream::async_write, stream::async_write_some,
or stream::async_close).
If this is the beginning of a new message, the message opcode
@@ -3286,24 +3627,28 @@ public:
async_return_type<
WriteHandler, void(error_code)>
#endif
async_write_frame(bool fin,
async_write_some(bool fin,
ConstBufferSequence const& buffers, WriteHandler&& handler);
private:
template<class Decorator,
class Handler> class accept_op;
template<class Handler> class close_op;
template<class Handler> class handshake_op;
template<class Handler> class ping_op;
template<class DynamicBuffer,
class Handler> class read_op;
template<class DynamicBuffer,
class Handler> class read_frame_op;
template<class Handler> class response_op;
template<class Buffers,
class Handler> class write_frame_op;
template<class Buffers,
class Handler> class write_op;
enum class fail_how
{
code = 1, // send close code, teardown, finish with error::failed
close = 2, // send frame in fb, teardown, finish with error::closed
teardown = 3 // teardown, finish with error::failed
};
template<class, class> class accept_op;
template<class> class close_op;
template<class> class fail_op;
template<class> class handshake_op;
template<class> class ping_op;
template<class> class read_fh_op;
template<class, class> class read_some_op;
template<class, class> class read_op;
template<class> class response_op;
template<class, class> class write_some_op;
template<class, class> class write_op;
static void default_decorate_req(request_type&) {}
static void default_decorate_res(response_type&) {}
@@ -3311,9 +3656,13 @@ private:
void open(role_type role);
void close();
void reset();
void rd_begin();
void wr_begin();
template<class DynamicBuffer>
bool
parse_fh(detail::frame_header& fh,
DynamicBuffer& b, close_code& code);
template<class DynamicBuffer>
std::size_t
read_fh1(detail::frame_header& fh,
@@ -3376,6 +3725,7 @@ private:
#include <beast/websocket/impl/accept.ipp>
#include <beast/websocket/impl/close.ipp>
#include <beast/websocket/impl/fail.ipp>
#include <beast/websocket/impl/handshake.ipp>
#include <beast/websocket/impl/ping.ipp>
#include <beast/websocket/impl/read.ipp>

View File

@@ -163,7 +163,7 @@ boost::asio::ip::tcp::socket sock{ios};
//[ws_snippet_16
multi_buffer buffer;
for(;;)
if(ws.read_frame(buffer))
if(ws.read_some(buffer, 0))
break;
ws.binary(ws.got_binary());
consuming_buffers<multi_buffer::const_buffers_type> cb{buffer.data()};
@@ -172,12 +172,12 @@ boost::asio::ip::tcp::socket sock{ios};
using boost::asio::buffer_size;
if(buffer_size(cb) > 512)
{
ws.write_frame(false, buffer_prefix(512, cb));
ws.write_some(false, buffer_prefix(512, cb));
cb.consume(512);
}
else
{
ws.write_frame(true, cb);
ws.write_some(true, cb);
break;
}
}

View File

@@ -37,6 +37,7 @@ public:
check("beast.websocket", error::closed);
check("beast.websocket", error::failed);
check("beast.websocket", error::handshake_failed);
check("beast.websocket", error::buffer_overflow);
}
};

View File

@@ -264,7 +264,7 @@ public:
read(stream<NextLayer>& ws,
DynamicBuffer& buffer) const
{
ws.read(buffer);
return ws.read(buffer);
}
template<
@@ -279,10 +279,10 @@ public:
template<
class NextLayer, class ConstBufferSequence>
void
write_frame(stream<NextLayer>& ws, bool fin,
write_some(stream<NextLayer>& ws, bool fin,
ConstBufferSequence const& buffers) const
{
ws.write_frame(fin, buffers);
ws.write_some(fin, buffers);
}
template<
@@ -521,11 +521,11 @@ public:
template<
class NextLayer, class ConstBufferSequence>
void
write_frame(stream<NextLayer>& ws, bool fin,
write_some(stream<NextLayer>& ws, bool fin,
ConstBufferSequence const& buffers) const
{
error_code ec;
ws.async_write_frame(fin, buffers, yield_[ec]);
ws.async_write_some(fin, buffers, yield_[ec]);
if(ec)
throw system_error{ec};
}
@@ -551,7 +551,6 @@ public:
ws.auto_fragment(true);
ws.write_buffer_size(2048);
ws.binary(false);
ws.read_buffer_size(8192);
ws.read_message_max(1 * 1024 * 1024);
try
{
@@ -592,7 +591,7 @@ public:
{
static std::size_t constexpr limit = 200;
std::size_t n;
for(n = 0; n < limit; ++n)
for(n = 0; n <= limit; ++n)
{
test::fail_counter fc{n};
try
@@ -904,7 +903,7 @@ public:
{
static std::size_t constexpr limit = 200;
std::size_t n;
for(n = 199; n < limit; ++n)
for(n = 0; n < limit; ++n)
{
test::fail_counter fc{n};
try
@@ -1539,8 +1538,8 @@ public:
ws.handshake("localhost", "/", ec);
if(! BEAST_EXPECTS(! ec, ec.message()))
return;
ws.write_frame(false, sbuf("u"));
ws.write_frame(true, sbuf("v"));
ws.write_some(false, sbuf("u"));
ws.write_some(true, sbuf("v"));
multi_buffer b;
ws.read(b, ec);
if(! BEAST_EXPECTS(! ec, ec.message()))
@@ -1562,7 +1561,7 @@ public:
ws.handshake("localhost", "/", ec);
if(! BEAST_EXPECTS(! ec, ec.message()))
break;
ws.async_write_frame(false,
ws.async_write_some(false,
boost::asio::null_buffers{},
[&](error_code)
{
@@ -1573,7 +1572,7 @@ public:
break;
//
// Destruction of the io_service will cause destruction
// of the write_frame_op without invoking the final handler.
// of the write_some_op without invoking the final handler.
//
break;
}
@@ -1687,9 +1686,9 @@ public:
BEAST_EXPECT(s == "payload");
});
ws.ping("payload");
c.write_frame(ws, false, sbuf("Hello, "));
c.write_frame(ws, false, sbuf(""));
c.write_frame(ws, true, sbuf("World!"));
c.write_some(ws, false, sbuf("Hello, "));
c.write_some(ws, false, sbuf(""));
c.write_some(ws, true, sbuf("World!"));
{
// receive echoed message
multi_buffer db;
@@ -1783,13 +1782,13 @@ public:
if(! pmd.client_enable)
{
// expected cont
c.write_frame(ws, false, boost::asio::null_buffers{});
c.write_some(ws, false, boost::asio::null_buffers{});
c.write_raw(ws,
cbuf(0x81, 0x80, 0xff, 0xff, 0xff, 0xff));
restart(error::closed);
// message size above 2^64
c.write_frame(ws, false, cbuf(0x00));
c.write_some(ws, false, cbuf(0x00));
c.write_raw(ws,
cbuf(0x80, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff));

View File

@@ -10,6 +10,7 @@
#include <beast/core/multi_buffer.hpp>
#include <beast/websocket/stream.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/optional.hpp>
#include <atomic>

View File

@@ -167,7 +167,7 @@ private:
{
std::geometric_distribution<std::size_t> dist{
double(4) / boost::asio::buffer_size(tb_)};
ws_.async_write_frame(true,
ws_.async_write_some(true,
beast::buffer_prefix(dist(rng_), tb_),
alloc_.wrap(std::bind(
&connection::on_write,