From 1d5b3f488e2516e5b965960a51592c4abf127243 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sat, 26 Aug 2017 15:18:02 -0700 Subject: [PATCH] Refactor websocket stream members --- CHANGELOG.md | 1 + include/boost/beast/websocket/impl/accept.ipp | 16 +- include/boost/beast/websocket/impl/close.ipp | 102 +++--- include/boost/beast/websocket/impl/fail.ipp | 4 +- .../boost/beast/websocket/impl/handshake.ipp | 4 +- include/boost/beast/websocket/impl/ping.ipp | 10 +- include/boost/beast/websocket/impl/read.ipp | 324 +++++++++--------- include/boost/beast/websocket/impl/stream.ipp | 90 +++-- include/boost/beast/websocket/impl/write.ipp | 182 +++++----- include/boost/beast/websocket/stream.hpp | 158 +++------ 10 files changed, 422 insertions(+), 469 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef090b80..12892c8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ Version 110: * Refactor stream open state variable +* Refactor websocket stream members -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/impl/accept.ipp b/include/boost/beast/websocket/impl/accept.ipp index 517d2888..dad361a3 100644 --- a/include/boost/beast/websocket/impl/accept.ipp +++ b/include/boost/beast/websocket/impl/accept.ipp @@ -225,14 +225,14 @@ run(Buffers const& buffers) auto const len = buffer_size(buffers); try { - mb.emplace(d.ws.rd_.buf.prepare(len)); + mb.emplace(d.ws.rd_buf_.prepare(len)); } catch(std::length_error const&) { ec = error::buffer_overflow; return (*this)(ec); } - d.ws.rd_.buf.commit( + d.ws.rd_buf_.commit( buffer_copy(*mb, buffers)); (*this)(ec); } @@ -257,7 +257,7 @@ operator()(error_code ec) { BOOST_ASIO_CORO_YIELD http::async_read( - d.ws.next_layer(), d.ws.rd_.buf, + d.ws.next_layer(), d.ws.rd_buf_, d.p, std::move(*this)); if(ec == http::error::end_of_stream) ec = error::closed; @@ -407,7 +407,7 @@ accept( static_buffer_base::mutable_buffers_type> mb; try { - mb.emplace(rd_.buf.prepare( + mb.emplace(rd_buf_.prepare( buffer_size(buffers))); } catch(std::length_error const&) @@ -415,7 +415,7 @@ accept( ec = error::buffer_overflow; return; } - rd_.buf.commit( + rd_buf_.commit( buffer_copy(*mb, buffers)); do_accept(&default_decorate_res, ec); } @@ -447,7 +447,7 @@ accept_ex( static_buffer_base::mutable_buffers_type> mb; try { - mb.emplace(rd_.buf.prepare( + mb.emplace(rd_buf_.prepare( buffer_size(buffers))); } catch(std::length_error const&) @@ -455,7 +455,7 @@ accept_ex( ec = error::buffer_overflow; return; } - rd_.buf.commit(buffer_copy(*mb, buffers)); + rd_buf_.commit(buffer_copy(*mb, buffers)); do_accept(decorator, ec); } @@ -713,7 +713,7 @@ do_accept( error_code& ec) { http::request_parser p; - http::read(next_layer(), rd_.buf, p, ec); + http::read(next_layer(), rd_buf_, p, ec); if(ec == http::error::end_of_stream) ec = error::closed; if(ec) diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index 5eec4918..ea1cfc7f 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -52,7 +52,7 @@ class stream::close_op stream& ws_, close_reason const& cr) : ws(ws_) - , tok(ws.t_.unique()) + , tok(ws.tok_.unique()) { // Serialize the close frame ws.template write_close< @@ -148,7 +148,7 @@ operator()(error_code ec, std::size_t bytes_transferred) // Suspend BOOST_ASSERT(d.ws.wr_block_ != d.tok); BOOST_ASIO_CORO_YIELD - d.ws.close_op_.emplace(std::move(*this)); + d.ws.paused_close_.emplace(std::move(*this)); // Acquire the write block BOOST_ASSERT(! d.ws.wr_block_); @@ -202,7 +202,7 @@ operator()(error_code ec, std::size_t bytes_transferred) // Suspend BOOST_ASSERT(d.ws.rd_block_ != d.tok); BOOST_ASIO_CORO_YIELD - d.ws.r_close_op_.emplace(std::move(*this)); + d.ws.paused_r_close_.emplace(std::move(*this)); // Acquire the read block BOOST_ASSERT(! d.ws.rd_block_); @@ -222,13 +222,13 @@ operator()(error_code ec, std::size_t bytes_transferred) } // Drain - if(d.ws.rd_.remain > 0) + if(d.ws.rd_remain_ > 0) goto read_payload; for(;;) { // Read frame header while(! d.ws.parse_fh( - d.ws.rd_.fh, d.ws.rd_.buf, code)) + d.ws.rd_fh_, d.ws.rd_buf_, code)) { if(code != close_code::none) { @@ -237,28 +237,28 @@ operator()(error_code ec, std::size_t bytes_transferred) } BOOST_ASIO_CORO_YIELD d.ws.stream_.async_read_some( - d.ws.rd_.buf.prepare(read_size(d.ws.rd_.buf, - d.ws.rd_.buf.max_size())), + d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, + d.ws.rd_buf_.max_size())), std::move(*this)); if(ec) { d.ws.open_ = false; goto upcall; } - d.ws.rd_.buf.commit(bytes_transferred); + d.ws.rd_buf_.commit(bytes_transferred); } - if(detail::is_control(d.ws.rd_.fh.op)) + if(detail::is_control(d.ws.rd_fh_.op)) { // Process control frame - if(d.ws.rd_.fh.op == detail::opcode::close) + if(d.ws.rd_fh_.op == detail::opcode::close) { BOOST_ASSERT(! d.ws.rd_close_); d.ws.rd_close_ = true; auto const mb = buffer_prefix( - clamp(d.ws.rd_.fh.len), - d.ws.rd_.buf.data()); - if(d.ws.rd_.fh.len > 0 && d.ws.rd_.fh.mask) - detail::mask_inplace(mb, d.ws.rd_.key); + clamp(d.ws.rd_fh_.len), + d.ws.rd_buf_.data()); + if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask) + detail::mask_inplace(mb, d.ws.rd_key_); detail::read_close(d.ws.cr_, mb, code); if(code != close_code::none) { @@ -266,33 +266,33 @@ operator()(error_code ec, std::size_t bytes_transferred) d.ev = error::failed; goto teardown; } - d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len)); + d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len)); goto teardown; } - d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len)); + d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len)); } else { read_payload: - while(d.ws.rd_.buf.size() < d.ws.rd_.remain) + while(d.ws.rd_buf_.size() < d.ws.rd_remain_) { - d.ws.rd_.remain -= d.ws.rd_.buf.size(); - d.ws.rd_.buf.consume(d.ws.rd_.buf.size()); + d.ws.rd_remain_ -= d.ws.rd_buf_.size(); + d.ws.rd_buf_.consume(d.ws.rd_buf_.size()); BOOST_ASIO_CORO_YIELD d.ws.stream_.async_read_some( - d.ws.rd_.buf.prepare(read_size(d.ws.rd_.buf, - d.ws.rd_.buf.max_size())), + d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, + d.ws.rd_buf_.max_size())), std::move(*this)); if(ec) { d.ws.open_ = false; goto upcall; } - d.ws.rd_.buf.commit(bytes_transferred); + d.ws.rd_buf_.commit(bytes_transferred); } - BOOST_ASSERT(d.ws.rd_.buf.size() >= d.ws.rd_.remain); - d.ws.rd_.buf.consume(clamp(d.ws.rd_.remain)); - d.ws.rd_.remain = 0; + BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_); + d.ws.rd_buf_.consume(clamp(d.ws.rd_remain_)); + d.ws.rd_remain_ = 0; } } @@ -321,11 +321,11 @@ operator()(error_code ec, std::size_t bytes_transferred) { BOOST_ASSERT(d.ws.rd_block_ = d.tok); d.ws.rd_block_.reset(); - d.ws.r_rd_op_.maybe_invoke(); + d.ws.paused_r_rd_.maybe_invoke(); } - d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke() || - d.ws.wr_op_.maybe_invoke(); + d.ws.paused_rd_.maybe_invoke() || + d.ws.paused_ping_.maybe_invoke() || + d.ws.paused_wr_.maybe_invoke(); d_.invoke(ec); } } @@ -373,66 +373,66 @@ close(close_reason const& cr, error_code& ec) return; // Drain the connection close_code code{}; - if(rd_.remain > 0) + if(rd_remain_ > 0) goto read_payload; for(;;) { // Read frame header - while(! parse_fh(rd_.fh, rd_.buf, code)) + while(! parse_fh(rd_fh_, rd_buf_, code)) { if(code != close_code::none) return do_fail(close_code::none, error::failed, ec); auto const bytes_transferred = stream_.read_some( - rd_.buf.prepare(read_size(rd_.buf, - rd_.buf.max_size())), ec); + rd_buf_.prepare(read_size(rd_buf_, + rd_buf_.max_size())), ec); open_ = ! ec; if(! open_) return; - rd_.buf.commit(bytes_transferred); + rd_buf_.commit(bytes_transferred); } - if(detail::is_control(rd_.fh.op)) + if(detail::is_control(rd_fh_.op)) { // Process control frame - if(rd_.fh.op == detail::opcode::close) + if(rd_fh_.op == detail::opcode::close) { BOOST_ASSERT(! rd_close_); rd_close_ = true; auto const mb = buffer_prefix( - clamp(rd_.fh.len), - rd_.buf.data()); - if(rd_.fh.len > 0 && rd_.fh.mask) - detail::mask_inplace(mb, rd_.key); + clamp(rd_fh_.len), + rd_buf_.data()); + if(rd_fh_.len > 0 && rd_fh_.mask) + detail::mask_inplace(mb, rd_key_); detail::read_close(cr_, mb, code); if(code != close_code::none) // Protocol error return do_fail(close_code::none, error::failed, ec); - rd_.buf.consume(clamp(rd_.fh.len)); + rd_buf_.consume(clamp(rd_fh_.len)); break; } - rd_.buf.consume(clamp(rd_.fh.len)); + rd_buf_.consume(clamp(rd_fh_.len)); } else { read_payload: - while(rd_.buf.size() < rd_.remain) + while(rd_buf_.size() < rd_remain_) { - rd_.remain -= rd_.buf.size(); - rd_.buf.consume(rd_.buf.size()); + rd_remain_ -= rd_buf_.size(); + rd_buf_.consume(rd_buf_.size()); auto const bytes_transferred = stream_.read_some( - rd_.buf.prepare(read_size(rd_.buf, - rd_.buf.max_size())), ec); + rd_buf_.prepare(read_size(rd_buf_, + rd_buf_.max_size())), ec); open_ = ! ec; if(! open_) return; - rd_.buf.commit(bytes_transferred); + rd_buf_.commit(bytes_transferred); } - BOOST_ASSERT(rd_.buf.size() >= rd_.remain); - rd_.buf.consume(clamp(rd_.remain)); - rd_.remain = 0; + BOOST_ASSERT(rd_buf_.size() >= rd_remain_); + rd_buf_.consume(clamp(rd_remain_)); + rd_remain_ = 0; } } // _Close the WebSocket Connection_ diff --git a/include/boost/beast/websocket/impl/fail.ipp b/include/boost/beast/websocket/impl/fail.ipp index 5a2cda0d..e0507075 100644 --- a/include/boost/beast/websocket/impl/fail.ipp +++ b/include/boost/beast/websocket/impl/fail.ipp @@ -55,7 +55,7 @@ class stream::fail_op : ws(ws_) , code(code_) , ev(ev_) - , tok(ws.t_.unique()) + , tok(ws.tok_.unique()) { } }; @@ -150,7 +150,7 @@ operator()(error_code ec, std::size_t) // Suspend BOOST_ASSERT(d.ws.wr_block_ != d.tok); BOOST_ASIO_CORO_YIELD - d.ws.rd_op_.emplace(std::move(*this)); // VFALCO emplace to rd_op_ + d.ws.paused_rd_.emplace(std::move(*this)); // VFALCO emplace to paused_rd_ // Acquire the write block BOOST_ASSERT(! d.ws.wr_block_); diff --git a/include/boost/beast/websocket/impl/handshake.ipp b/include/boost/beast/websocket/impl/handshake.ipp index 7923e04c..5b1879db 100644 --- a/include/boost/beast/websocket/impl/handshake.ipp +++ b/include/boost/beast/websocket/impl/handshake.ipp @@ -140,7 +140,7 @@ operator()(error_code ec) // Read HTTP response BOOST_ASIO_CORO_YIELD http::async_read(d.ws.next_layer(), - d.ws.rd_.buf, d.res, + d.ws.rd_buf_, d.res, std::move(*this)); if(ec) goto upcall; @@ -398,7 +398,7 @@ do_handshake( } if(ec) return; - http::read(next_layer(), rd_.buf, res, ec); + http::read(next_layer(), rd_buf_, res, ec); if(ec) return; on_response(res, key, ec); diff --git a/include/boost/beast/websocket/impl/ping.ipp b/include/boost/beast/websocket/impl/ping.ipp index cca37e36..70eca861 100644 --- a/include/boost/beast/websocket/impl/ping.ipp +++ b/include/boost/beast/websocket/impl/ping.ipp @@ -48,7 +48,7 @@ class stream::ping_op detail::opcode op, ping_data const& payload) : ws(ws_) - , tok(ws.t_.unique()) + , tok(ws.tok_.unique()) { // Serialize the control frame ws.template write_ping< @@ -145,7 +145,7 @@ operator()(error_code ec, std::size_t) // Suspend BOOST_ASSERT(d.ws.wr_block_ != d.tok); BOOST_ASIO_CORO_YIELD - d.ws.ping_op_.emplace(std::move(*this)); + d.ws.paused_ping_.emplace(std::move(*this)); // Acquire the write block BOOST_ASSERT(! d.ws.wr_block_); @@ -174,9 +174,9 @@ operator()(error_code ec, std::size_t) upcall: BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.ws.wr_block_.reset(); - d.ws.close_op_.maybe_invoke() || - d.ws.rd_op_.maybe_invoke() || - d.ws.wr_op_.maybe_invoke(); + d.ws.paused_close_.maybe_invoke() || + d.ws.paused_rd_.maybe_invoke() || + d.ws.paused_wr_.maybe_invoke(); d_.invoke(ec); } } diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index b71709a9..113a14b8 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -66,7 +66,7 @@ public: : h_(std::forward(h)) , ws_(ws) , cb_(bs) - , tok_(ws_.t_.unique()) + , tok_(ws_.tok_.unique()) { } @@ -154,7 +154,7 @@ operator()( // Suspend BOOST_ASSERT(ws_.rd_block_ != tok_); BOOST_ASIO_CORO_YIELD - ws_.r_rd_op_.save(std::move(*this)); + ws_.paused_r_rd_.save(std::move(*this)); // Acquire the read block BOOST_ASSERT(! ws_.rd_block_); @@ -178,12 +178,12 @@ operator()( // condition is structured to give the decompressor // a chance to emit the final empty deflate block // - if(ws_.rd_.remain == 0 && - (! ws_.rd_.fh.fin || ws_.rd_.done)) + if(ws_.rd_remain_ == 0 && + (! ws_.rd_fh_.fin || ws_.rd_done_)) { // Read frame header while(! ws_.parse_fh( - ws_.rd_.fh, ws_.rd_.buf, code)) + ws_.rd_fh_, ws_.rd_buf_, code)) { if(code != close_code::none) { @@ -193,48 +193,48 @@ operator()( } BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some( - ws_.rd_.buf.prepare(read_size( - ws_.rd_.buf, ws_.rd_.buf.max_size())), + ws_.rd_buf_.prepare(read_size( + ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); dispatched_ = true; ws_.open_ = ! ec; if(! ws_.open_) goto upcall; - ws_.rd_.buf.commit(bytes_transferred); + ws_.rd_buf_.commit(bytes_transferred); } // Immediately apply the mask to the portion // of the buffer holding payload data. - if(ws_.rd_.fh.len > 0 && ws_.rd_.fh.mask) + if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask) detail::mask_inplace(buffer_prefix( - clamp(ws_.rd_.fh.len), - ws_.rd_.buf.data()), - ws_.rd_.key); - if(detail::is_control(ws_.rd_.fh.op)) + clamp(ws_.rd_fh_.len), + ws_.rd_buf_.data()), + ws_.rd_key_); + if(detail::is_control(ws_.rd_fh_.op)) { // Clear this otherwise the next // frame will be considered final. - ws_.rd_.fh.fin = false; + ws_.rd_fh_.fin = false; // Handle ping frame - if(ws_.rd_.fh.op == detail::opcode::ping) + if(ws_.rd_fh_.op == detail::opcode::ping) { { auto const b = buffer_prefix( - clamp(ws_.rd_.fh.len), - ws_.rd_.buf.data()); + clamp(ws_.rd_fh_.len), + ws_.rd_buf_.data()); auto const len = buffer_size(b); - BOOST_ASSERT(len == ws_.rd_.fh.len); + BOOST_ASSERT(len == ws_.rd_fh_.len); ping_data payload; detail::read_ping(payload, b); - ws_.rd_.buf.consume(len); + ws_.rd_buf_.consume(len); // Ignore ping when closing if(ws_.wr_close_) goto loop; if(ws_.ctrl_cb_) ws_.ctrl_cb_(frame_type::ping, payload); - ws_.rd_.fb.reset(); + ws_.rd_fb_.reset(); ws_.template write_ping< - flat_static_buffer_base>(ws_.rd_.fb, + flat_static_buffer_base>(ws_.rd_fb_, detail::opcode::pong, payload); } // Maybe suspend @@ -248,7 +248,7 @@ operator()( // Suspend BOOST_ASSERT(ws_.wr_block_ != tok_); BOOST_ASIO_CORO_YIELD - ws_.rd_op_.save(std::move(*this)); + ws_.paused_rd_.save(std::move(*this)); // Acquire the write block BOOST_ASSERT(! ws_.wr_block_); @@ -280,7 +280,7 @@ operator()( BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, - ws_.rd_.fb.data(), std::move(*this)); + ws_.rd_fb_.data(), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); dispatched_ = true; ws_.wr_block_.reset(); @@ -290,28 +290,28 @@ operator()( goto loop; } // Handle pong frame - if(ws_.rd_.fh.op == detail::opcode::pong) + if(ws_.rd_fh_.op == detail::opcode::pong) { auto const cb = buffer_prefix(clamp( - ws_.rd_.fh.len), ws_.rd_.buf.data()); + ws_.rd_fh_.len), ws_.rd_buf_.data()); auto const len = buffer_size(cb); - BOOST_ASSERT(len == ws_.rd_.fh.len); + BOOST_ASSERT(len == ws_.rd_fh_.len); code = close_code::none; ping_data payload; detail::read_ping(payload, cb); - ws_.rd_.buf.consume(len); + ws_.rd_buf_.consume(len); // Ignore pong when closing if(! ws_.wr_close_ && ws_.ctrl_cb_) ws_.ctrl_cb_(frame_type::pong, payload); goto loop; } // Handle close frame - BOOST_ASSERT(ws_.rd_.fh.op == detail::opcode::close); + BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close); { auto const cb = buffer_prefix(clamp( - ws_.rd_.fh.len), ws_.rd_.buf.data()); + ws_.rd_fh_.len), ws_.rd_buf_.data()); auto const len = buffer_size(cb); - BOOST_ASSERT(len == ws_.rd_.fh.len); + BOOST_ASSERT(len == ws_.rd_fh_.len); BOOST_ASSERT(! ws_.rd_close_); ws_.rd_close_ = true; close_reason cr; @@ -323,7 +323,7 @@ operator()( goto close; } ws_.cr_ = cr; - ws_.rd_.buf.consume(len); + ws_.rd_buf_.consume(len); if(ws_.ctrl_cb_) ws_.ctrl_cb_(frame_type::close, ws_.cr_.reason); @@ -342,52 +342,52 @@ operator()( goto close; } } - if(ws_.rd_.fh.len == 0 && ! ws_.rd_.fh.fin) + if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin) { // Empty non-final frame goto loop; } - ws_.rd_.done = false; + ws_.rd_done_ = false; } if(! ws_.pmd_ || ! ws_.pmd_->rd_set) { - if(ws_.rd_.remain > 0) + if(ws_.rd_remain_ > 0) { - if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > - (std::min)(clamp(ws_.rd_.remain), + if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() > + (std::min)(clamp(ws_.rd_remain_), buffer_size(cb_))) { // Fill the read buffer first, otherwise we // get fewer bytes at the cost of one I/O. BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some( - ws_.rd_.buf.prepare(read_size( - ws_.rd_.buf, ws_.rd_.buf.max_size())), + ws_.rd_buf_.prepare(read_size( + ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); dispatched_ = true; ws_.open_ = ! ec; if(! ws_.open_) goto upcall; - ws_.rd_.buf.commit(bytes_transferred); - if(ws_.rd_.fh.mask) + ws_.rd_buf_.commit(bytes_transferred); + if(ws_.rd_fh_.mask) detail::mask_inplace(buffer_prefix(clamp( - ws_.rd_.remain), ws_.rd_.buf.data()), - ws_.rd_.key); + ws_.rd_remain_), ws_.rd_buf_.data()), + ws_.rd_key_); } - if(ws_.rd_.buf.size() > 0) + if(ws_.rd_buf_.size() > 0) { // Copy from the read buffer. // The mask was already applied. bytes_transferred = buffer_copy(cb_, - ws_.rd_.buf.data(), clamp(ws_.rd_.remain)); + ws_.rd_buf_.data(), clamp(ws_.rd_remain_)); auto const mb = buffer_prefix( bytes_transferred, cb_); - ws_.rd_.remain -= bytes_transferred; - if(ws_.rd_.op == detail::opcode::text) + ws_.rd_remain_ -= bytes_transferred; + if(ws_.rd_op_ == detail::opcode::text) { - if(! ws_.rd_.utf8.write(mb) || - (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && - ! ws_.rd_.utf8.finish())) + if(! ws_.rd_utf8_.write(mb) || + (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin && + ! ws_.rd_utf8_.finish())) { // _Fail the WebSocket Connection_ code = close_code::bad_payload; @@ -396,17 +396,17 @@ operator()( } } bytes_written_ += bytes_transferred; - ws_.rd_.size += bytes_transferred; - ws_.rd_.buf.consume(bytes_transferred); + ws_.rd_size_ += bytes_transferred; + ws_.rd_buf_.consume(bytes_transferred); } else { // Read into caller's buffer - BOOST_ASSERT(ws_.rd_.remain > 0); + BOOST_ASSERT(ws_.rd_remain_ > 0); BOOST_ASSERT(buffer_size(cb_) > 0); BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some(buffer_prefix( - clamp(ws_.rd_.remain), cb_), std::move(*this)); + clamp(ws_.rd_remain_), cb_), std::move(*this)); dispatched_ = true; ws_.open_ = ! ec; if(! ws_.open_) @@ -414,14 +414,14 @@ operator()( BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffer_prefix( bytes_transferred, cb_); - ws_.rd_.remain -= bytes_transferred; - if(ws_.rd_.fh.mask) - detail::mask_inplace(mb, ws_.rd_.key); - if(ws_.rd_.op == detail::opcode::text) + ws_.rd_remain_ -= bytes_transferred; + if(ws_.rd_fh_.mask) + detail::mask_inplace(mb, ws_.rd_key_); + if(ws_.rd_op_ == detail::opcode::text) { - if(! ws_.rd_.utf8.write(mb) || - (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && - ! ws_.rd_.utf8.finish())) + if(! ws_.rd_utf8_.write(mb) || + (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin && + ! ws_.rd_utf8_.finish())) { // _Fail the WebSocket Connection_ code = close_code::bad_payload; @@ -430,38 +430,38 @@ operator()( } } bytes_written_ += bytes_transferred; - ws_.rd_.size += bytes_transferred; + ws_.rd_size_ += bytes_transferred; } } - ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin; + ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin; goto upcall; } else { // Read compressed message frame payload: - // inflate even if rd_.fh.len == 0, otherwise we + // inflate even if rd_fh_.len == 0, otherwise we // never emit the end-of-stream deflate block. while(buffer_size(cb_) > 0) { - if( ws_.rd_.remain > 0 && - ws_.rd_.buf.size() == 0 && + if( ws_.rd_remain_ > 0 && + ws_.rd_buf_.size() == 0 && ! did_read_) { // read new BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some( - ws_.rd_.buf.prepare(read_size( - ws_.rd_.buf, ws_.rd_.buf.max_size())), + ws_.rd_buf_.prepare(read_size( + ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); ws_.open_ = ! ec; if(! ws_.open_) goto upcall; BOOST_ASSERT(bytes_transferred > 0); - ws_.rd_.buf.commit(bytes_transferred); - if(ws_.rd_.fh.mask) + ws_.rd_buf_.commit(bytes_transferred); + if(ws_.rd_fh_.mask) detail::mask_inplace( - buffer_prefix(clamp(ws_.rd_.remain), - ws_.rd_.buf.data()), ws_.rd_.key); + buffer_prefix(clamp(ws_.rd_remain_), + ws_.rd_buf_.data()), ws_.rd_key_); did_read_ = true; } zlib::z_params zs; @@ -471,14 +471,14 @@ operator()( zs.avail_out = buffer_size(out); BOOST_ASSERT(zs.avail_out > 0); } - if(ws_.rd_.remain > 0) + if(ws_.rd_remain_ > 0) { - if(ws_.rd_.buf.size() > 0) + if(ws_.rd_buf_.size() > 0) { // use what's there auto const in = buffer_prefix( - clamp(ws_.rd_.remain), buffer_front( - ws_.rd_.buf.data())); + clamp(ws_.rd_remain_), buffer_front( + ws_.rd_buf_.data())); zs.avail_in = buffer_size(in); zs.next_in = buffer_cast(in); } @@ -487,7 +487,7 @@ operator()( break; } } - else if(ws_.rd_.fh.fin) + else if(ws_.rd_fh_.fin) { // append the empty block codes static std::uint8_t constexpr @@ -504,7 +504,7 @@ operator()( // https://github.com/madler/zlib/issues/280 BOOST_ASSERT(zs.total_out == 0); cb_.consume(zs.total_out); - ws_.rd_.size += zs.total_out; + ws_.rd_size_ += zs.total_out; bytes_written_ += zs.total_out; if( (ws_.role_ == role_type::client && @@ -512,7 +512,7 @@ operator()( (ws_.role_ == role_type::server && ws_.pmd_config_.client_no_context_takeover)) ws_.pmd_->zi.reset(); - ws_.rd_.done = true; + ws_.rd_done_ = true; break; } else @@ -525,7 +525,7 @@ operator()( if(! ws_.open_) break; if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( - ws_.rd_.size, zs.total_out, ws_.rd_msg_max_)) + ws_.rd_size_, zs.total_out, ws_.rd_msg_max_)) { // _Fail the WebSocket Connection_ code = close_code::too_big; @@ -533,17 +533,17 @@ operator()( goto close; } cb_.consume(zs.total_out); - ws_.rd_.size += zs.total_out; - ws_.rd_.remain -= zs.total_in; - ws_.rd_.buf.consume(zs.total_in); + ws_.rd_size_ += zs.total_out; + ws_.rd_remain_ -= zs.total_in; + ws_.rd_buf_.consume(zs.total_in); bytes_written_ += zs.total_out; } - if(ws_.rd_.op == detail::opcode::text) + if(ws_.rd_op_ == detail::opcode::text) { // check utf8 - if(! ws_.rd_.utf8.write( + if(! ws_.rd_utf8_.write( buffer_prefix(bytes_written_, cb_.get())) || ( - ws_.rd_.done && ! ws_.rd_.utf8.finish())) + ws_.rd_done_ && ! ws_.rd_utf8_.finish())) { // _Fail the WebSocket Connection_ code = close_code::bad_payload; @@ -564,10 +564,10 @@ operator()( upcall: BOOST_ASSERT(ws_.rd_block_ == tok_); ws_.rd_block_.reset(); - ws_.r_close_op_.maybe_invoke(); - ws_.close_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke() || - ws_.wr_op_.maybe_invoke(); + ws_.paused_r_close_.maybe_invoke(); + ws_.paused_close_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke() || + ws_.paused_wr_.maybe_invoke(); if(! dispatched_) { ws_.stream_.get_io_service().post( @@ -903,10 +903,10 @@ loop: // condition is structured to give the decompressor // a chance to emit the final empty deflate block // - if(rd_.remain == 0 && (! rd_.fh.fin || rd_.done)) + if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_)) { // Read frame header - while(! parse_fh(rd_.fh, rd_.buf, code)) + while(! parse_fh(rd_fh_, rd_buf_, code)) { if(code != close_code::none) { @@ -916,38 +916,38 @@ loop: } auto const bytes_transferred = stream_.read_some( - rd_.buf.prepare(read_size( - rd_.buf, rd_.buf.max_size())), + rd_buf_.prepare(read_size( + rd_buf_, rd_buf_.max_size())), ec); open_ = ! ec; if(! open_) return bytes_written; - rd_.buf.commit(bytes_transferred); + rd_buf_.commit(bytes_transferred); } // Immediately apply the mask to the portion // of the buffer holding payload data. - if(rd_.fh.len > 0 && rd_.fh.mask) + if(rd_fh_.len > 0 && rd_fh_.mask) detail::mask_inplace(buffer_prefix( - clamp(rd_.fh.len), rd_.buf.data()), - rd_.key); - if(detail::is_control(rd_.fh.op)) + clamp(rd_fh_.len), rd_buf_.data()), + rd_key_); + if(detail::is_control(rd_fh_.op)) { // Get control frame payload auto const b = buffer_prefix( - clamp(rd_.fh.len), rd_.buf.data()); + clamp(rd_fh_.len), rd_buf_.data()); auto const len = buffer_size(b); - BOOST_ASSERT(len == rd_.fh.len); + BOOST_ASSERT(len == rd_fh_.len); // Clear this otherwise the next // frame will be considered final. - rd_.fh.fin = false; + rd_fh_.fin = false; // Handle ping frame - if(rd_.fh.op == detail::opcode::ping) + if(rd_fh_.op == detail::opcode::ping) { ping_data payload; detail::read_ping(payload, b); - rd_.buf.consume(len); + rd_buf_.consume(len); if(wr_close_) { // Ignore ping when closing @@ -965,17 +965,17 @@ loop: goto loop; } // Handle pong frame - if(rd_.fh.op == detail::opcode::pong) + if(rd_fh_.op == detail::opcode::pong) { ping_data payload; detail::read_ping(payload, b); - rd_.buf.consume(len); + rd_buf_.consume(len); if(ctrl_cb_) ctrl_cb_(frame_type::pong, payload); goto loop; } // Handle close frame - BOOST_ASSERT(rd_.fh.op == detail::opcode::close); + BOOST_ASSERT(rd_fh_.op == detail::opcode::close); { BOOST_ASSERT(! rd_close_); rd_close_ = true; @@ -988,7 +988,7 @@ loop: return bytes_written; } cr_ = cr; - rd_.buf.consume(len); + rd_buf_.consume(len); if(ctrl_cb_) ctrl_cb_(frame_type::close, cr_.reason); BOOST_ASSERT(! wr_close_); @@ -1000,12 +1000,12 @@ loop: return bytes_written; } } - if(rd_.fh.len == 0 && ! rd_.fh.fin) + if(rd_fh_.len == 0 && ! rd_fh_.fin) { // Empty non-final frame goto loop; } - rd_.done = false; + rd_done_ = false; } else { @@ -1013,40 +1013,40 @@ loop: } if(! pmd_ || ! pmd_->rd_set) { - if(rd_.remain > 0) + if(rd_remain_ > 0) { - if(rd_.buf.size() == 0 && rd_.buf.max_size() > - (std::min)(clamp(rd_.remain), + if(rd_buf_.size() == 0 && rd_buf_.max_size() > + (std::min)(clamp(rd_remain_), buffer_size(buffers))) { // Fill the read buffer first, otherwise we // get fewer bytes at the cost of one I/O. - rd_.buf.commit(stream_.read_some( - rd_.buf.prepare(read_size(rd_.buf, - rd_.buf.max_size())), ec)); + rd_buf_.commit(stream_.read_some( + rd_buf_.prepare(read_size(rd_buf_, + rd_buf_.max_size())), ec)); open_ = ! ec; if(! open_) return bytes_written; - if(rd_.fh.mask) + if(rd_fh_.mask) detail::mask_inplace( - buffer_prefix(clamp(rd_.remain), - rd_.buf.data()), rd_.key); + buffer_prefix(clamp(rd_remain_), + rd_buf_.data()), rd_key_); } - if(rd_.buf.size() > 0) + if(rd_buf_.size() > 0) { // Copy from the read buffer. // The mask was already applied. auto const bytes_transferred = - buffer_copy(buffers, rd_.buf.data(), - clamp(rd_.remain)); + buffer_copy(buffers, rd_buf_.data(), + clamp(rd_remain_)); auto const mb = buffer_prefix( bytes_transferred, buffers); - rd_.remain -= bytes_transferred; - if(rd_.op == detail::opcode::text) + rd_remain_ -= bytes_transferred; + if(rd_op_ == detail::opcode::text) { - if(! rd_.utf8.write(mb) || - (rd_.remain == 0 && rd_.fh.fin && - ! rd_.utf8.finish())) + if(! rd_utf8_.write(mb) || + (rd_remain_ == 0 && rd_fh_.fin && + ! rd_utf8_.finish())) { // _Fail the WebSocket Connection_ do_fail( @@ -1057,31 +1057,31 @@ loop: } } bytes_written += bytes_transferred; - rd_.size += bytes_transferred; - rd_.buf.consume(bytes_transferred); + rd_size_ += bytes_transferred; + rd_buf_.consume(bytes_transferred); } else { // Read into caller's buffer - BOOST_ASSERT(rd_.remain > 0); + BOOST_ASSERT(rd_remain_ > 0); BOOST_ASSERT(buffer_size(buffers) > 0); auto const bytes_transferred = stream_.read_some(buffer_prefix( - clamp(rd_.remain), buffers), ec); + clamp(rd_remain_), buffers), ec); open_ = ! ec; if(! open_) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffer_prefix( bytes_transferred, buffers); - rd_.remain -= bytes_transferred; - if(rd_.fh.mask) - detail::mask_inplace(mb, rd_.key); - if(rd_.op == detail::opcode::text) + rd_remain_ -= bytes_transferred; + if(rd_fh_.mask) + detail::mask_inplace(mb, rd_key_); + if(rd_op_ == detail::opcode::text) { - if(! rd_.utf8.write(mb) || - (rd_.remain == 0 && rd_.fh.fin && - ! rd_.utf8.finish())) + if(! rd_utf8_.write(mb) || + (rd_remain_ == 0 && rd_fh_.fin && + ! rd_utf8_.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, @@ -1090,15 +1090,15 @@ loop: } } bytes_written += bytes_transferred; - rd_.size += bytes_transferred; + rd_size_ += bytes_transferred; } } - rd_.done = rd_.remain == 0 && rd_.fh.fin; + rd_done_ = rd_remain_ == 0 && rd_fh_.fin; } else { // Read compressed message frame payload: - // inflate even if rd_.fh.len == 0, otherwise we + // inflate even if rd_fh_.len == 0, otherwise we // never emit the end-of-stream deflate block. // bool did_read = false; @@ -1112,14 +1112,14 @@ loop: zs.avail_out = buffer_size(out); BOOST_ASSERT(zs.avail_out > 0); } - if(rd_.remain > 0) + if(rd_remain_ > 0) { - if(rd_.buf.size() > 0) + if(rd_buf_.size() > 0) { // use what's there auto const in = buffer_prefix( - clamp(rd_.remain), buffer_front( - rd_.buf.data())); + clamp(rd_remain_), buffer_front( + rd_buf_.data())); zs.avail_in = buffer_size(in); zs.next_in = buffer_cast(in); } @@ -1128,21 +1128,21 @@ loop: // read new auto const bytes_transferred = stream_.read_some( - rd_.buf.prepare(read_size( - rd_.buf, rd_.buf.max_size())), + rd_buf_.prepare(read_size( + rd_buf_, rd_buf_.max_size())), ec); open_ = ! ec; if(! open_) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); - rd_.buf.commit(bytes_transferred); - if(rd_.fh.mask) + rd_buf_.commit(bytes_transferred); + if(rd_fh_.mask) detail::mask_inplace( - buffer_prefix(clamp(rd_.remain), - rd_.buf.data()), rd_.key); + buffer_prefix(clamp(rd_remain_), + rd_buf_.data()), rd_key_); auto const in = buffer_prefix( - clamp(rd_.remain), buffer_front( - rd_.buf.data())); + clamp(rd_remain_), buffer_front( + rd_buf_.data())); zs.avail_in = buffer_size(in); zs.next_in = buffer_cast(in); did_read = true; @@ -1152,7 +1152,7 @@ loop: break; } } - else if(rd_.fh.fin) + else if(rd_fh_.fin) { // append the empty block codes static std::uint8_t constexpr @@ -1169,7 +1169,7 @@ loop: // https://github.com/madler/zlib/issues/280 BOOST_ASSERT(zs.total_out == 0); cb.consume(zs.total_out); - rd_.size += zs.total_out; + rd_size_ += zs.total_out; bytes_written += zs.total_out; if( (role_ == role_type::client && @@ -1177,7 +1177,7 @@ loop: (role_ == role_type::server && pmd_config_.client_no_context_takeover)) pmd_->zi.reset(); - rd_.done = true; + rd_done_ = true; break; } else @@ -1190,24 +1190,24 @@ loop: if(! open_) return bytes_written; if(rd_msg_max_ && beast::detail::sum_exceeds( - rd_.size, zs.total_out, rd_msg_max_)) + rd_size_, zs.total_out, rd_msg_max_)) { do_fail(close_code::too_big, error::failed, ec); return bytes_written; } cb.consume(zs.total_out); - rd_.size += zs.total_out; - rd_.remain -= zs.total_in; - rd_.buf.consume(zs.total_in); + rd_size_ += zs.total_out; + rd_remain_ -= zs.total_in; + rd_buf_.consume(zs.total_in); bytes_written += zs.total_out; } - if(rd_.op == detail::opcode::text) + if(rd_op_ == detail::opcode::text) { // check utf8 - if(! rd_.utf8.write( + if(! rd_utf8_.write( buffer_prefix(bytes_written, buffers)) || ( - rd_.done && ! rd_.utf8.finish())) + rd_done_ && ! rd_utf8_.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, diff --git a/include/boost/beast/websocket/impl/stream.ipp b/include/boost/beast/websocket/impl/stream.ipp index 6bd75d12..b311d0b2 100644 --- a/include/boost/beast/websocket/impl/stream.ipp +++ b/include/boost/beast/websocket/impl/stream.ipp @@ -45,9 +45,9 @@ template stream:: stream(Args&&... args) : stream_(std::forward(args)...) - , t_(1) + , tok_(1) { - BOOST_ASSERT(rd_.buf.max_size() >= + BOOST_ASSERT(rd_buf_.max_size() >= max_control_frame_size); } @@ -60,26 +60,26 @@ read_size_hint( using beast::detail::clamp; std::size_t result; BOOST_ASSERT(initial_size > 0); - if(! pmd_ || (! rd_.done && ! pmd_->rd_set)) + if(! pmd_ || (! rd_done_ && ! pmd_->rd_set)) { // current message is uncompressed - if(rd_.done) + if(rd_done_) { // first message frame result = initial_size; goto done; } - else if(rd_.fh.fin) + else if(rd_fh_.fin) { // last message frame - BOOST_ASSERT(rd_.remain > 0); - result = clamp(rd_.remain); + BOOST_ASSERT(rd_remain_ > 0); + result = clamp(rd_remain_); goto done; } } result = (std::max)( - initial_size, clamp(rd_.remain)); + initial_size, clamp(rd_remain_)); done: BOOST_ASSERT(result != 0); return result; @@ -135,21 +135,20 @@ open(role_type role) // VFALCO TODO analyze and remove dupe code in reset() role_ = role; open_ = true; - rd_.remain = 0; - rd_.cont = false; - rd_.done = true; + rd_remain_ = 0; + rd_cont_ = false; + rd_done_ = true; // Can't clear this because accept uses it - //rd_.buf.reset(); - rd_.fh.fin = false; + //rd_buf_.reset(); + rd_fh_.fin = false; rd_close_ = false; wr_close_ = false; wr_block_.reset(); rd_block_.reset(); cr_.code = close_code::none; - ping_data_ = nullptr; // should be nullptr on close anyway - wr_.cont = false; - wr_.buf_size = 0; + wr_cont_ = false; + wr_buf_size_ = 0; if(((role_ == role_type::client && pmd_opts_.client_enable) || (role_ == role_type::server && pmd_opts_.server_enable)) && @@ -185,7 +184,7 @@ void stream:: close() { - wr_.buf.reset(); + wr_buf_.reset(); pmd_.reset(); } @@ -196,44 +195,43 @@ reset() { BOOST_ASSERT(! open_); open_ = false; // VFALCO is this needed? - rd_.remain = 0; - rd_.cont = false; - rd_.done = true; - rd_.buf.consume(rd_.buf.size()); - rd_.fh.fin = false; + rd_remain_ = 0; + rd_cont_ = false; + rd_done_ = true; + rd_buf_.consume(rd_buf_.size()); + rd_fh_.fin = false; rd_close_ = false; wr_close_ = false; - wr_.cont = false; + wr_cont_ = false; wr_block_.reset(); rd_block_.reset(); cr_.code = close_code::none; - ping_data_ = nullptr; // should be nullptr on close anyway } // Called before each write frame template void stream:: -wr_begin() +begin_msg() { - wr_.autofrag = wr_autofrag_; - wr_.compress = static_cast(pmd_); + wr_frag_ = wr_frag_opt_; + wr_compress_ = static_cast(pmd_); // Maintain the write buffer - if( wr_.compress || + if( wr_compress_ || role_ == role_type::client) { - if(! wr_.buf || wr_.buf_size != wr_buf_size_) + if(! wr_buf_ || wr_buf_size_ != wr_buf_opt_) { - wr_.buf_size = wr_buf_size_; - wr_.buf = boost::make_unique_noinit< - std::uint8_t[]>(wr_.buf_size); + wr_buf_size_ = wr_buf_opt_; + wr_buf_ = boost::make_unique_noinit< + std::uint8_t[]>(wr_buf_size_); } } else { - wr_.buf_size = wr_buf_size_; - wr_.buf.reset(); + wr_buf_size_ = wr_buf_opt_; + wr_buf_.reset(); } } @@ -298,7 +296,7 @@ parse_fh( { case detail::opcode::binary: case detail::opcode::text: - if(rd_.cont) + if(rd_cont_) { // new data frame when continuation expected return err(close_code::protocol_error); @@ -314,7 +312,7 @@ parse_fh( break; case detail::opcode::cont: - if(! rd_.cont) + if(! rd_cont_) { // continuation without an active message return err(close_code::protocol_error); @@ -393,7 +391,7 @@ parse_fh( BOOST_ASSERT(buffer_size(cb) >= sizeof(tmp)); cb.consume(buffer_copy(buffer(tmp), cb)); fh.key = detail::little_uint32_to_native(&tmp[0]); - detail::prepare_key(rd_.key, fh.key); + detail::prepare_key(rd_key_, fh.key); } else { @@ -404,23 +402,23 @@ parse_fh( { if(fh.op != detail::opcode::cont) { - rd_.size = 0; - rd_.op = fh.op; + rd_size_ = 0; + rd_op_ = fh.op; } else { - if(rd_.size > (std::numeric_limits< + if(rd_size_ > (std::numeric_limits< std::uint64_t>::max)() - fh.len) return err(close_code::too_big); } if(! pmd_ || ! pmd_->rd_set) { if(rd_msg_max_ && beast::detail::sum_exceeds( - rd_.size, fh.len, rd_msg_max_)) + rd_size_, fh.len, rd_msg_max_)) return err(close_code::too_big); } - rd_.cont = ! fh.fin; - rd_.remain = fh.len; + rd_cont_ = ! fh.fin; + rd_remain_ = fh.len; } b.consume(b.size() - buffer_size(cb)); code = close_code::none; @@ -444,7 +442,7 @@ write_close(DynamicBuffer& db, close_reason const& cr) 0 : 2 + cr.reason.size(); fh.mask = role_ == role_type::client; if(fh.mask) - fh.key = maskgen_(); + fh.key = wr_gen_(); detail::write(db, fh); if(cr.code != close_code::none) { @@ -491,7 +489,7 @@ write_ping(DynamicBuffer& db, fh.len = data.size(); fh.mask = role_ == role_type::client; if(fh.mask) - fh.key = maskgen_(); + fh.key = wr_gen_(); detail::write(db, fh); if(data.empty()) return; @@ -525,7 +523,7 @@ build_request(detail::sec_ws_key_type& key, req.set(http::field::host, host); req.set(http::field::upgrade, "websocket"); req.set(http::field::connection, "upgrade"); - detail::make_sec_ws_key(key, maskgen_); + detail::make_sec_ws_key(key, wr_gen_); req.set(http::field::sec_websocket_key, key); req.set(http::field::sec_websocket_version, "13"); if(pmd_opts_.client_enable) diff --git a/include/boost/beast/websocket/impl/write.ipp b/include/boost/beast/websocket/impl/write.ipp index 7866fa58..52cf919e 100644 --- a/include/boost/beast/websocket/impl/write.ipp +++ b/include/boost/beast/websocket/impl/write.ipp @@ -63,7 +63,7 @@ public: : h_(std::forward(h)) , ws_(ws) , cb_(bs) - , tok_(ws_.t_.unique()) + , tok_(ws_.tok_.unique()) , fin_(fin) { } @@ -149,10 +149,10 @@ operator()(error_code ec, BOOST_ASIO_CORO_REENTER(*this) { // Set up the outgoing frame header - if(! ws_.wr_.cont) + if(! ws_.wr_cont_) { - ws_.wr_begin(); - fh_.rsv1 = ws_.wr_.compress; + ws_.begin_msg(); + fh_.rsv1 = ws_.wr_compress_; } else { @@ -160,27 +160,27 @@ operator()(error_code ec, } fh_.rsv2 = false; fh_.rsv3 = false; - fh_.op = ws_.wr_.cont ? + fh_.op = ws_.wr_cont_ ? detail::opcode::cont : ws_.wr_opcode_; fh_.mask = ws_.role_ == role_type::client; // Choose a write algorithm - if(ws_.wr_.compress) + if(ws_.wr_compress_) { how_ = do_deflate; } else if(! fh_.mask) { - if(! ws_.wr_.autofrag) + if(! ws_.wr_frag_) { how_ = do_nomask_nofrag; } else { - BOOST_ASSERT(ws_.wr_.buf_size != 0); + BOOST_ASSERT(ws_.wr_buf_size_ != 0); remain_ = buffer_size(cb_); - if(remain_ > ws_.wr_.buf_size) + if(remain_ > ws_.wr_buf_size_) how_ = do_nomask_frag; else how_ = do_nomask_nofrag; @@ -188,15 +188,15 @@ operator()(error_code ec, } else { - if(! ws_.wr_.autofrag) + if(! ws_.wr_frag_) { how_ = do_mask_nofrag; } else { - BOOST_ASSERT(ws_.wr_.buf_size != 0); + BOOST_ASSERT(ws_.wr_buf_size_ != 0); remain_ = buffer_size(cb_); - if(remain_ > ws_.wr_.buf_size) + if(remain_ > ws_.wr_buf_size_) how_ = do_mask_frag; else how_ = do_mask_nofrag; @@ -225,7 +225,7 @@ operator()(error_code ec, // Suspend BOOST_ASSERT(ws_.wr_block_ != tok_); BOOST_ASIO_CORO_YIELD - ws_.wr_op_.save(std::move(*this)); + ws_.paused_wr_.save(std::move(*this)); // Acquire the write block BOOST_ASSERT(! ws_.wr_block_); @@ -250,15 +250,15 @@ operator()(error_code ec, { fh_.fin = fin_; fh_.len = buffer_size(cb_); - ws_.wr_.fb.reset(); + ws_.wr_fb_.reset(); detail::write( - ws_.wr_.fb, fh_); - ws_.wr_.cont = ! fin_; + ws_.wr_fb_, fh_); + ws_.wr_cont_ = ! fin_; // Send frame BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, - buffer_cat(ws_.wr_.fb.data(), cb_), + buffer_cat(ws_.wr_fb_.data(), cb_), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); if(ec) @@ -272,19 +272,19 @@ operator()(error_code ec, { for(;;) { - fh_.len = clamp(remain_, ws_.wr_.buf_size); + fh_.len = clamp(remain_, ws_.wr_buf_size_); remain_ -= clamp(fh_.len); fh_.fin = fin_ ? remain_ == 0 : false; - ws_.wr_.fb.reset(); + ws_.wr_fb_.reset(); detail::write( - ws_.wr_.fb, fh_); - ws_.wr_.cont = ! fin_; + ws_.wr_fb_, fh_); + ws_.wr_cont_ = ! fin_; // Send frame BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write( ws_.stream_, buffer_cat( - ws_.wr_.fb.data(), buffer_prefix( + ws_.wr_fb_.data(), buffer_prefix( clamp(fh_.len), cb_)), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); @@ -296,14 +296,14 @@ operator()(error_code ec, if(remain_ == 0) goto upcall; cb_.consume( - bytes_transferred - ws_.wr_.fb.size()); + bytes_transferred - ws_.wr_fb_.size()); fh_.op = detail::opcode::cont; // Allow outgoing control frames to // be sent in between message frames ws_.wr_block_.reset(); - if( ws_.close_op_.maybe_invoke() || - ws_.rd_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke()) + if( ws_.paused_close_.maybe_invoke() || + ws_.paused_rd_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke()) { BOOST_ASIO_CORO_YIELD ws_.get_io_service().post( @@ -321,24 +321,24 @@ operator()(error_code ec, remain_ = buffer_size(cb_); fh_.fin = fin_; fh_.len = remain_; - fh_.key = ws_.maskgen_(); + fh_.key = ws_.wr_gen_(); detail::prepare_key(key_, fh_.key); - ws_.wr_.fb.reset(); + ws_.wr_fb_.reset(); detail::write( - ws_.wr_.fb, fh_); - n = clamp(remain_, ws_.wr_.buf_size); + ws_.wr_fb_, fh_); + n = clamp(remain_, ws_.wr_buf_size_); buffer_copy(buffer( - ws_.wr_.buf.get(), n), cb_); + ws_.wr_buf_.get(), n), cb_); detail::mask_inplace(buffer( - ws_.wr_.buf.get(), n), key_); + ws_.wr_buf_.get(), n), key_); remain_ -= n; - ws_.wr_.cont = ! fin_; + ws_.wr_cont_ = ! fin_; // Send frame header and partial payload BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write( - ws_.stream_, buffer_cat(ws_.wr_.fb.data(), - buffer(ws_.wr_.buf.get(), n)), + ws_.stream_, buffer_cat(ws_.wr_fb_.data(), + buffer(ws_.wr_buf_.get(), n)), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); if(ec) @@ -348,18 +348,18 @@ operator()(error_code ec, } while(remain_ > 0) { - cb_.consume(ws_.wr_.buf_size); - n = clamp(remain_, ws_.wr_.buf_size); + cb_.consume(ws_.wr_buf_size_); + n = clamp(remain_, ws_.wr_buf_size_); buffer_copy(buffer( - ws_.wr_.buf.get(), n), cb_); + ws_.wr_buf_.get(), n), cb_); detail::mask_inplace(buffer( - ws_.wr_.buf.get(), n), key_); + ws_.wr_buf_.get(), n), key_); remain_ -= n; // Send partial payload BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, - buffer(ws_.wr_.buf.get(), n), + buffer(ws_.wr_buf_.get(), n), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); if(ec) @@ -377,26 +377,26 @@ operator()(error_code ec, { for(;;) { - n = clamp(remain_, ws_.wr_.buf_size); + n = clamp(remain_, ws_.wr_buf_size_); remain_ -= n; fh_.len = n; - fh_.key = ws_.maskgen_(); + fh_.key = ws_.wr_gen_(); fh_.fin = fin_ ? remain_ == 0 : false; detail::prepare_key(key_, fh_.key); buffer_copy(buffer( - ws_.wr_.buf.get(), n), cb_); + ws_.wr_buf_.get(), n), cb_); detail::mask_inplace(buffer( - ws_.wr_.buf.get(), n), key_); - ws_.wr_.fb.reset(); + ws_.wr_buf_.get(), n), key_); + ws_.wr_fb_.reset(); detail::write( - ws_.wr_.fb, fh_); - ws_.wr_.cont = ! fin_; + ws_.wr_fb_, fh_); + ws_.wr_cont_ = ! fin_; // Send frame BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, - buffer_cat(ws_.wr_.fb.data(), - buffer(ws_.wr_.buf.get(), n)), + buffer_cat(ws_.wr_fb_.data(), + buffer(ws_.wr_buf_.get(), n)), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); if(ec) @@ -407,14 +407,14 @@ operator()(error_code ec, if(remain_ == 0) goto upcall; cb_.consume( - bytes_transferred - ws_.wr_.fb.size()); + bytes_transferred - ws_.wr_fb_.size()); fh_.op = detail::opcode::cont; // Allow outgoing control frames to // be sent in between message frames: ws_.wr_block_.reset(); - if( ws_.close_op_.maybe_invoke() || - ws_.rd_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke()) + if( ws_.paused_close_.maybe_invoke() || + ws_.paused_rd_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke()) { BOOST_ASIO_CORO_YIELD ws_.get_io_service().post( @@ -431,8 +431,8 @@ operator()(error_code ec, { for(;;) { - b = buffer(ws_.wr_.buf.get(), - ws_.wr_.buf_size); + b = buffer(ws_.wr_buf_.get(), + ws_.wr_buf_size_); more_ = detail::deflate( ws_.pmd_->zo, b, cb_, fin_, ec); ws_.open_ = ! ec; @@ -464,22 +464,22 @@ operator()(error_code ec, } if(fh_.mask) { - fh_.key = ws_.maskgen_(); + fh_.key = ws_.wr_gen_(); detail::prepared_key key; detail::prepare_key(key, fh_.key); detail::mask_inplace(b, key); } fh_.fin = ! more_; fh_.len = n; - ws_.wr_.fb.reset(); + ws_.wr_fb_.reset(); detail::write< - flat_static_buffer_base>(ws_.wr_.fb, fh_); - ws_.wr_.cont = ! fin_; + flat_static_buffer_base>(ws_.wr_fb_, fh_); + ws_.wr_cont_ = ! fin_; // Send frame BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, - buffer_cat(ws_.wr_.fb.data(), + buffer_cat(ws_.wr_fb_.data(), mutable_buffers_1{b}), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); if(ec) @@ -494,9 +494,9 @@ operator()(error_code ec, // Allow outgoing control frames to // be sent in between message frames: ws_.wr_block_.reset(); - if( ws_.close_op_.maybe_invoke() || - ws_.rd_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke()) + if( ws_.paused_close_.maybe_invoke() || + ws_.paused_rd_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke()) { BOOST_ASIO_CORO_YIELD ws_.get_io_service().post( @@ -524,9 +524,9 @@ operator()(error_code ec, upcall: BOOST_ASSERT(ws_.wr_block_ == tok_); ws_.wr_block_.reset(); - ws_.close_op_.maybe_invoke() || - ws_.rd_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke(); + ws_.paused_close_.maybe_invoke() || + ws_.paused_rd_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke(); h_(ec); } } @@ -573,10 +573,10 @@ write_some(bool fin, return; } detail::frame_header fh; - if(! wr_.cont) + if(! wr_cont_) { - wr_begin(); - fh.rsv1 = wr_.compress; + begin_msg(); + fh.rsv1 = wr_compress_; } else { @@ -584,18 +584,18 @@ write_some(bool fin, } fh.rsv2 = false; fh.rsv3 = false; - fh.op = wr_.cont ? + fh.op = wr_cont_ ? detail::opcode::cont : wr_opcode_; fh.mask = role_ == role_type::client; auto remain = buffer_size(buffers); - if(wr_.compress) + if(wr_compress_) { consuming_buffers< ConstBufferSequence> cb{buffers}; for(;;) { auto b = buffer( - wr_.buf.get(), wr_.buf_size); + wr_buf_.get(), wr_buf_size_); auto const more = detail::deflate( pmd_->zo, b, cb, fin, ec); open_ = ! ec; @@ -614,7 +614,7 @@ write_some(bool fin, } if(fh.mask) { - fh.key = maskgen_(); + fh.key = wr_gen_(); detail::prepared_key key; detail::prepare_key(key, fh.key); detail::mask_inplace(b, key); @@ -624,7 +624,7 @@ write_some(bool fin, detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); - wr_.cont = ! fin; + wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); open_ = ! ec; @@ -645,7 +645,7 @@ write_some(bool fin, } if(! fh.mask) { - if(! wr_.autofrag) + if(! wr_frag_) { // no mask, no autofrag fh.fin = fin; @@ -653,7 +653,7 @@ write_some(bool fin, detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); - wr_.cont = ! fin; + wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), buffers), ec); open_ = ! ec; @@ -663,19 +663,19 @@ write_some(bool fin, else { // no mask, autofrag - BOOST_ASSERT(wr_.buf_size != 0); + BOOST_ASSERT(wr_buf_size_ != 0); consuming_buffers< ConstBufferSequence> cb{buffers}; for(;;) { - auto const n = clamp(remain, wr_.buf_size); + auto const n = clamp(remain, wr_buf_size_); remain -= n; fh.len = n; fh.fin = fin ? remain == 0 : false; detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); - wr_.cont = ! fin; + wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), buffer_prefix(n, cb)), ec); @@ -690,12 +690,12 @@ write_some(bool fin, } return; } - if(! wr_.autofrag) + if(! wr_frag_) { // mask, no autofrag fh.fin = fin; fh.len = remain; - fh.key = maskgen_(); + fh.key = wr_gen_(); detail::prepared_key key; detail::prepare_key(key, fh.key); detail::fh_buffer fh_buf; @@ -704,13 +704,13 @@ write_some(bool fin, consuming_buffers< ConstBufferSequence> cb{buffers}; { - auto const n = clamp(remain, wr_.buf_size); - auto const b = buffer(wr_.buf.get(), n); + auto const n = clamp(remain, wr_buf_size_); + auto const b = buffer(wr_buf_.get(), n); buffer_copy(b, cb); cb.consume(n); remain -= n; detail::mask_inplace(b, key); - wr_.cont = ! fin; + wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); open_ = ! ec; @@ -719,8 +719,8 @@ write_some(bool fin, } while(remain > 0) { - auto const n = clamp(remain, wr_.buf_size); - auto const b = buffer(wr_.buf.get(), n); + auto const n = clamp(remain, wr_buf_size_); + auto const b = buffer(wr_buf_.get(), n); buffer_copy(b, cb); cb.consume(n); remain -= n; @@ -734,22 +734,22 @@ write_some(bool fin, } { // mask, autofrag - BOOST_ASSERT(wr_.buf_size != 0); + BOOST_ASSERT(wr_buf_size_ != 0); consuming_buffers< ConstBufferSequence> cb{buffers}; for(;;) { - fh.key = maskgen_(); + fh.key = wr_gen_(); detail::prepared_key key; detail::prepare_key(key, fh.key); - auto const n = clamp(remain, wr_.buf_size); - auto const b = buffer(wr_.buf.get(), n); + auto const n = clamp(remain, wr_buf_size_); + auto const b = buffer(wr_buf_.get(), n); buffer_copy(b, cb); detail::mask_inplace(b, key); fh.len = n; remain -= n; fh.fin = fin ? remain == 0 : false; - wr_.cont = ! fh.fin; + wr_cont_ = ! fh.fin; detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index 159b3743..58ddfecf 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -146,67 +146,6 @@ class stream void reset() { id_ = 0; } }; - using control_cb_type = - std::function; - - // State information for the message being received - // - struct rd_t - { - detail::frame_header fh; // current frame header - detail::prepared_key key; // current stateful mask key - std::uint64_t size; // total size of current message so far - std::uint64_t remain; // message frame bytes left in current frame - detail::frame_buffer fb; // to write control frames (during reads) - detail::utf8_checker utf8; // to validate utf8 - - // A small, circular buffer to read frame headers. - // This improves performance by avoiding small reads. - static_buffer<+tcp_frame_size> buf; - - // opcode of current message being read - detail::opcode op; - - // `true` if the next frame is a continuation. - bool cont; - - bool done; // set when a message is done - }; - - // State information for the message being sent - // - struct wr_t - { - // `true` if next frame is a continuation, - // `false` if next frame starts a new message - bool cont; - - // `true` if this message should be auto-fragmented - // This gets set to the auto-fragment option at the beginning - // of sending a message, so that the option can be changed - // mid-send without affecting the current message. - bool autofrag; - - // `true` if this message should be compressed. - // This gets set to the compress option at the beginning of - // of sending a message, so that the option can be changed - // mid-send without affecting the current message. - bool compress; - - // Size of the write buffer. - // This gets set to the write buffer size option at the - // beginning of sending a message, so that the option can be - // changed mid-send without affecting the current message. - std::size_t buf_size; - - // The write buffer. Used for compression and masking. - // The buffer is allocated or reallocated at the beginning of - // sending a message. - std::unique_ptr buf; - - detail::fh_buffer fb; - }; - // State information for the permessage-deflate extension struct pmd_t { @@ -217,46 +156,61 @@ class stream zlib::inflate_stream zi; }; - NextLayer stream_; // the wrapped stream - detail::maskgen maskgen_; // source of mask keys - std::size_t rd_msg_max_ = - 16 * 1024 * 1024; // max message size - bool wr_autofrag_ = true; // auto fragment - std::size_t wr_buf_size_ = 4096; // write buffer size - std::size_t rd_buf_size_ = 4096; // read buffer size - detail::opcode wr_opcode_ = - detail::opcode::text; // outgoing message type - control_cb_type ctrl_cb_; // control callback - role_type role_; // server or client - bool open_ = false; // `true` if established + using control_cb_type = + std::function; - bool rd_close_; // read close frame - bool wr_close_; // sent close frame - token wr_block_; // op currenly writing - token rd_block_; // op currenly reading + NextLayer stream_; // the wrapped stream + close_reason cr_; // set from received close frame + control_cb_type ctrl_cb_; // control callback - ping_data* ping_data_; // where to put the payload - detail::pausation rd_op_; // paused read op - detail::pausation wr_op_; // paused write op - detail::pausation ping_op_; // paused ping op - detail::pausation close_op_; // paused close op - detail::pausation r_rd_op_; // paused read op (read) - detail::pausation r_close_op_; // paused close op (read) - close_reason cr_; // set from received close frame - rd_t rd_; // read state - wr_t wr_; // write state + std::size_t rd_msg_max_ // max message size + = 16 * 1024 * 1024; + std::uint64_t rd_size_; // total size of current message so far + std::uint64_t rd_remain_; // message frame bytes left in current frame + detail::frame_header rd_fh_; // current frame header + detail::prepared_key rd_key_; // current stateful mask key + detail::frame_buffer rd_fb_; // to write control frames (during reads) + detail::utf8_checker rd_utf8_; // to validate utf8 + static_buffer< + +tcp_frame_size> rd_buf_; // buffer for reads + detail::opcode rd_op_; // current message binary or text + bool rd_cont_; // `true` if the next frame is a continuation + bool rd_done_; // set when a message is done + bool rd_close_; // did we read a close frame? + token rd_block_; // op currenly reading - // If not engaged, then permessage-deflate is not - // enabled for the currently active session. - std::unique_ptr pmd_; + token tok_; // used to order asynchronous ops + role_type role_; // server or client + bool open_ // `true` if connected + = false; - // Local options for permessage-deflate - permessage_deflate pmd_opts_; + token wr_block_; // op currenly writing + bool wr_close_; // did we write a close frame? + bool wr_cont_; // next write is a continuation + bool wr_frag_; // autofrag the current message + bool wr_frag_opt_ // autofrag option setting + = true; + bool wr_compress_; // compress current message + detail::opcode wr_opcode_ // message type + = detail::opcode::text; + std::unique_ptr< + std::uint8_t[]> wr_buf_; // write buffer + std::size_t wr_buf_size_; // write buffer size (current message) + std::size_t wr_buf_opt_ // write buffer size option setting + = 4096; + detail::fh_buffer wr_fb_; // header buffer used for writes + detail::maskgen wr_gen_; // source of mask keys - // Offer for clients, negotiated result for servers - detail::pmd_offer pmd_config_; + detail::pausation paused_rd_; // paused read op + detail::pausation paused_wr_; // paused write op + detail::pausation paused_ping_; // paused ping op + detail::pausation paused_close_; // paused close op + detail::pausation paused_r_rd_; // paused read op (read) + detail::pausation paused_r_close_;// paused close op (read) - token t_; + std::unique_ptr pmd_; // pmd settings or nullptr + permessage_deflate pmd_opts_; // local pmd options + detail::pmd_offer pmd_config_; // offer (client) or negotiation (server) public: /// The type of the next layer. @@ -402,7 +356,7 @@ public: bool got_binary() const { - return rd_.op == detail::opcode::binary; + return rd_op_ == detail::opcode::binary; } /** Returns `true` if the latest message data indicates text. @@ -424,7 +378,7 @@ public: bool is_message_done() const { - return rd_.done; + return rd_done_; } /** Returns the close reason received from the peer. @@ -516,14 +470,14 @@ public: void auto_fragment(bool value) { - wr_autofrag_ = value; + wr_frag_opt_ = value; } /// Returns `true` if the automatic fragmentation option is set. bool auto_fragment() const { - return wr_autofrag_; + return wr_frag_opt_; } /** Set the binary message option. @@ -682,14 +636,14 @@ public: if(amount < 8) BOOST_THROW_EXCEPTION(std::invalid_argument{ "write buffer size underflow"}); - wr_buf_size_ = amount; + wr_buf_opt_ = amount; }; /// Returns the size of the write buffer. std::size_t write_buffer_size() const { - return wr_buf_size_; + return wr_buf_opt_; } /** Set the text message option. @@ -3412,7 +3366,7 @@ private: void open(role_type role); void close(); void reset(); - void wr_begin(); + void begin_msg(); template bool