read_frame returns bool fin (API Change):

* frame_info struct is removed

* read_frame and async_read_frame return a bool indicating
  if the frame is the last frame of the current message.

Actions Required:

* Remove the frame_info parameter from all read frame call sites

* Check the return value 'fin' for calls to read_frame

* Change ReadHandlers passed to async_read_frame to have
  the signature void(error_code, bool fin), use the bool
  to indicate if the frame is the last frame.
This commit is contained in:
Vinnie Falco
2017-06-08 20:34:27 -07:00
parent 620ce08d6a
commit 9e910761cc
4 changed files with 92 additions and 147 deletions

View File

@@ -9,6 +9,7 @@ API Changes:
* write_buffer_size is a member of stream * write_buffer_size is a member of stream
* ping_callback is a member of stream * ping_callback is a member of stream
* Remove opcode from read, async_read * Remove opcode from read, async_read
* read_frame returns `bool` fin
Actions Required: Actions Required:
@@ -34,6 +35,14 @@ Actions Required:
and asynchronous read functions, replace the logic with calls to and asynchronous read functions, replace the logic with calls to
stream::got_binary and stream::got_text instead. stream::got_binary and stream::got_text instead.
* Remove the frame_info parameter from all read frame call sites
* Check the return value 'fin' for calls to read_frame
* Change ReadHandlers passed to async_read_frame to have
the signature void(error_code, bool fin), use the bool
to indicate if the frame is the last frame.
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
Version 51 Version 51

View File

