From 6496aa89eefd760846df9cf8cdad9e630eb11a83 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Thu, 29 Jun 2017 11:36:14 -0700 Subject: [PATCH] Allow close, ping, and write to happen concurrently --- CHANGELOG.md | 3 + include/beast/websocket/impl/close.ipp | 2 +- include/beast/websocket/impl/ping.ipp | 6 +- include/beast/websocket/impl/read.ipp | 95 ++-- include/beast/websocket/impl/write.ipp | 731 ++++++++++++------------- include/beast/websocket/stream.hpp | 7 +- 6 files changed, 405 insertions(+), 439 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1866596c..d65b9a00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ WebSockets: * Fine tune websocket op asserts * Refactor websocket composed ops +* Allow close, ping, and write to happen concurrently +* Fix race in websocket read op +* Fix websocket write op -------------------------------------------------------------------------------- diff --git a/include/beast/websocket/impl/close.ipp b/include/beast/websocket/impl/close.ipp index 1b1c8461..add4bd6b 100644 --- a/include/beast/websocket/impl/close.ipp +++ b/include/beast/websocket/impl/close.ipp @@ -125,7 +125,7 @@ operator()(error_code ec, std::size_t) { // suspend d.state = 1; - d.ws.wr_op_.emplace(std::move(*this)); + d.ws.close_op_.emplace(std::move(*this)); return; } d.ws.wr_block_ = &d; diff --git a/include/beast/websocket/impl/ping.ipp b/include/beast/websocket/impl/ping.ipp index 1edd2b1d..dc37278e 100644 --- a/include/beast/websocket/impl/ping.ipp +++ b/include/beast/websocket/impl/ping.ipp @@ -133,10 +133,9 @@ operator()(error_code ec, std::size_t) if(d.ws.failed_ || d.ws.wr_close_) { // call handler - d.ws.get_io_service().post( + return d.ws.get_io_service().post( bind_handler(std::move(*this), boost::asio::error::operation_aborted)); - return; } do_write: @@ -176,7 +175,8 @@ operator()(error_code ec, std::size_t) upcall: BOOST_ASSERT(d.ws.wr_block_ == &d); d.ws.wr_block_ = nullptr; - d.ws.rd_op_.maybe_invoke() || + d.ws.close_op_.maybe_invoke() || + d.ws.rd_op_.maybe_invoke() || d.ws.wr_op_.maybe_invoke(); d_.invoke(ec); } diff --git a/include/beast/websocket/impl/read.ipp b/include/beast/websocket/impl/read.ipp index e757bd43..d8ef021a 100644 --- a/include/beast/websocket/impl/read.ipp +++ b/include/beast/websocket/impl/read.ipp @@ -81,7 +81,6 @@ public: : d_(std::forward(h), ws, std::forward(args)...) { - (*this)(error_code{}, 0, false); } void operator()() @@ -165,9 +164,8 @@ operator()(error_code ec, do_control_payload = 8, do_control = 9, do_pong_resume = 10, - do_pong = 12, + do_ponged = 12, do_close_resume = 14, - do_close = 16, do_teardown = 17, do_fail = 19, @@ -221,7 +219,7 @@ operator()(error_code ec, d.remain = d.fh.len; if(d.fh.mask) detail::prepare_key(d.key, d.fh.key); - // fall through + BEAST_FALLTHROUGH; case do_read_payload + 1: d.state = do_read_payload + 2; @@ -452,15 +450,15 @@ operator()(error_code ec, if(d.ws.wr_block_) { // suspend - d.state = do_pong_resume; BOOST_ASSERT(d.ws.wr_block_ != &d); + d.state = do_pong_resume; d.ws.rd_op_.emplace(std::move(*this)); return; } - d.state = do_pong; - break; + d.ws.wr_block_ = &d; + goto go_pong; } - else if(d.fh.op == detail::opcode::pong) + if(d.fh.op == detail::opcode::pong) { code = close_code::none; ping_data payload; @@ -496,12 +494,13 @@ operator()(error_code ec, if(d.ws.wr_block_) { // suspend + BOOST_ASSERT(d.ws.wr_block_ != &d); d.state = do_close_resume; d.ws.rd_op_.emplace(std::move(*this)); return; } - d.state = do_close; - break; + d.ws.wr_block_ = &d; + goto go_close; } d.state = do_teardown; break; @@ -513,48 +512,47 @@ operator()(error_code ec, BOOST_ASSERT(! d.ws.wr_block_); d.ws.wr_block_ = &d; d.state = do_pong_resume + 1; + // The current context is safe but might not be + // the same as the one for this operation (since + // we are being called from a write operation). + // Call post to make sure we are invoked the same + // way as the final handler for this operation. d.ws.get_io_service().post(bind_handler( - std::move(*this), ec, bytes_transferred)); + std::move(*this), ec, 0)); return; case do_pong_resume + 1: + BOOST_ASSERT(d.ws.wr_block_ == &d); if(d.ws.failed_) { // call handler ec = boost::asio::error::operation_aborted; goto upcall; } - BEAST_FALLTHROUGH; - - //------------------------------------------------------------------ - - case do_pong: if(d.ws.wr_close_) { // ignore ping when closing - if(d.ws.wr_block_) - { - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.wr_block_ = nullptr; - } + d.ws.wr_block_ = nullptr; d.fb.consume(d.fb.size()); d.state = do_read_fh; break; } + BEAST_FALLTHROUGH; + + //------------------------------------------------------------------ + + go_pong: // send pong - if(! d.ws.wr_block_) - d.ws.wr_block_ = &d; - else - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.state = do_pong + 1; + BOOST_ASSERT(d.ws.wr_block_ == &d); + d.state = do_ponged; boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); return; - case do_pong + 1: + case do_ponged: + d.ws.wr_block_ = nullptr; d.fb.consume(d.fb.size()); d.state = do_read_fh; - d.ws.wr_block_ = nullptr; break; //------------------------------------------------------------------ @@ -582,20 +580,16 @@ operator()(error_code ec, } if(d.ws.wr_close_) { - // call handler + // already sent a close frame ec = error::closed; goto upcall; } - d.state = do_close; - break; + BEAST_FALLTHROUGH; //------------------------------------------------------------------ - case do_close: - if(! d.ws.wr_block_) - d.ws.wr_block_ = &d; - else - BOOST_ASSERT(d.ws.wr_block_ == &d); + go_close: + BOOST_ASSERT(d.ws.wr_block_ == &d); d.state = do_teardown; d.ws.wr_close_ = true; boost::asio::async_write(d.ws.stream_, @@ -629,34 +623,45 @@ operator()(error_code ec, if(d.ws.wr_block_) { // suspend + BOOST_ASSERT(d.ws.wr_block_ != &d); d.state = do_fail + 2; d.ws.rd_op_.emplace(std::move(*this)); return; } - // fall through + d.ws.wr_block_ = &d; + BEAST_FALLTHROUGH; case do_fail + 1: + BOOST_ASSERT(d.ws.wr_block_ == &d); d.ws.failed_ = true; // send close frame d.state = do_fail + 4; d.ws.wr_close_ = true; - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); return; case do_fail + 2: + // resume + BOOST_ASSERT(! d.ws.wr_block_); + d.ws.wr_block_ = &d; d.state = do_fail + 3; + // The current context is safe but might not be + // the same as the one for this operation (since + // we are being called from a write operation). + // Call post to make sure we are invoked the same + // way as the final handler for this operation. d.ws.get_io_service().post(bind_handler( std::move(*this), ec, bytes_transferred)); return; case do_fail + 3: - if(d.ws.failed_) + BOOST_ASSERT(d.ws.wr_block_ == &d); + if(d.ws.failed_ || d.ws.wr_close_) { - d.state = do_fail + 5; - break; + // call handler + ec = error::failed; + goto upcall; } d.state = do_fail + 1; break; @@ -683,7 +688,8 @@ operator()(error_code ec, upcall: if(d.ws.wr_block_ == &d) d.ws.wr_block_ = nullptr; - d.ws.ping_op_.maybe_invoke() || + d.ws.close_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke() || d.ws.wr_op_.maybe_invoke(); bool const fin = (! ec) ? d.fh.fin : false; d_.invoke(ec, fin); @@ -704,7 +710,8 @@ async_read_frame(DynamicBuffer& buffer, ReadHandler&& handler) void(error_code, bool)> init{handler}; read_frame_op>{ - init.completion_handler,*this, buffer}; + init.completion_handler,*this, buffer}( + {}, 0, false); return init.result.get(); } diff --git a/include/beast/websocket/impl/write.ipp b/include/beast/websocket/impl/write.ipp index e6036fbf..0da91b11 100644 --- a/include/beast/websocket/impl/write.ipp +++ b/include/beast/websocket/impl/write.ipp @@ -70,24 +70,16 @@ public: : d_(std::forward(h), ws, std::forward(args)...) { - (*this)(error_code{}, 0, false); } void operator()() { - (*this)(error_code{}, 0, true); - } - - void operator()(error_code const& ec) - { - (*this)(ec, 0, true); + (*this)({}, 0, true); } void operator()(error_code ec, - std::size_t bytes_transferred); - - void operator()(error_code ec, - std::size_t bytes_transferred, bool again); + std::size_t bytes_transferred, + bool again = true); friend void* asio_handler_allocate( @@ -123,19 +115,6 @@ public: } }; -template -template -void -stream:: -write_frame_op:: -operator()(error_code ec, std::size_t bytes_transferred) -{ - auto& d = *d_; - if(ec) - d.ws.failed_ = true; - (*this)(ec, bytes_transferred, true); -} - template template void @@ -162,387 +141,363 @@ operator()(error_code ec, auto& d = *d_; d.cont = d.cont || again; if(ec) - goto upcall; - for(;;) { - switch(d.step) + BOOST_ASSERT(d.ws.wr_block_ == &d); + d.ws.failed_ = true; + goto upcall; + } +loop: + switch(d.step) + { + case do_init: + if(! d.ws.wr_.cont) { - case do_init: - if(! d.ws.wr_.cont) - { - d.ws.wr_begin(); - d.fh.rsv1 = d.ws.wr_.compress; - } - else - { - d.fh.rsv1 = false; - } - d.fh.rsv2 = false; - d.fh.rsv3 = false; - d.fh.op = d.ws.wr_.cont ? - detail::opcode::cont : d.ws.wr_opcode_; - d.fh.mask = - d.ws.role_ == role_type::client; - - // entry_state determines which algorithm - // we will use to send. If we suspend, we - // will transition to entry_state + 1 on - // the resume. - if(d.ws.wr_.compress) - { - d.entry_state = do_deflate; - } - else if(! d.fh.mask) - { - if(! d.ws.wr_.autofrag) - { - d.entry_state = do_nomask_nofrag; - } - else - { - BOOST_ASSERT(d.ws.wr_.buf_size != 0); - d.remain = buffer_size(d.cb); - if(d.remain > d.ws.wr_.buf_size) - d.entry_state = do_nomask_frag; - else - d.entry_state = do_nomask_nofrag; - } - } - else - { - if(! d.ws.wr_.autofrag) - { - d.entry_state = do_mask_nofrag; - } - else - { - BOOST_ASSERT(d.ws.wr_.buf_size != 0); - d.remain = buffer_size(d.cb); - if(d.remain > d.ws.wr_.buf_size) - d.entry_state = do_mask_frag; - else - d.entry_state = do_mask_nofrag; - } - } - d.step = do_maybe_suspend; - break; - - //---------------------------------------------------------------------- - - case do_nomask_nofrag: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; - BEAST_FALLTHROUGH; - - case do_nomask_nofrag + 1: - { - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.fh.fin = d.fin; - d.fh.len = buffer_size(d.cb); - detail::write( - d.fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = do_upcall; - boost::asio::async_write(d.ws.stream_, - buffer_cat(d.fh_buf.data(), d.cb), - std::move(*this)); - return; + d.ws.wr_begin(); + d.fh.rsv1 = d.ws.wr_.compress; } - - //---------------------------------------------------------------------- - - case do_nomask_frag: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; - BEAST_FALLTHROUGH; - - case do_nomask_frag + 1: + else { - BOOST_ASSERT(d.ws.wr_block_ == &d); - auto const n = clamp( - d.remain, d.ws.wr_.buf_size); - d.remain -= n; - d.fh.len = n; - d.fh.fin = d.fin ? d.remain == 0 : false; - detail::write( - d.fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = d.remain == 0 ? - do_upcall : do_nomask_frag + 2; - boost::asio::async_write(d.ws.stream_, - buffer_cat(d.fh_buf.data(), - buffer_prefix(n, d.cb)), - std::move(*this)); - return; - } - - case do_nomask_frag + 2: - d.cb.consume( - bytes_transferred - d.fh_buf.size()); - d.fh_buf.consume(d.fh_buf.size()); - d.fh.op = detail::opcode::cont; - if(d.ws.wr_block_ == &d) - d.ws.wr_block_ = nullptr; - // Allow outgoing control frames to - // be sent in between message frames: - if(d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke()) - { - d.step = do_maybe_suspend; - d.ws.get_io_service().post( - std::move(*this)); - return; - } - d.step = d.entry_state; - break; - - //---------------------------------------------------------------------- - - case do_mask_nofrag: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; - BEAST_FALLTHROUGH; - - case do_mask_nofrag + 1: - { - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.remain = buffer_size(d.cb); - d.fh.fin = d.fin; - d.fh.len = d.remain; - d.fh.key = d.ws.maskgen_(); - detail::prepare_key(d.key, d.fh.key); - detail::write( - d.fh_buf, d.fh); - auto const n = - clamp(d.remain, d.ws.wr_.buf_size); - auto const b = - buffer(d.ws.wr_.buf.get(), n); - buffer_copy(b, d.cb); - detail::mask_inplace(b, d.key); - d.remain -= n; - d.ws.wr_.cont = ! d.fin; - // Send frame header and partial payload - d.step = d.remain == 0 ? - do_upcall : do_mask_nofrag + 2; - boost::asio::async_write(d.ws.stream_, - buffer_cat(d.fh_buf.data(), b), - std::move(*this)); - return; - } - - case do_mask_nofrag + 2: - { - d.cb.consume(d.ws.wr_.buf_size); - auto const n = - clamp(d.remain, d.ws.wr_.buf_size); - auto const b = - buffer(d.ws.wr_.buf.get(), n); - buffer_copy(b, d.cb); - detail::mask_inplace(b, d.key); - d.remain -= n; - // Send parial payload - if(d.remain == 0) - d.step = do_upcall; - boost::asio::async_write( - d.ws.stream_, b, std::move(*this)); - return; - } - - //---------------------------------------------------------------------- - - case do_mask_frag: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; - BEAST_FALLTHROUGH; - - case do_mask_frag + 1: - { - BOOST_ASSERT(d.ws.wr_block_ == &d); - auto const n = clamp( - d.remain, d.ws.wr_.buf_size); - d.remain -= n; - d.fh.len = n; - d.fh.key = d.ws.maskgen_(); - d.fh.fin = d.fin ? d.remain == 0 : false; - detail::prepare_key(d.key, d.fh.key); - auto const b = buffer( - d.ws.wr_.buf.get(), n); - buffer_copy(b, d.cb); - detail::mask_inplace(b, d.key); - detail::write( - d.fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = d.remain == 0 ? - do_upcall : do_mask_frag + 2; - boost::asio::async_write(d.ws.stream_, - buffer_cat(d.fh_buf.data(), b), - std::move(*this)); - return; - } - - case do_mask_frag + 2: - d.cb.consume( - bytes_transferred - d.fh_buf.size()); - d.fh_buf.consume(d.fh_buf.size()); - d.fh.op = detail::opcode::cont; - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.wr_block_ = nullptr; - // Allow outgoing control frames to - // be sent in between message frames: - if(d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke()) - { - d.step = do_maybe_suspend; - d.ws.get_io_service().post( - std::move(*this)); - return; - } - d.step = d.entry_state; - break; - - //---------------------------------------------------------------------- - - case do_deflate: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; - BEAST_FALLTHROUGH; - - case do_deflate + 1: - { - BOOST_ASSERT(d.ws.wr_block_ == &d); - auto b = buffer(d.ws.wr_.buf.get(), - d.ws.wr_.buf_size); - auto const more = detail::deflate( - d.ws.pmd_->zo, b, d.cb, d.fin, ec); - d.ws.failed_ = !!ec; - if(d.ws.failed_) - goto upcall; - auto const n = buffer_size(b); - if(n == 0) - { - // The input was consumed, but there - // is no output due to compression - // latency. - BOOST_ASSERT(! d.fin); - BOOST_ASSERT(buffer_size(d.cb) == 0); - - // We can skip the dispatch if the - // asynchronous initiation function is - // not on call stack but its hard to - // figure out so be safe and dispatch. - d.step = do_upcall; - d.ws.get_io_service().post(std::move(*this)); - return; - } - if(d.fh.mask) - { - d.fh.key = d.ws.maskgen_(); - detail::prepared_key key; - detail::prepare_key(key, d.fh.key); - detail::mask_inplace(b, key); - } - d.fh.fin = ! more; - d.fh.len = n; - detail::fh_streambuf fh_buf; - detail::write(fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = more ? - do_deflate + 2 : do_deflate + 3; - boost::asio::async_write(d.ws.stream_, - buffer_cat(fh_buf.data(), b), - std::move(*this)); - return; - } - - case do_deflate + 2: - d.fh.op = detail::opcode::cont; d.fh.rsv1 = false; - BOOST_ASSERT(d.ws.wr_block_ == &d); - d.ws.wr_block_ = nullptr; - // Allow outgoing control frames to - // be sent in between message frames: - if(d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke()) - { - d.step = do_maybe_suspend; - d.ws.get_io_service().post( - std::move(*this)); - return; - } - d.step = d.entry_state; - break; + } + d.fh.rsv2 = false; + d.fh.rsv3 = false; + d.fh.op = d.ws.wr_.cont ? + detail::opcode::cont : d.ws.wr_opcode_; + d.fh.mask = + d.ws.role_ == role_type::client; - case do_deflate + 3: - if(d.fh.fin && ( - (d.ws.role_ == role_type::client && - d.ws.pmd_config_.client_no_context_takeover) || - (d.ws.role_ == role_type::server && - d.ws.pmd_config_.server_no_context_takeover))) - d.ws.pmd_->zo.reset(); - goto upcall; - - //---------------------------------------------------------------------- - - case do_maybe_suspend: + // entry_state determines which algorithm + // we will use to send. If we suspend, we + // will transition to entry_state + 1 on + // the resume. + if(d.ws.wr_.compress) { - if(d.ws.wr_block_) - { - // suspend - d.step = do_maybe_suspend + 1; - d.ws.wr_op_.emplace(std::move(*this)); - return; - } - if(d.ws.failed_ || d.ws.wr_close_) - { - // call handler - d.step = do_upcall; - d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); - return; - } - d.step = d.entry_state; - break; + d.entry_state = do_deflate; } - - case do_maybe_suspend + 1: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = &d; - d.step = do_maybe_suspend + 2; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - d.ws.get_io_service().post(bind_handler( - std::move(*this), ec)); - return; - - case do_maybe_suspend + 2: - BOOST_ASSERT(d.ws.wr_block_ == &d); - if(d.ws.failed_ || d.ws.wr_close_) + else if(! d.fh.mask) + { + if(! d.ws.wr_.autofrag) { - // call handler - ec = boost::asio::error::operation_aborted; - goto upcall; + d.entry_state = do_nomask_nofrag; } - d.step = d.entry_state + 1; - break; + else + { + BOOST_ASSERT(d.ws.wr_.buf_size != 0); + d.remain = buffer_size(d.cb); + if(d.remain > d.ws.wr_.buf_size) + d.entry_state = do_nomask_frag; + else + d.entry_state = do_nomask_nofrag; + } + } + else + { + if(! d.ws.wr_.autofrag) + { + d.entry_state = do_mask_nofrag; + } + else + { + BOOST_ASSERT(d.ws.wr_.buf_size != 0); + d.remain = buffer_size(d.cb); + if(d.remain > d.ws.wr_.buf_size) + d.entry_state = do_mask_frag; + else + d.entry_state = do_mask_nofrag; + } + } + d.step = do_maybe_suspend; + goto loop; - //---------------------------------------------------------------------- + //---------------------------------------------------------------------- - case do_upcall: + case do_nomask_nofrag: + BOOST_ASSERT(d.ws.wr_block_ == &d); + d.fh.fin = d.fin; + d.fh.len = buffer_size(d.cb); + detail::write( + d.fh_buf, d.fh); + d.ws.wr_.cont = ! d.fin; + // Send frame + d.step = do_upcall; + return boost::asio::async_write(d.ws.stream_, + buffer_cat(d.fh_buf.data(), d.cb), + std::move(*this)); + + //---------------------------------------------------------------------- + + go_nomask_frag: + case do_nomask_frag: + { + BOOST_ASSERT(d.ws.wr_block_ == &d); + auto const n = clamp( + d.remain, d.ws.wr_.buf_size); + d.remain -= n; + d.fh.len = n; + d.fh.fin = d.fin ? d.remain == 0 : false; + detail::write( + d.fh_buf, d.fh); + d.ws.wr_.cont = ! d.fin; + // Send frame + d.step = d.remain == 0 ? + do_upcall : do_nomask_frag + 1; + return boost::asio::async_write( + d.ws.stream_, buffer_cat( + d.fh_buf.data(), buffer_prefix( + n, d.cb)), std::move(*this)); + } + + case do_nomask_frag + 1: + BOOST_ASSERT(d.ws.wr_block_ == &d); + d.ws.wr_block_ = nullptr; + d.cb.consume( + bytes_transferred - d.fh_buf.size()); + d.fh_buf.consume(d.fh_buf.size()); + d.fh.op = detail::opcode::cont; + // Allow outgoing control frames to + // be sent in between message frames + if( d.ws.close_op_.maybe_invoke() || + d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke()) + { + d.step = do_maybe_suspend; + return d.ws.get_io_service().post( + std::move(*this)); + } + d.ws.wr_block_ = &d; + goto go_nomask_frag; + + //---------------------------------------------------------------------- + + case do_mask_nofrag: + { + BOOST_ASSERT(d.ws.wr_block_ == &d); + d.remain = buffer_size(d.cb); + d.fh.fin = d.fin; + d.fh.len = d.remain; + d.fh.key = d.ws.maskgen_(); + detail::prepare_key(d.key, d.fh.key); + detail::write( + d.fh_buf, d.fh); + auto const n = + clamp(d.remain, d.ws.wr_.buf_size); + auto const b = + buffer(d.ws.wr_.buf.get(), n); + buffer_copy(b, d.cb); + detail::mask_inplace(b, d.key); + d.remain -= n; + d.ws.wr_.cont = ! d.fin; + // Send frame header and partial payload + d.step = d.remain == 0 ? + do_upcall : do_mask_nofrag + 1; + return boost::asio::async_write( + d.ws.stream_, buffer_cat(d.fh_buf.data(), + b), std::move(*this)); + } + + case do_mask_nofrag + 1: + { + d.cb.consume(d.ws.wr_.buf_size); + auto const n = + clamp(d.remain, d.ws.wr_.buf_size); + auto const b = + buffer(d.ws.wr_.buf.get(), n); + buffer_copy(b, d.cb); + detail::mask_inplace(b, d.key); + d.remain -= n; + // Send partial payload + if(d.remain == 0) + d.step = do_upcall; + return boost::asio::async_write( + d.ws.stream_, b, std::move(*this)); + } + + //---------------------------------------------------------------------- + + go_mask_frag: + case do_mask_frag: + { + BOOST_ASSERT(d.ws.wr_block_ == &d); + auto const n = clamp( + d.remain, d.ws.wr_.buf_size); + d.remain -= n; + d.fh.len = n; + d.fh.key = d.ws.maskgen_(); + d.fh.fin = d.fin ? d.remain == 0 : false; + detail::prepare_key(d.key, d.fh.key); + auto const b = buffer( + d.ws.wr_.buf.get(), n); + buffer_copy(b, d.cb); + detail::mask_inplace(b, d.key); + detail::write( + d.fh_buf, d.fh); + d.ws.wr_.cont = ! d.fin; + // Send frame + d.step = d.remain == 0 ? + do_upcall : do_mask_frag + 1; + return boost::asio::async_write( + d.ws.stream_, buffer_cat( + d.fh_buf.data(), b), + std::move(*this)); + } + + case do_mask_frag + 1: + BOOST_ASSERT(d.ws.wr_block_ == &d); + d.ws.wr_block_ = nullptr; + d.cb.consume( + bytes_transferred - d.fh_buf.size()); + d.fh_buf.consume(d.fh_buf.size()); + d.fh.op = detail::opcode::cont; + // Allow outgoing control frames to + // be sent in between message frames: + if( d.ws.close_op_.maybe_invoke() || + d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke()) + { + d.step = do_maybe_suspend; + d.ws.get_io_service().post( + std::move(*this)); + return; + } + d.ws.wr_block_ = &d; + goto go_mask_frag; + + //---------------------------------------------------------------------- + + go_deflate: + case do_deflate: + { + BOOST_ASSERT(d.ws.wr_block_ == &d); + auto b = buffer(d.ws.wr_.buf.get(), + d.ws.wr_.buf_size); + auto const more = detail::deflate( + d.ws.pmd_->zo, b, d.cb, d.fin, ec); + d.ws.failed_ = !!ec; + if(d.ws.failed_) + goto upcall; + auto const n = buffer_size(b); + if(n == 0) + { + // The input was consumed, but there + // is no output due to compression + // latency. + BOOST_ASSERT(! d.fin); + BOOST_ASSERT(buffer_size(d.cb) == 0); + + // We can skip the dispatch if the + // asynchronous initiation function is + // not on call stack but its hard to + // figure out so be safe and dispatch. + d.step = do_upcall; + d.ws.get_io_service().post(std::move(*this)); + return; + } + if(d.fh.mask) + { + d.fh.key = d.ws.maskgen_(); + detail::prepared_key key; + detail::prepare_key(key, d.fh.key); + detail::mask_inplace(b, key); + } + d.fh.fin = ! more; + d.fh.len = n; + detail::fh_streambuf fh_buf; + detail::write(fh_buf, d.fh); + d.ws.wr_.cont = ! d.fin; + // Send frame + d.step = more ? + do_deflate + 1 : do_deflate + 2; + boost::asio::async_write(d.ws.stream_, + buffer_cat(fh_buf.data(), b), + std::move(*this)); + return; + } + + case do_deflate + 1: + BOOST_ASSERT(d.ws.wr_block_ == &d); + d.ws.wr_block_ = nullptr; + d.fh.op = detail::opcode::cont; + d.fh.rsv1 = false; + // Allow outgoing control frames to + // be sent in between message frames: + if( d.ws.close_op_.maybe_invoke() || + d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke()) + { + d.step = do_maybe_suspend; + d.ws.get_io_service().post( + std::move(*this)); + return; + } + d.ws.wr_block_ = &d; + goto go_deflate; + + case do_deflate + 2: + BOOST_ASSERT(d.ws.wr_block_ == &d); + if(d.fh.fin && ( + (d.ws.role_ == role_type::client && + d.ws.pmd_config_.client_no_context_takeover) || + (d.ws.role_ == role_type::server && + d.ws.pmd_config_.server_no_context_takeover))) + d.ws.pmd_->zo.reset(); + goto upcall; + + //---------------------------------------------------------------------- + + case do_maybe_suspend: + if(d.ws.wr_block_) + { + // suspend + BOOST_ASSERT(d.ws.wr_block_ != &d); + d.step = do_maybe_suspend + 1; + d.ws.wr_op_.emplace(std::move(*this)); + return; + } + d.ws.wr_block_ = &d; + if(d.ws.failed_ || d.ws.wr_close_) + { + // call handler + return d.ws.get_io_service().post( + bind_handler(std::move(*this), + boost::asio::error::operation_aborted, 0)); + } + d.step = d.entry_state; + goto loop; + + case do_maybe_suspend + 1: + BOOST_ASSERT(! d.ws.wr_block_); + d.ws.wr_block_ = &d; + d.step = do_maybe_suspend + 2; + // The current context is safe but might not be + // the same as the one for this operation (since + // we are being called from a write operation). + // Call post to make sure we are invoked the same + // way as the final handler for this operation. + d.ws.get_io_service().post(bind_handler( + std::move(*this), ec, 0)); + return; + + case do_maybe_suspend + 2: + BOOST_ASSERT(d.ws.wr_block_ == &d); + if(d.ws.failed_ || d.ws.wr_close_) + { + // call handler + ec = boost::asio::error::operation_aborted; goto upcall; } + d.step = d.entry_state; + goto loop; + + //---------------------------------------------------------------------- + + case do_upcall: + goto upcall; } upcall: if(d.ws.wr_block_ == &d) d.ws.wr_block_ = nullptr; - d.ws.rd_op_.maybe_invoke() || + d.ws.close_op_.maybe_invoke() || + d.ws.rd_op_.maybe_invoke() || d.ws.ping_op_.maybe_invoke(); d_.invoke(ec); } @@ -901,7 +856,7 @@ async_write_frame(bool fin, void(error_code)> init{handler}; write_frame_op>{init.completion_handler, - *this, fin, bs}; + *this, fin, bs}({}, 0, false); return init.result.get(); } diff --git a/include/beast/websocket/stream.hpp b/include/beast/websocket/stream.hpp index 4f77114e..4c494890 100644 --- a/include/beast/websocket/stream.hpp +++ b/include/beast/websocket/stream.hpp @@ -150,9 +150,10 @@ class stream op* wr_block_; // op currenly writing ping_data* ping_data_; // where to put the payload - detail::pausation rd_op_; // parked read op - detail::pausation wr_op_; // parked write op - detail::pausation ping_op_; // parked ping op + detail::pausation rd_op_; // paused read op + detail::pausation wr_op_; // paused write op + detail::pausation ping_op_; // paused ping op + detail::pausation close_op_; // paused close op close_reason cr_; // set from received close frame // State information for the message being received