diff --git a/CHANGELOG.md b/CHANGELOG.md index 269e10dd..17deee17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ WebSocket: * Refactor fail_op * Refactor read_op * Refactor close_op +* Refactor read_op + fail_op -------------------------------------------------------------------------------- diff --git a/include/boost/beast/core/flat_static_buffer.hpp b/include/boost/beast/core/flat_static_buffer.hpp index b9194512..556a1eef 100644 --- a/include/boost/beast/core/flat_static_buffer.hpp +++ b/include/boost/beast/core/flat_static_buffer.hpp @@ -54,12 +54,6 @@ public: */ using const_buffers_type = boost::asio::mutable_buffers_1; - /** The type used to represent the mutable input sequence as a list of buffers. - - This buffer sequence is guaranteed to have length 1. - */ - using mutable_data_type = boost::asio::mutable_buffers_1; - /** The type used to represent the output sequence as a list of buffers. This buffer sequence is guaranteed to have length 1. @@ -107,13 +101,6 @@ public: const_buffers_type data() const; - /** Get a list of mutable buffers that represent the input sequence. - - @note These buffers remain valid across subsequent calls to `prepare`. - */ - mutable_data_type - mutable_data(); - /// Set the input and output sequences to size 0 void reset(); diff --git a/include/boost/beast/core/impl/flat_static_buffer.ipp b/include/boost/beast/core/impl/flat_static_buffer.ipp index f8654033..3d38b929 100644 --- a/include/boost/beast/core/impl/flat_static_buffer.ipp +++ b/include/boost/beast/core/impl/flat_static_buffer.ipp @@ -35,15 +35,6 @@ data() const -> return {in_, dist(in_, out_)}; } -inline -auto -flat_static_buffer_base:: -mutable_data() -> - mutable_data_type -{ - return {in_, dist(in_, out_)}; -} - inline void flat_static_buffer_base:: diff --git a/include/boost/beast/core/impl/static_buffer.ipp b/include/boost/beast/core/impl/static_buffer.ipp index 9574cc4d..d93a1e69 100644 --- a/include/boost/beast/core/impl/static_buffer.ipp +++ b/include/boost/beast/core/impl/static_buffer.ipp @@ -50,27 +50,6 @@ data() const -> return result; } -inline -auto -static_buffer_base:: -mutable_data() -> - mutable_data_type -{ - using boost::asio::mutable_buffer; - mutable_data_type result; - if(in_off_ + in_size_ <= capacity_) - { - result[0] = mutable_buffer{begin_ + in_off_, in_size_}; - result[1] = mutable_buffer{begin_, 0}; - } - else - { - result[0] = mutable_buffer{begin_ + in_off_, capacity_ - in_off_}; - result[1] = mutable_buffer{begin_, in_size_ - (capacity_ - in_off_)}; - } - return result; -} - inline auto static_buffer_base:: diff --git a/include/boost/beast/core/static_buffer.hpp b/include/boost/beast/core/static_buffer.hpp index 031de235..df865c66 100644 --- a/include/boost/beast/core/static_buffer.hpp +++ b/include/boost/beast/core/static_buffer.hpp @@ -54,10 +54,6 @@ public: using const_buffers_type = std::array; - /// The type used to represent the mutable input sequence as a list of buffers. - using mutable_data_type = - std::array; - /// The type used to represent the output sequence as a list of buffers. using mutable_buffers_type = std::array; @@ -98,11 +94,6 @@ public: const_buffers_type data() const; - /** Get a list of mutable buffers that represent the input sequence. - */ - mutable_data_type - mutable_data(); - /** Get a list of buffers that represent the output sequence, with the given size. @param size The number of bytes to request. diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index a40d30fd..da751d6c 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -251,7 +251,7 @@ close(close_reason const& cr, error_code& ec) rd_close_ = true; auto const mb = buffer_prefix( clamp(rd_.fh.len), - rd_.buf.mutable_data()); + rd_.buf.data()); if(rd_.fh.len > 0 && rd_.fh.mask) detail::mask_inplace(mb, rd_.key); detail::read_close(cr_, mb, code); diff --git a/include/boost/beast/websocket/impl/fail.ipp b/include/boost/beast/websocket/impl/fail.ipp index bf447d28..da0b4db9 100644 --- a/include/boost/beast/websocket/impl/fail.ipp +++ b/include/boost/beast/websocket/impl/fail.ipp @@ -202,10 +202,7 @@ operator()(error_code ec, std::size_t) upcall: BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.ws.wr_block_.reset(); - d.ws.close_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke() || - d.ws.wr_op_.maybe_invoke(); - d_.invoke(ec, 0); + d_.invoke(ec); } } diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index 88f04410..7f7c5c69 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -33,299 +34,22 @@ namespace boost { namespace beast { namespace websocket { -//------------------------------------------------------------------------------ +/* Read some message frame data. -// read a frame header, process control frames -template -template -class stream::read_fh_op -{ - Handler h_; - stream& ws_; - int step_ = 0; - bool dispatched_ = false; - token tok_; - -public: - read_fh_op(read_fh_op&&) = default; - read_fh_op(read_fh_op const&) = default; - - template - read_fh_op( - DeducedHandler&& h, - stream& ws) - : h_(std::forward(h)) - , ws_(ws) - , tok_(ws_.t_.unique()) - { - } - - Handler& - handler() - { - return h_; - } - - void operator()(error_code ec = {}, - std::size_t bytes_transferred = 0); - - friend - void* asio_handler_allocate( - std::size_t size, read_fh_op* op) - { - using boost::asio::asio_handler_allocate; - return asio_handler_allocate( - size, std::addressof(op->h_)); - } - - friend - void asio_handler_deallocate( - void* p, std::size_t size, read_fh_op* op) - { - using boost::asio::asio_handler_deallocate; - asio_handler_deallocate( - p, size, std::addressof(op->h_)); - } - - friend - bool asio_handler_is_continuation(read_fh_op* op) - { - using boost::asio::asio_handler_is_continuation; - return op->dispatched_ || - asio_handler_is_continuation( - std::addressof(op->h_)); - } - - template - friend - void asio_handler_invoke(Function&& f, read_fh_op* op) - { - using boost::asio::asio_handler_invoke; - asio_handler_invoke(f, std::addressof(op->h_)); - } -}; - -template -template -void -stream:: -read_fh_op:: -operator()( - error_code ec, - std::size_t bytes_transferred) -{ - using beast::detail::clamp; - enum - { - do_loop = 0, - do_pong = 10 - }; - switch(step_) - { - case do_loop: - go_loop: - { - BOOST_ASSERT( - ws_.rd_.remain == 0 && - (! ws_.rd_.fh.fin || ws_.rd_.done)); - if(ws_.failed_) - { - // Reads after failure are aborted - ec = boost::asio::error::operation_aborted; - break; - } - close_code code{}; - // Read frame header - if(! ws_.parse_fh(ws_.rd_.fh, ws_.rd_.buf, code)) - { - if(code != close_code::none) - // _Fail the WebSocket Connection_ - return ws_.do_async_fail( - code, error::failed, std::move(h_)); - step_ = do_loop + 1; - return ws_.stream_.async_read_some( - ws_.rd_.buf.prepare(read_size( - ws_.rd_.buf, ws_.rd_.buf.max_size())), - std::move(*this)); - } - // Immediately apply the mask to the portion - // of the buffer holding payload data. - if(ws_.rd_.fh.len > 0 && ws_.rd_.fh.mask) - detail::mask_inplace(buffer_prefix( - clamp(ws_.rd_.fh.len), - ws_.rd_.buf.mutable_data()), - ws_.rd_.key); - if(detail::is_control(ws_.rd_.fh.op)) - { - // Get control frame payload - auto const cb = buffer_prefix(clamp( - ws_.rd_.fh.len), ws_.rd_.buf.data()); - auto const len = buffer_size(cb); - BOOST_ASSERT(len == ws_.rd_.fh.len); - // Process control frame - if(ws_.rd_.fh.op == detail::opcode::ping) - { - ping_data payload; - detail::read_ping(payload, cb); - ws_.rd_.buf.consume(len); - if(ws_.wr_close_) - { - // Ignore ping when closing - goto go_loop; - } - if(ws_.ctrl_cb_) - ws_.ctrl_cb_(frame_type::ping, payload); - ws_.rd_.fb.consume(ws_.rd_.fb.size()); - ws_.template write_ping< - flat_static_buffer_base>(ws_.rd_.fb, - detail::opcode::pong, payload); - goto go_pong; - } - if(ws_.rd_.fh.op == detail::opcode::pong) - { - code = close_code::none; - ping_data payload; - detail::read_ping(payload, cb); - ws_.rd_.buf.consume(len); - // Ignore pong when closing - if(! ws_.wr_close_ && ws_.ctrl_cb_) - ws_.ctrl_cb_(frame_type::pong, payload); - goto go_loop; - } - BOOST_ASSERT(ws_.rd_.fh.op == detail::opcode::close); - { - BOOST_ASSERT(! ws_.rd_close_); - ws_.rd_close_ = true; - close_reason cr; - detail::read_close(cr, cb, code); - if(code != close_code::none) - { - // _Fail the WebSocket Connection_ - return ws_.do_async_fail( - code, error::failed, std::move(h_)); - } - ws_.cr_ = cr; - ws_.rd_.buf.consume(len); - if(ws_.ctrl_cb_) - ws_.ctrl_cb_(frame_type::close, - ws_.cr_.reason); - if(! ws_.wr_close_) - // _Start the WebSocket Closing Handshake_ - return ws_.do_async_fail( - cr.code == close_code::none ? - close_code::normal : cr.code, - error::closed, std::move(h_)); - // _Close the WebSocket Connection_ - return ws_.do_async_fail(close_code::none, - error::closed, std::move(h_)); - } - } - if(ws_.rd_.fh.len == 0 && ! ws_.rd_.fh.fin) - { - // Empty non-final frame - goto go_loop; - } - ws_.rd_.done = - ws_.rd_.remain == 0 && ws_.rd_.fh.fin; - break; - } - - case do_loop + 1: - dispatched_ = true; - ws_.failed_ = !!ec; - if(ws_.failed_) - break; - ws_.rd_.buf.commit(bytes_transferred); - goto go_loop; - - go_pong: - if(ws_.wr_block_) - { - // suspend - BOOST_ASSERT(ws_.wr_block_ != tok_); - step_ = do_pong; - ws_.rd_op_.save(std::move(*this)); - return; - } - ws_.wr_block_ = tok_; - goto go_pong_send; - - case do_pong: - BOOST_ASSERT(! ws_.wr_block_); - ws_.wr_block_ = tok_; - step_ = do_pong + 1; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - ws_.get_io_service().post(bind_handler( - std::move(*this), ec, 0)); - return; - - case do_pong + 1: - BOOST_ASSERT(ws_.wr_block_ == tok_); - dispatched_ = true; - if(ws_.failed_) - { - // call handler - ws_.wr_block_.reset(); - ec = boost::asio::error::operation_aborted; - break; - } - if(ws_.wr_close_) - { - // ignore ping when closing - ws_.wr_block_.reset(); - ws_.rd_.fb.consume(ws_.rd_.fb.size()); - goto go_loop; - } - go_pong_send: - // send pong - BOOST_ASSERT(ws_.wr_block_ == tok_); - step_ = do_pong + 2; - boost::asio::async_write(ws_.stream_, - ws_.rd_.fb.data(), std::move(*this)); - return; - - case do_pong + 2: - BOOST_ASSERT(ws_.wr_block_ == tok_); - dispatched_ = true; - ws_.wr_block_.reset(); - ws_.failed_ = !!ec; - if(ws_.failed_) - break; - ws_.rd_.fb.consume(ws_.rd_.fb.size()); - goto go_loop; - } - // upcall - BOOST_ASSERT(ws_.wr_block_ != tok_); - ws_.close_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke() || - ws_.wr_op_.maybe_invoke(); - if(! dispatched_) - ws_.stream_.get_io_service().post( - bind_handler(std::move(h_), ec)); - else - h_(ec); -} - -//------------------------------------------------------------------------------ - -// Reads a single message frame, -// processes any received control frames. -// + Also reads and handles control frames. +*/ template template< class MutableBufferSequence, class Handler> class stream::read_some_op + : public boost::asio::coroutine { Handler h_; stream& ws_; consuming_buffers cb_; std::size_t bytes_written_ = 0; - int step_ = 0; + token tok_; bool did_read_ = false; bool dispatched_ = false; @@ -341,10 +65,18 @@ public: : h_(std::forward(h)) , ws_(ws) , cb_(bs) + , tok_(ws_.t_.unique()) { } - void operator()(error_code ec = {}, + Handler& + handler() + { + return h_; + } + + void operator()( + error_code ec = {}, std::size_t bytes_transferred = 0); friend @@ -393,26 +125,54 @@ operator()( error_code ec, std::size_t bytes_transferred) { - enum - { - do_start = 0, - do_maybe_fill = 10, - do_read = 20, - do_inflate = 30 - }; using beast::detail::clamp; using boost::asio::buffer; using boost::asio::buffer_cast; using boost::asio::buffer_size; - switch(step_) + close_code code{}; + BOOST_ASIO_CORO_REENTER(*this) { - case do_start: - if(ws_.failed_) + // Maybe suspend + if(! ws_.rd_block_) { - // Reads after failure are aborted - ec = boost::asio::error::operation_aborted; - break; + // Acquire the read block + ws_.rd_block_ = tok_; + + // Make sure the stream is open + if(ws_.failed_) + { + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + bind_handler(std::move(*this), + boost::asio::error::operation_aborted)); + goto upcall; + } } + else + { + // Suspend + BOOST_ASSERT(ws_.rd_block_ != tok_); + BOOST_ASIO_CORO_YIELD + ws_.r_rd_op_.save(std::move(*this)); + + // Acquire the read block + BOOST_ASSERT(! ws_.rd_block_); + ws_.rd_block_ = tok_; + + // Resume + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post(std::move(*this)); + BOOST_ASSERT(ws_.rd_block_ == tok_); + dispatched_ = true; + + // Handle the stream closing while suspended + if(ws_.failed_) + { + ec = boost::asio::error::operation_aborted; + goto upcall; + } + } + loop: // See if we need to read a frame header. This // condition is structured to give the decompressor // a chance to emit the final empty deflate block @@ -420,236 +180,399 @@ operator()( if(ws_.rd_.remain == 0 && (! ws_.rd_.fh.fin || ws_.rd_.done)) { - step_ = do_maybe_fill; - return read_fh_op{ - std::move(*this), ws_}({}, 0); - } - goto go_maybe_fill; - - case do_maybe_fill: - dispatched_ = true; - if(ec) - break; - if(ws_.rd_.done) - break; - - go_maybe_fill: - if(ws_.pmd_ && ws_.pmd_->rd_set) - goto go_inflate; - if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > - (std::min)(clamp(ws_.rd_.remain), - buffer_size(cb_))) - { - // Fill the read buffer first, otherwise we - // get fewer bytes at the cost of one I/O. - auto const mb = ws_.rd_.buf.prepare( - read_size(ws_.rd_.buf, - ws_.rd_.buf.max_size())); - step_ = do_maybe_fill + 1; - return ws_.stream_.async_read_some( - mb, std::move(*this)); - } - goto go_rd_buf; - - case do_maybe_fill + 1: - dispatched_ = true; - ws_.failed_ = !!ec; - if(ws_.failed_) - break; - ws_.rd_.buf.commit(bytes_transferred); - if(ws_.rd_.fh.mask) - detail::mask_inplace(buffer_prefix(clamp( - ws_.rd_.remain), ws_.rd_.buf.mutable_data()), - ws_.rd_.key); - - go_rd_buf: - if(ws_.rd_.buf.size() > 0) - { - // Copy from the read buffer. - // The mask was already applied. - bytes_transferred = buffer_copy(cb_, - ws_.rd_.buf.data(), clamp(ws_.rd_.remain)); - auto const mb = buffer_prefix( - bytes_transferred, cb_); - ws_.rd_.remain -= bytes_transferred; - if(ws_.rd_.op == detail::opcode::text) + // Read frame header + while(! ws_.parse_fh( + ws_.rd_.fh, ws_.rd_.buf, code)) { - if(! ws_.rd_.utf8.write(mb) || - (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && - ! ws_.rd_.utf8.finish())) - // _Fail the WebSocket Connection_ - return ws_.do_async_fail(close_code::bad_payload, - error::failed, std::move(h_)); - } - bytes_written_ += bytes_transferred; - ws_.rd_.size += bytes_transferred; - ws_.rd_.buf.consume(bytes_transferred); - goto go_done; - } - // Read into caller's buffer - step_ = do_read; - BOOST_ASSERT(ws_.rd_.remain > 0); - BOOST_ASSERT(buffer_size(cb_) > 0); - return ws_.stream_.async_read_some(buffer_prefix( - clamp(ws_.rd_.remain), cb_), std::move(*this)); - - case do_read: - { - dispatched_ = true; - ws_.failed_ = !!ec; - if(ws_.failed_) - break; - BOOST_ASSERT(bytes_transferred > 0); - auto const mb = buffer_prefix( - bytes_transferred, cb_); - ws_.rd_.remain -= bytes_transferred; - if(ws_.rd_.fh.mask) - detail::mask_inplace(mb, ws_.rd_.key); - if(ws_.rd_.op == detail::opcode::text) - { - if(! ws_.rd_.utf8.write(mb) || - (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && - ! ws_.rd_.utf8.finish())) - // _Fail the WebSocket Connection_ - return ws_.do_async_fail(close_code::bad_payload, - error::failed, std::move(h_)); - } - bytes_written_ += bytes_transferred; - ws_.rd_.size += bytes_transferred; - } - - go_done: - ws_.rd_.done = - ws_.rd_.remain == 0 && ws_.rd_.fh.fin; - break; - - case do_inflate: - go_inflate: - { - // Read compressed message frame payload: - // inflate even if rd_.fh.len == 0, otherwise we - // never emit the end-of-stream deflate block. - while(buffer_size(cb_) > 0) - { - zlib::z_params zs; - { - auto const out = buffer_front(cb_); - zs.next_out = buffer_cast(out); - zs.avail_out = buffer_size(out); - BOOST_ASSERT(zs.avail_out > 0); - } - if(ws_.rd_.remain > 0) - { - if(ws_.rd_.buf.size() > 0) + if(code != close_code::none) { - // use what's there - auto const in = buffer_prefix( - clamp(ws_.rd_.remain), buffer_front( - ws_.rd_.buf.data())); - zs.avail_in = buffer_size(in); - zs.next_in = buffer_cast(in); + // _Fail the WebSocket Connection_ + ec = error::failed; + goto close; } - else if(! did_read_) + BOOST_ASIO_CORO_YIELD + ws_.stream_.async_read_some( + ws_.rd_.buf.prepare(read_size( + ws_.rd_.buf, ws_.rd_.buf.max_size())), + std::move(*this)); + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + goto upcall; + ws_.rd_.buf.commit(bytes_transferred); + } + // Immediately apply the mask to the portion + // of the buffer holding payload data. + if(ws_.rd_.fh.len > 0 && ws_.rd_.fh.mask) + detail::mask_inplace(buffer_prefix( + clamp(ws_.rd_.fh.len), + ws_.rd_.buf.data()), + ws_.rd_.key); + if(detail::is_control(ws_.rd_.fh.op)) + { + // Handle ping frame + if(ws_.rd_.fh.op == detail::opcode::ping) + { + { + auto const cb = buffer_prefix( + clamp(ws_.rd_.fh.len), + ws_.rd_.buf.data()); + auto const len = buffer_size(cb); + BOOST_ASSERT(len == ws_.rd_.fh.len); + ping_data payload; + detail::read_ping(payload, cb); + ws_.rd_.buf.consume(len); + // Ignore ping when closing + if(ws_.wr_close_) + goto loop; + if(ws_.ctrl_cb_) + ws_.ctrl_cb_(frame_type::ping, payload); + ws_.rd_.fb.reset(); + ws_.template write_ping< + flat_static_buffer_base>(ws_.rd_.fb, + detail::opcode::pong, payload); + } + // Maybe suspend + if(! ws_.wr_block_) + { + // Acquire the write block + ws_.wr_block_ = tok_; + } + else + { + // Suspend + BOOST_ASSERT(ws_.wr_block_ != tok_); + BOOST_ASIO_CORO_YIELD + ws_.rd_op_.save(std::move(*this)); + + // Acquire the write block + BOOST_ASSERT(! ws_.wr_block_); + ws_.wr_block_ = tok_; + + // Resume + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post(std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + dispatched_ = true; + + // Make sure the stream is open + if(ws_.failed_) + { + ws_.wr_block_.reset(); + ec = boost::asio::error::operation_aborted; + goto upcall; + } + + // Ignore ping when closing + if(ws_.wr_close_) + { + ws_.wr_block_.reset(); + goto loop; + } + } + + // Send pong + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write(ws_.stream_, + ws_.rd_.fb.data(), std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + dispatched_ = true; + ws_.wr_block_.reset(); + ws_.failed_ = !!ec; + if(ws_.failed_) + goto upcall; + goto loop; + } + // Handle pong frame + if(ws_.rd_.fh.op == detail::opcode::pong) + { + auto const cb = buffer_prefix(clamp( + ws_.rd_.fh.len), ws_.rd_.buf.data()); + auto const len = buffer_size(cb); + BOOST_ASSERT(len == ws_.rd_.fh.len); + code = close_code::none; + ping_data payload; + detail::read_ping(payload, cb); + ws_.rd_.buf.consume(len); + // Ignore pong when closing + if(! ws_.wr_close_ && ws_.ctrl_cb_) + ws_.ctrl_cb_(frame_type::pong, payload); + goto loop; + } + // Handle close frame + BOOST_ASSERT(ws_.rd_.fh.op == detail::opcode::close); + { + auto const cb = buffer_prefix(clamp( + ws_.rd_.fh.len), ws_.rd_.buf.data()); + auto const len = buffer_size(cb); + BOOST_ASSERT(len == ws_.rd_.fh.len); + BOOST_ASSERT(! ws_.rd_close_); + ws_.rd_close_ = true; + close_reason cr; + detail::read_close(cr, cb, code); + if(code != close_code::none) + { + // _Fail the WebSocket Connection_ + ec = error::failed; + goto close; + } + ws_.cr_ = cr; + ws_.rd_.buf.consume(len); + if(ws_.ctrl_cb_) + ws_.ctrl_cb_(frame_type::close, + ws_.cr_.reason); + if(! ws_.wr_close_) + { + // _Start the WebSocket Closing Handshake_ + code = cr.code == close_code::none ? + close_code::normal : + static_cast(cr.code); + ec = error::closed; + goto close; + } + // _Close the WebSocket Connection_ + code = close_code::none; + ec = error::closed; + goto close; + } + } + if(ws_.rd_.fh.len == 0 && ! ws_.rd_.fh.fin) + { + // Empty non-final frame + goto loop; + } + ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin; + if(ws_.rd_.done) + goto upcall; // Empty final frame + } + if(! ws_.pmd_ || ! ws_.pmd_->rd_set) + { + if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > + (std::min)(clamp(ws_.rd_.remain), + buffer_size(cb_))) + { + // Fill the read buffer first, otherwise we + // get fewer bytes at the cost of one I/O. + BOOST_ASIO_CORO_YIELD + ws_.stream_.async_read_some( + ws_.rd_.buf.prepare( + read_size( + ws_.rd_.buf, + ws_.rd_.buf.max_size())), + std::move(*this)); + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + goto upcall; + ws_.rd_.buf.commit(bytes_transferred); + if(ws_.rd_.fh.mask) + detail::mask_inplace(buffer_prefix(clamp( + ws_.rd_.remain), ws_.rd_.buf.data()), + ws_.rd_.key); + } + if(ws_.rd_.buf.size() > 0) + { + // Copy from the read buffer. + // The mask was already applied. + bytes_transferred = buffer_copy(cb_, + ws_.rd_.buf.data(), clamp(ws_.rd_.remain)); + auto const mb = buffer_prefix( + bytes_transferred, cb_); + ws_.rd_.remain -= bytes_transferred; + if(ws_.rd_.op == detail::opcode::text) + { + if(! ws_.rd_.utf8.write(mb) || + (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + code = close_code::bad_payload; + ec = error::failed; + goto close; + } + } + bytes_written_ += bytes_transferred; + ws_.rd_.size += bytes_transferred; + ws_.rd_.buf.consume(bytes_transferred); + } + else + { + // Read into caller's buffer + BOOST_ASSERT(ws_.rd_.remain > 0); + BOOST_ASSERT(buffer_size(cb_) > 0); + BOOST_ASIO_CORO_YIELD + ws_.stream_.async_read_some(buffer_prefix( + clamp(ws_.rd_.remain), cb_), std::move(*this)); + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + goto upcall; + BOOST_ASSERT(bytes_transferred > 0); + auto const mb = buffer_prefix( + bytes_transferred, cb_); + ws_.rd_.remain -= bytes_transferred; + if(ws_.rd_.fh.mask) + detail::mask_inplace(mb, ws_.rd_.key); + if(ws_.rd_.op == detail::opcode::text) + { + if(! ws_.rd_.utf8.write(mb) || + (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + code = close_code::bad_payload; + ec = error::failed; + goto close; + } + } + bytes_written_ += bytes_transferred; + ws_.rd_.size += bytes_transferred; + } + ws_.rd_.done = + ws_.rd_.remain == 0 && ws_.rd_.fh.fin; + goto upcall; + } + else + { + // Read compressed message frame payload: + // inflate even if rd_.fh.len == 0, otherwise we + // never emit the end-of-stream deflate block. + while(buffer_size(cb_) > 0) + { + if( ws_.rd_.remain > 0 && + ws_.rd_.buf.size() == 0 && + ! did_read_) { // read new - step_ = do_inflate + 1; - return ws_.stream_.async_read_some( + BOOST_ASIO_CORO_YIELD + ws_.stream_.async_read_some( ws_.rd_.buf.prepare(read_size( ws_.rd_.buf, ws_.rd_.buf.max_size())), std::move(*this)); + ws_.failed_ = !!ec; + if(ws_.failed_) + goto upcall; + BOOST_ASSERT(bytes_transferred > 0); + ws_.rd_.buf.commit(bytes_transferred); + if(ws_.rd_.fh.mask) + detail::mask_inplace( + buffer_prefix(clamp(ws_.rd_.remain), + ws_.rd_.buf.data()), ws_.rd_.key); + did_read_ = true; + } + zlib::z_params zs; + { + auto const out = buffer_front(cb_); + zs.next_out = buffer_cast(out); + zs.avail_out = buffer_size(out); + BOOST_ASSERT(zs.avail_out > 0); + } + if(ws_.rd_.remain > 0) + { + if(ws_.rd_.buf.size() > 0) + { + // use what's there + auto const in = buffer_prefix( + clamp(ws_.rd_.remain), buffer_front( + ws_.rd_.buf.data())); + zs.avail_in = buffer_size(in); + zs.next_in = buffer_cast(in); + } + else + { + break; + } + } + else if(ws_.rd_.fh.fin) + { + // append the empty block codes + static std::uint8_t constexpr + empty_block[4] = { + 0x00, 0x00, 0xff, 0xff }; + zs.next_in = empty_block; + zs.avail_in = sizeof(empty_block); + ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); + BOOST_ASSERT(! ec); + ws_.failed_ = !!ec; + if(ws_.failed_) + break; + // VFALCO See: + // https://github.com/madler/zlib/issues/280 + BOOST_ASSERT(zs.total_out == 0); + cb_.consume(zs.total_out); + ws_.rd_.size += zs.total_out; + bytes_written_ += zs.total_out; + if( + (ws_.role_ == role_type::client && + ws_.pmd_config_.server_no_context_takeover) || + (ws_.role_ == role_type::server && + ws_.pmd_config_.client_no_context_takeover)) + ws_.pmd_->zi.reset(); + ws_.rd_.done = true; + break; } else { break; } - } - else if(ws_.rd_.fh.fin) - { - // append the empty block codes - static std::uint8_t constexpr - empty_block[4] = { - 0x00, 0x00, 0xff, 0xff }; - zs.next_in = empty_block; - zs.avail_in = sizeof(empty_block); ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); - BOOST_ASSERT(! ec); + BOOST_ASSERT(ec != zlib::error::end_of_stream); ws_.failed_ = !!ec; if(ws_.failed_) break; - // VFALCO See: - // https://github.com/madler/zlib/issues/280 - BOOST_ASSERT(zs.total_out == 0); + if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( + ws_.rd_.size, zs.total_out, ws_.rd_msg_max_)) + { + // _Fail the WebSocket Connection_ + code = close_code::too_big; + ec = error::failed; + goto close; + } cb_.consume(zs.total_out); ws_.rd_.size += zs.total_out; + ws_.rd_.remain -= zs.total_in; + ws_.rd_.buf.consume(zs.total_in); bytes_written_ += zs.total_out; - if( - (ws_.role_ == role_type::client && - ws_.pmd_config_.server_no_context_takeover) || - (ws_.role_ == role_type::server && - ws_.pmd_config_.client_no_context_takeover)) - ws_.pmd_->zi.reset(); - ws_.rd_.done = true; - break; } - else + if(ws_.rd_.op == detail::opcode::text) { - break; + // check utf8 + if(! ws_.rd_.utf8.write( + buffer_prefix(bytes_written_, cb_.get())) || ( + ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + code = close_code::bad_payload; + ec = error::failed; + goto close; + } } - ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); - BOOST_ASSERT(ec != zlib::error::end_of_stream); - ws_.failed_ = !!ec; - if(ws_.failed_) - break; - if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( - ws_.rd_.size, zs.total_out, ws_.rd_msg_max_)) - // _Fail the WebSocket Connection_ - return ws_.do_async_fail(close_code::too_big, - error::failed, std::move(h_)); - cb_.consume(zs.total_out); - ws_.rd_.size += zs.total_out; - ws_.rd_.remain -= zs.total_in; - ws_.rd_.buf.consume(zs.total_in); - bytes_written_ += zs.total_out; + goto upcall; } - if(ws_.rd_.op == detail::opcode::text) - { - // check utf8 - if(! ws_.rd_.utf8.write( - buffer_prefix(bytes_written_, cb_.get())) || ( - ws_.rd_.remain == 0 && ws_.rd_.fh.fin && - ! ws_.rd_.utf8.finish())) - // _Fail the WebSocket Connection_ - return ws_.do_async_fail(close_code::bad_payload, - error::failed, std::move(h_)); - } - break; - } + close: + // Maybe send close frame, then teardown + BOOST_ASIO_CORO_YIELD + ws_.do_async_fail(code, ec, std::move(*this)); + BOOST_ASSERT(! ws_.wr_block_); - case do_inflate + 1: - { - ws_.failed_ = !!ec; - if(ws_.failed_) - break; - BOOST_ASSERT(bytes_transferred > 0); - ws_.rd_.buf.commit(bytes_transferred); - if(ws_.rd_.fh.mask) - detail::mask_inplace( - buffer_prefix(clamp(ws_.rd_.remain), - ws_.rd_.buf.mutable_data()), ws_.rd_.key); - did_read_ = true; - goto go_inflate; - } - } - // upcall - if(! dispatched_) - { - ws_.stream_.get_io_service().post( - bind_handler(std::move(h_), - ec, bytes_written_)); - } - else - { - h_(ec, bytes_written_); + upcall: + BOOST_ASSERT(ws_.rd_block_ == tok_); + ws_.rd_block_.reset(); + ws_.close_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke() || + ws_.wr_op_.maybe_invoke(); + if(! dispatched_) + { + ws_.stream_.get_io_service().post( + bind_handler(std::move(h_), + ec, bytes_written_)); + } + else + { + h_(ec, bytes_written_); + } } } @@ -753,9 +676,8 @@ operator()( do_read: using buffers_type = typename DynamicBuffer::mutable_buffers_type; -auto const rsh = ws_.read_size_hint(b_); auto const size = clamp( - rsh, limit_); + ws_.read_size_hint(b_), limit_); boost::optional mb; try { @@ -819,6 +741,12 @@ read(DynamicBuffer& buffer, error_code& ec) "SyncStream requirements not met"); static_assert(beast::is_dynamic_buffer::value, "DynamicBuffer requirements not met"); + // Make sure the stream is open + if(failed_) + { + ec = boost::asio::error::operation_aborted; + return 0; + } std::size_t bytes_written = 0; do { @@ -888,6 +816,12 @@ read_some( "SyncStream requirements not met"); static_assert(is_dynamic_buffer::value, "DynamicBuffer requirements not met"); + // Make sure the stream is open + if(failed_) + { + ec = boost::asio::error::operation_aborted; + return 0; + } using beast::detail::clamp; if(! limit) limit = (std::numeric_limits::max)(); @@ -972,16 +906,16 @@ read_some( static_assert(is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence requirements not met"); - using beast::detail::clamp; - using boost::asio::buffer; - using boost::asio::buffer_cast; - using boost::asio::buffer_size; // Make sure the stream is open if(failed_) { ec = boost::asio::error::operation_aborted; return 0; } + using beast::detail::clamp; + using boost::asio::buffer; + using boost::asio::buffer_cast; + using boost::asio::buffer_size; close_code code{}; std::size_t bytes_written = 0; loop: @@ -1014,7 +948,7 @@ loop: // of the buffer holding payload data. if(rd_.fh.len > 0 && rd_.fh.mask) detail::mask_inplace(buffer_prefix( - clamp(rd_.fh.len), rd_.buf.mutable_data()), + clamp(rd_.fh.len), rd_.buf.data()), rd_.key); if(detail::is_control(rd_.fh.op)) { @@ -1115,6 +1049,7 @@ loop: if(rd_.fh.mask) detail::mask_inplace(buffer_prefix( clamp(rd_.remain), mb), rd_.key); + // VFALCO Do this before masking for symmetry with the async version rd_.buf.commit(bytes_transferred); } if(rd_.buf.size() > 0) @@ -1225,7 +1160,7 @@ loop: if(rd_.fh.mask) detail::mask_inplace( buffer_prefix(clamp(rd_.remain), - rd_.buf.mutable_data()), rd_.key); + rd_.buf.data()), rd_.key); auto const in = buffer_prefix( clamp(rd_.remain), buffer_front( rd_.buf.data())); @@ -1337,4 +1272,4 @@ async_read_some( } // beast } // boost -#endif +#endif \ No newline at end of file diff --git a/include/boost/beast/websocket/impl/stream.ipp b/include/boost/beast/websocket/impl/stream.ipp index 743d5697..7c3e95b7 100644 --- a/include/boost/beast/websocket/impl/stream.ipp +++ b/include/boost/beast/websocket/impl/stream.ipp @@ -177,11 +177,13 @@ open(role_type role) rd_.cont = false; rd_.done = true; // Can't clear this because accept uses it - //rd_.buf.consume(rd_.buf.size()); + //rd_.buf.reset(); rd_.fh.fin = false; rd_close_ = false; wr_close_ = false; wr_block_.reset(); + rd_block_.reset(); + cr_.code = close_code::none; ping_data_ = nullptr; // should be nullptr on close anyway wr_.cont = false; @@ -240,6 +242,8 @@ reset() wr_close_ = false; wr_.cont = false; wr_block_.reset(); + rd_block_.reset(); + cr_.code = close_code::none; ping_data_ = nullptr; // should be nullptr on close anyway } diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index eeef041a..fb693063 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -230,12 +230,15 @@ class stream bool rd_close_; // read close frame bool wr_close_; // sent close frame token wr_block_; // op currenly writing + token rd_block_; // op currenly reading ping_data* ping_data_; // where to put the payload detail::pausation rd_op_; // paused read op detail::pausation wr_op_; // paused write op detail::pausation ping_op_; // paused ping op detail::pausation close_op_; // paused close op + detail::pausation r_rd_op_; // paused read op (read) + detail::pausation r_close_op_; // paused close op (read) close_reason cr_; // set from received close frame rd_t rd_; // read state wr_t wr_; // write state diff --git a/test/beast/core/flat_static_buffer.cpp b/test/beast/core/flat_static_buffer.cpp index 60745bdd..66f5a907 100644 --- a/test/beast/core/flat_static_buffer.cpp +++ b/test/beast/core/flat_static_buffer.cpp @@ -105,7 +105,6 @@ public: ba.commit(buffer_copy(d, buffer(s.data()+x+y, z))); } ba.commit(2); - BEAST_EXPECT(buffer_size(ba.data()) == buffer_size(ba.mutable_data())); BEAST_EXPECT(ba.size() == x + y + z); BEAST_EXPECT(buffer_size(ba.data()) == ba.size()); BEAST_EXPECT(to_string(ba.data()) == s); diff --git a/test/beast/core/static_buffer.cpp b/test/beast/core/static_buffer.cpp index b3ce143a..cb6d44db 100644 --- a/test/beast/core/static_buffer.cpp +++ b/test/beast/core/static_buffer.cpp @@ -105,7 +105,6 @@ public: ba.commit(buffer_copy(d, buffer(s.data()+x+y, z))); } ba.commit(2); - BEAST_EXPECT(buffer_size(ba.data()) == buffer_size(ba.mutable_data())); BEAST_EXPECT(ba.size() == x + y + z); BEAST_EXPECT(buffer_size(ba.data()) == ba.size()); BEAST_EXPECT(to_string(ba.data()) == s);