@@ -49,7 +49,6 @@ class stream<NextLayer>::read_frame_op
{ {
bool cont; bool cont;
stream<NextLayer>& ws; stream<NextLayer>& ws;
frame_info& fi;
DynamicBuffer& db; DynamicBuffer& db;
fb_type fb; fb_type fb;
std::uint64_t remain; std::uint64_t remain;
@@ -60,9 +59,8 @@ class stream<NextLayer>::read_frame_op
int state = 0; int state = 0;
data(Handler& handler, stream<NextLayer>& ws_, data(Handler& handler, stream<NextLayer>& ws_,
frame_info& fi_, DynamicBuffer& sb_) DynamicBuffer& sb_)
: ws(ws_) : ws(ws_)
, fi(fi_)
, db(sb_) , db(sb_)
{ {
using boost::asio::asio_handler_is_continuation; using boost::asio::asio_handler_is_continuation;
@@ -345,9 +343,6 @@ operator()(error_code ec,
//------------------------------------------------------------------ //------------------------------------------------------------------
case do_frame_done: case do_frame_done:
// call handler
d.fi.op = d.ws.rd_.op;
d.fi.fin = d.fh.fin;
goto upcall; goto upcall;
//------------------------------------------------------------------ //------------------------------------------------------------------
@@ -684,50 +679,51 @@ upcall:
d.ws.wr_block_ = nullptr; d.ws.wr_block_ = nullptr;
d.ws.ping_op_.maybe_invoke() || d.ws.ping_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke(); d.ws.wr_op_.maybe_invoke();
d_.invoke(ec); bool const fin = (! ec) ? d.fh.fin : false;
d_.invoke(ec, fin);
} }
template<class NextLayer> template<class NextLayer>
template<class DynamicBuffer, class ReadHandler> template<class DynamicBuffer, class ReadHandler>
async_return_type< async_return_type<
ReadHandler, void(error_code)> ReadHandler, void(error_code, bool)>
stream<NextLayer>:: stream<NextLayer>::
async_read_frame(frame_info& fi, async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler)
DynamicBuffer& buffer, ReadHandler&& handler)
{ {
static_assert(is_async_stream<next_layer_type>::value, static_assert(is_async_stream<next_layer_type>::value,
"AsyncStream requirements requirements not met"); "AsyncStream requirements requirements not met");
static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value, static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met"); "DynamicBuffer requirements not met");
async_completion<ReadHandler, async_completion<ReadHandler,
void(error_code)> init{handler}; void(error_code, bool)> init{handler};
read_frame_op<DynamicBuffer, handler_type< read_frame_op<DynamicBuffer, handler_type<
ReadHandler, void(error_code)>>{init.completion_handler, ReadHandler, void(error_code, bool)>>{
*this, fi, buffer}; init.completion_handler,*this, buffer};
return init.result.get(); return init.result.get();
} }
template<class NextLayer> template<class NextLayer>
template<class DynamicBuffer> template<class DynamicBuffer>
void bool
stream<NextLayer>:: stream<NextLayer>::
read_frame(frame_info& fi, DynamicBuffer& buffer) read_frame(DynamicBuffer& buffer)
{ {
static_assert(is_sync_stream<next_layer_type>::value, static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met"); "SyncStream requirements not met");
static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value, static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met"); "DynamicBuffer requirements not met");
error_code ec; error_code ec;
read_frame(fi, buffer, ec); auto const fin = read_frame(buffer, ec);
if(ec) if(ec)
BOOST_THROW_EXCEPTION(system_error{ec}); BOOST_THROW_EXCEPTION(system_error{ec});
return fin;
} }
template<class NextLayer> template<class NextLayer>
template<class DynamicBuffer> template<class DynamicBuffer>
void bool
stream<NextLayer>:: stream<NextLayer>::
read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec) read_frame(DynamicBuffer& dynabuf, error_code& ec)
{ {
static_assert(is_sync_stream<next_layer_type>::value, static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met"); "SyncStream requirements not met");
@@ -748,7 +744,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
stream_, fb.prepare(2), ec)); stream_, fb.prepare(2), ec));
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
{ {
auto const n = read_fh1(fh, fb, code); auto const n = read_fh1(fh, fb, code);
if(code != close_code::none) if(code != close_code::none)
@@ -759,14 +755,14 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
stream_, fb.prepare(n), ec)); stream_, fb.prepare(n), ec));
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
} }
} }
read_fh2(fh, fb, code); read_fh2(fh, fb, code);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
if(code != close_code::none) if(code != close_code::none)
goto do_close; goto do_close;
} }
@@ -780,7 +776,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
fb.commit(boost::asio::read(stream_, mb, ec)); fb.commit(boost::asio::read(stream_, mb, ec));
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
if(fh.mask) if(fh.mask)
{ {
detail::prepared_key key; detail::prepared_key key;
@@ -802,7 +798,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
boost::asio::write(stream_, fb.data(), ec); boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
continue; continue;
} }
else if(fh.op == opcode::pong) else if(fh.op == opcode::pong)
@@ -830,7 +826,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
boost::asio::write(stream_, fb.data(), ec); boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
} }
goto do_close; goto do_close;
} }
@@ -865,7 +861,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
stream_.read_some(b, ec); stream_.read_some(b, ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
BOOST_ASSERT(bytes_transferred > 0); BOOST_ASSERT(bytes_transferred > 0);
remain -= bytes_transferred; remain -= bytes_transferred;
auto const pb = buffer_prefix( auto const pb = buffer_prefix(
@@ -897,7 +893,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
clamp(remain, rd_.buf_size)), ec); clamp(remain, rd_.buf_size)), ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
remain -= bytes_transferred; remain -= bytes_transferred;
auto const in = buffer( auto const in = buffer(
rd_.buf.get(), bytes_transferred); rd_.buf.get(), bytes_transferred);
@@ -907,7 +903,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
detail::inflate(pmd_->zi, dynabuf, in, ec); detail::inflate(pmd_->zi, dynabuf, in, ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
if(remain == 0 && fh.fin) if(remain == 0 && fh.fin)
{ {
static std::uint8_t constexpr static std::uint8_t constexpr
@@ -917,7 +913,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
buffer(&empty_block[0], 4), ec); buffer(&empty_block[0], 4), ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
} }
if(rd_.op == opcode::text) if(rd_.op == opcode::text)
{ {
@@ -943,9 +939,7 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
pmd_config_.client_no_context_takeover))) pmd_config_.client_no_context_takeover)))
pmd_->zi.reset(); pmd_->zi.reset();
} }
fi.op = rd_.op; return fh.fin;
fi.fin = fh.fin;
return;
} }
do_close: do_close:
if(code != close_code::none) if(code != close_code::none)
@@ -959,7 +953,7 @@ do_close:
boost::asio::write(stream_, fb.data(), ec); boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
} }
websocket_helpers::call_teardown(next_layer(), ec); websocket_helpers::call_teardown(next_layer(), ec);
if(ec == boost::asio::error::eof) if(ec == boost::asio::error::eof)
@@ -970,10 +964,10 @@ do_close:
} }
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return false;
ec = error::failed; ec = error::failed;
failed_ = true; failed_ = true;
return; return false;
} }
if(! ec) if(! ec)
{ {
@@ -987,6 +981,9 @@ do_close:
if(! ec) if(! ec)
ec = error::closed; ec = error::closed;
failed_ = ec != 0; failed_ = ec != 0;
if(failed_)
return false;
return true;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -997,41 +994,25 @@ template<class NextLayer>
template<class DynamicBuffer, class Handler> template<class DynamicBuffer, class Handler>
class stream<NextLayer>::read_op class stream<NextLayer>::read_op
{ {
struct data int state_ = 0;
{ stream<NextLayer>& ws_;
bool cont; DynamicBuffer& b_;
stream<NextLayer>& ws; Handler h_;
DynamicBuffer& db;
frame_info fi;
int state = 0;
data(Handler& handler, stream<NextLayer>& ws_,
DynamicBuffer& sb_)
: ws(ws_)
, db(sb_)
{
using boost::asio::asio_handler_is_continuation;
cont = asio_handler_is_continuation(std::addressof(handler));
}
};
handler_ptr<data, Handler> d_;
public: public:
read_op(read_op&&) = default; read_op(read_op&&) = default;
read_op(read_op const&) = default; read_op(read_op const&) = default;
template<class DeducedHandler, class... Args> template<class DeducedHandler>
read_op(DeducedHandler&& h, read_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args) stream<NextLayer>& ws, DynamicBuffer& b)
: d_(std::forward<DeducedHandler>(h), : ws_(ws)
ws, std::forward<Args>(args)...) , b_(b)
, h_(std::forward<DeducedHandler>(h))
{ {
(*this)(error_code{}, false);
} }
void operator()( void operator()(error_code const& ec, bool fin);
error_code const& ec, bool again = true);
friend friend
void* asio_handler_allocate( void* asio_handler_allocate(
@@ -1039,7 +1020,7 @@ public:
{ {
using boost::asio::asio_handler_allocate; using boost::asio::asio_handler_allocate;
return asio_handler_allocate( return asio_handler_allocate(
size, std::addressof(op->d_.handler())); size, std::addressof(op->h_));
} }
friend friend
@@ -1048,13 +1029,15 @@ public:
{ {
using boost::asio::asio_handler_deallocate; using boost::asio::asio_handler_deallocate;
asio_handler_deallocate( asio_handler_deallocate(
p, size, std::addressof(op->d_.handler())); p, size, std::addressof(op->h_));
} }
friend friend
bool asio_handler_is_continuation(read_op* op) bool asio_handler_is_continuation(read_op* op)
{ {
return op->d_->cont; using boost::asio::asio_handler_is_continuation;
return op->state_ >= 2 ? true:
asio_handler_is_continuation(std::addressof(op->h_));
} }
template<class Function> template<class Function>
@@ -1062,8 +1045,7 @@ public:
void asio_handler_invoke(Function&& f, read_op* op) void asio_handler_invoke(Function&& f, read_op* op)
{ {
using boost::asio::asio_handler_invoke; using boost::asio::asio_handler_invoke;
asio_handler_invoke( asio_handler_invoke(f, std::addressof(op->h_));
f, std::addressof(op->d_.handler()));
} }
}; };
@@ -1071,31 +1053,29 @@ template<class NextLayer>
template<class DynamicBuffer, class Handler> template<class DynamicBuffer, class Handler>
void void
stream<NextLayer>::read_op<DynamicBuffer, Handler>:: stream<NextLayer>::read_op<DynamicBuffer, Handler>::
operator()(error_code const& ec, bool again) operator()(error_code const& ec, bool fin)
{ {
auto& d = *d_; switch(state_)
d.cont = d.cont || again;
while(! ec)
{ {
switch(d.state) case 0:
{ state_ = 1;
case 0: goto do_read;
// read payload
d.state = 1;
d.ws.async_read_frame(
d.fi, d.db, std::move(*this));
return;
// got payload case 1:
case 1: state_ = 2;
if(d.fi.fin) BOOST_FALLTHROUGH;
goto upcall;
d.state = 0; case 2:
break; if(ec)
} goto upcall;
if(fin)
goto upcall;
do_read:
return ws_.async_read_frame(
b_, std::move(*this));
} }
upcall: upcall:
d_.invoke(ec); h_(ec);
} }
template<class NextLayer> template<class NextLayer>
@@ -1113,7 +1093,8 @@ async_read(DynamicBuffer& buffer, ReadHandler&& handler)
void(error_code)> init{handler}; void(error_code)> init{handler};
read_op<DynamicBuffer, handler_type< read_op<DynamicBuffer, handler_type<
ReadHandler, void(error_code)>>{ ReadHandler, void(error_code)>>{
init.completion_handler, *this, buffer}; init.completion_handler, *this, buffer}(
{}, false);
return init.result.get(); return init.result.get();
} }
@@ -1143,13 +1124,12 @@ read(DynamicBuffer& buffer, error_code& ec)
"SyncStream requirements not met"); "SyncStream requirements not met");
static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value, static_assert(beast::is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met"); "DynamicBuffer requirements not met");
frame_info fi;
for(;;) for(;;)
{ {
read_frame(fi, buffer, ec); auto const fin = read_frame(buffer, ec);
if(ec) if(ec)
break; break;
if(fi.fin) if(fin)
break; break;
} }
} }

View File

@@ -35,20 +35,6 @@ using request_type = http::request<http::empty_body>;
/// The type of object holding HTTP Upgrade responses /// The type of object holding HTTP Upgrade responses
using response_type = http::response<http::string_body>; using response_type = http::response<http::string_body>;
/** Information about a WebSocket frame.
This information is provided to callers during frame
read operations.
*/
struct frame_info
{
/// Indicates the type of message (binary or text).
opcode op;
/// `true` if this is the last frame in the current message.
bool fin;
};
//-------------------------------------------------------------------- //--------------------------------------------------------------------
/** Provides message-oriented functionality using WebSocket. /** Provides message-oriented functionality using WebSocket.
@@ -2730,13 +2716,6 @@ public:
This call is implemented in terms of one or more calls to the This call is implemented in terms of one or more calls to the
stream's `read_some` and `write_some` operations. stream's `read_some` and `write_some` operations.
Upon success, `fi` is filled out to reflect the message payload
contents. `op` is set to binary or text, and the `fin` flag
indicates if all the message data has been read in. To read the
entire message, callers should keep calling @ref read_frame
until `fi.fin == true`. A message with no payload will have
`fi.fin == true`, and zero bytes placed into the stream buffer.
During reads, the implementation handles control frames as During reads, the implementation handles control frames as
follows: follows:
@@ -2749,16 +2728,16 @@ public:
is received. In this case, the operation will eventually is received. In this case, the operation will eventually
complete with the error set to @ref error::closed. complete with the error set to @ref error::closed.
@param fi An object to store metadata about the message.
@param buffer A dynamic buffer to hold the message data after @param buffer A dynamic buffer to hold the message data after
any masking or decompression has been applied. any masking or decompression has been applied.
@return `true` if this is the last frame of the message.
@throws system_error Thrown on failure. @throws system_error Thrown on failure.
*/ */
template<class DynamicBuffer> template<class DynamicBuffer>
void bool
read_frame(frame_info& fi, DynamicBuffer& buffer); read_frame(DynamicBuffer& buffer);
/** Read a message frame from the stream. /** Read a message frame from the stream.
@@ -2773,13 +2752,6 @@ public:
This call is implemented in terms of one or more calls to the This call is implemented in terms of one or more calls to the
stream's `read_some` and `write_some` operations. stream's `read_some` and `write_some` operations.
Upon success, `fi` is filled out to reflect the message payload
contents. `op` is set to binary or text, and the `fin` flag
indicates if all the message data has been read in. To read the
entire message, callers should keep calling @ref read_frame
until `fi.fin == true`. A message with no payload will have
`fi.fin == true`, and zero bytes placed into the stream buffer.
During reads, the implementation handles control frames as During reads, the implementation handles control frames as
follows: follows:
@@ -2792,16 +2764,16 @@ public:
is received. In this case, the operation will eventually is received. In this case, the operation will eventually
complete with the error set to @ref error::closed. complete with the error set to @ref error::closed.
@param fi An object to store metadata about the message.
@param buffer A dynamic buffer to hold the message data after @param buffer A dynamic buffer to hold the message data after
any masking or decompression has been applied. any masking or decompression has been applied.
@param ec Set to indicate what error occurred, if any. @param ec Set to indicate what error occurred, if any.
@return `true` if this is the last frame of the message.
*/ */
template<class DynamicBuffer> template<class DynamicBuffer>
void bool
read_frame(frame_info& fi, DynamicBuffer& buffer, error_code& ec); read_frame(DynamicBuffer& buffer, error_code& ec);
/** Start an asynchronous operation to read a message frame from the stream. /** Start an asynchronous operation to read a message frame from the stream.
@@ -2820,14 +2792,6 @@ public:
ensure that the stream performs no other reads until this operation ensure that the stream performs no other reads until this operation
completes. completes.
Upon a successful completion, `fi` is filled out to reflect the
message payload contents. `op` is set to binary or text, and the
`fin` flag indicates if all the message data has been read in.
To read the entire message, callers should keep calling
@ref read_frame until `fi.fin == true`. A message with no payload
will have `fi.fin == true`, and zero bytes placed into the stream
buffer.
During reads, the implementation handles control frames as During reads, the implementation handles control frames as
follows: follows:
@@ -2846,9 +2810,6 @@ public:
read and asynchronous write operation pending simultaneously read and asynchronous write operation pending simultaneously
(a user initiated call to @ref async_close counts as a write). (a user initiated call to @ref async_close counts as a write).
@param fi An object to store metadata about the message.
This object must remain valid until the handler is called.
@param buffer A dynamic buffer to hold the message data after @param buffer A dynamic buffer to hold the message data after
any masking or decompression has been applied. This object must any masking or decompression has been applied. This object must
remain valid until the handler is called. remain valid until the handler is called.
@@ -2858,7 +2819,8 @@ public:
function signature of the handler must be: function signature of the handler must be:
@code @code
void handler( void handler(
error_code const& ec // Result of operation error_code const& ec, // Result of operation
bool fin // `true` if this is the last frame
); );
@endcode @endcode
Regardless of whether the asynchronous operation completes Regardless of whether the asynchronous operation completes
@@ -2870,11 +2832,9 @@ public:
#if BEAST_DOXYGEN #if BEAST_DOXYGEN
void_or_deduced void_or_deduced
#else #else
async_return_type< async_return_type<ReadHandler, void(error_code, bool)>
ReadHandler, void(error_code)>
#endif #endif
async_read_frame(frame_info& fi, async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler);
DynamicBuffer& buffer, ReadHandler&& handler);
/** Write a message to the stream. /** Write a message to the stream.

View File

@@ -163,14 +163,10 @@ boost::asio::ip::tcp::socket sock{ios};
stream<boost::asio::ip::tcp::socket> ws{ios}; stream<boost::asio::ip::tcp::socket> ws{ios};
//[ws_snippet_16 //[ws_snippet_16
multi_buffer buffer; multi_buffer buffer;
frame_info fi;
for(;;) for(;;)
{ if(ws.read_frame(buffer))
ws.read_frame(fi, buffer);
if(fi.fin)
break; break;
} ws.binary(ws.got_binary());
ws.binary(fi.op == opcode::binary);
consuming_buffers<multi_buffer::const_buffers_type> cb{buffer.data()}; consuming_buffers<multi_buffer::const_buffers_type> cb{buffer.data()};
for(;;) for(;;)
{ {