From 2b484c0b8b7b005615c9601ca48b9f7f7b4764d2 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sat, 12 Aug 2017 20:50:23 -0700 Subject: [PATCH] Refactor write_op --- CHANGELOG.md | 1 + include/boost/beast/websocket/impl/write.ipp | 777 ++++++++++--------- include/boost/beast/websocket/stream.hpp | 2 + 3 files changed, 406 insertions(+), 374 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d1dbad9..f320e17e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ WebSocket: * websocket test improvements * Remove obsolete write_op +* Refactor write_op -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/impl/write.ipp b/include/boost/beast/websocket/impl/write.ipp index 6a55b557..6c7daa10 100644 --- a/include/boost/beast/websocket/impl/write.ipp +++ b/include/boost/beast/websocket/impl/write.ipp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -36,55 +37,54 @@ namespace websocket { template template class stream::write_some_op + : public boost::asio::coroutine { - struct data : op - { - bool cont; - stream& ws; - consuming_buffers cb; - bool fin; - detail::frame_header fh; - detail::fh_streambuf fh_buf; - detail::prepared_key key; - std::uint64_t remain; - int step = 0; - int entry_state; - token tok; - - data(Handler& handler, stream& ws_, - bool fin_, Buffers const& bs) - : ws(ws_) - , cb(bs) - , fin(fin_) - , tok(ws.t_.unique()) - { - using boost::asio::asio_handler_is_continuation; - cont = asio_handler_is_continuation(std::addressof(handler)); - } - }; - - handler_ptr d_; + Handler h_; + stream& ws_; + consuming_buffers cb_; + detail::frame_header fh_; + detail::prepared_key key_; + std::size_t remain_; + token tok_; + int how_; + bool fin_; + bool more_; public: write_some_op(write_some_op&&) = default; write_some_op(write_some_op const&) = default; - template - write_some_op(DeducedHandler&& h, - stream& ws, Args&&... args) - : d_(std::forward(h), - ws, std::forward(args)...) + template + write_some_op( + DeducedHandler&& h, + stream& ws, + bool fin, + Buffers const& bs) + : h_(std::forward(h)) + , ws_(ws) + , cb_(bs) + , tok_(ws_.t_.unique()) + , fin_(fin) { } - void operator()() + Handler& + handler() { - (*this)({}, 0, true); + return h_; } - void operator()(error_code ec, + void operator()( + error_code ec, std::size_t bytes_transferred, - bool again = true); + bool) + { + (*this)(ec, bytes_transferred); + } + + void operator()( + error_code ec = {}, + std::size_t bytes_transferred = 0); friend void* asio_handler_allocate( @@ -92,7 +92,7 @@ public: { using boost::asio::asio_handler_allocate; return asio_handler_allocate( - size, std::addressof(op->d_.handler())); + size, std::addressof(op->h_)); } friend @@ -101,13 +101,15 @@ public: { using boost::asio::asio_handler_deallocate; asio_handler_deallocate( - p, size, std::addressof(op->d_.handler())); + p, size, std::addressof(op->h_)); } friend bool asio_handler_is_continuation(write_some_op* op) { - return op->d_->cont; + using boost::asio::asio_handler_is_continuation; + return asio_handler_is_continuation( + std::addressof(op->h_)); } template @@ -116,7 +118,7 @@ public: { using boost::asio::asio_handler_invoke; asio_handler_invoke( - f, std::addressof(op->d_.handler())); + f, std::addressof(op->h_)); } }; @@ -126,386 +128,407 @@ void stream:: write_some_op:: operator()(error_code ec, - std::size_t bytes_transferred, bool again) + std::size_t bytes_transferred) { using beast::detail::clamp; using boost::asio::buffer; using boost::asio::buffer_copy; using boost::asio::buffer_size; + using boost::asio::mutable_buffers_1; enum { - do_init = 0, - do_nomask_nofrag = 20, - do_nomask_frag = 30, - do_mask_nofrag = 40, - do_mask_frag = 50, - do_deflate = 60, - do_maybe_suspend = 80, - do_upcall = 99 + do_nomask_nofrag, + do_nomask_frag, + do_mask_nofrag, + do_mask_frag, + do_deflate }; - auto& d = *d_; - d.cont = d.cont || again; - if(ec) + std::size_t n; + boost::asio::mutable_buffer b; + + BOOST_ASIO_CORO_REENTER(*this) { - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.failed_ = true; - goto upcall; - } -loop: - switch(d.step) - { - case do_init: - if(! d.ws.wr_.cont) + // Set up the outgoing frame header + if(! ws_.wr_.cont) { - d.ws.wr_begin(); - d.fh.rsv1 = d.ws.wr_.compress; + ws_.wr_begin(); + fh_.rsv1 = ws_.wr_.compress; } else { - d.fh.rsv1 = false; + fh_.rsv1 = false; } - d.fh.rsv2 = false; - d.fh.rsv3 = false; - d.fh.op = d.ws.wr_.cont ? - detail::opcode::cont : d.ws.wr_opcode_; - d.fh.mask = - d.ws.role_ == role_type::client; + fh_.rsv2 = false; + fh_.rsv3 = false; + fh_.op = ws_.wr_.cont ? + detail::opcode::cont : ws_.wr_opcode_; + fh_.mask = + ws_.role_ == role_type::client; - // entry_state determines which algorithm - // we will use to send. If we suspend, we - // will transition to entry_state + 1 on - // the resume. - if(d.ws.wr_.compress) + // Choose a write algorithm + if(ws_.wr_.compress) { - d.entry_state = do_deflate; + how_ = do_deflate; } - else if(! d.fh.mask) + else if(! fh_.mask) { - if(! d.ws.wr_.autofrag) + if(! ws_.wr_.autofrag) { - d.entry_state = do_nomask_nofrag; + how_ = do_nomask_nofrag; } else { - BOOST_ASSERT(d.ws.wr_.buf_size != 0); - d.remain = buffer_size(d.cb); - if(d.remain > d.ws.wr_.buf_size) - d.entry_state = do_nomask_frag; + BOOST_ASSERT(ws_.wr_.buf_size != 0); + remain_ = buffer_size(cb_); + if(remain_ > ws_.wr_.buf_size) + how_ = do_nomask_frag; else - d.entry_state = do_nomask_nofrag; + how_ = do_nomask_nofrag; } } else { - if(! d.ws.wr_.autofrag) + if(! ws_.wr_.autofrag) { - d.entry_state = do_mask_nofrag; + how_ = do_mask_nofrag; } else { - BOOST_ASSERT(d.ws.wr_.buf_size != 0); - d.remain = buffer_size(d.cb); - if(d.remain > d.ws.wr_.buf_size) - d.entry_state = do_mask_frag; + BOOST_ASSERT(ws_.wr_.buf_size != 0); + remain_ = buffer_size(cb_); + if(remain_ > ws_.wr_.buf_size) + how_ = do_mask_frag; else - d.entry_state = do_mask_nofrag; + how_ = do_mask_nofrag; } } - d.step = do_maybe_suspend; - goto loop; - //---------------------------------------------------------------------- - - case do_nomask_nofrag: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.fh.fin = d.fin; - d.fh.len = buffer_size(d.cb); - detail::write( - d.fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = do_upcall; - return boost::asio::async_write(d.ws.stream_, - buffer_cat(d.fh_buf.data(), d.cb), - std::move(*this)); - - //---------------------------------------------------------------------- - - go_nomask_frag: - case do_nomask_frag: - { - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - auto const n = clamp( - d.remain, d.ws.wr_.buf_size); - d.remain -= n; - d.fh.len = n; - d.fh.fin = d.fin ? d.remain == 0 : false; - detail::write( - d.fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = d.remain == 0 ? - do_upcall : do_nomask_frag + 1; - return boost::asio::async_write( - d.ws.stream_, buffer_cat( - d.fh_buf.data(), buffer_prefix( - n, d.cb)), std::move(*this)); - } - - case do_nomask_frag + 1: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.wr_block_.reset(); - d.cb.consume( - bytes_transferred - d.fh_buf.size()); - d.fh_buf.consume(d.fh_buf.size()); - d.fh.op = detail::opcode::cont; - // Allow outgoing control frames to - // be sent in between message frames - if( d.ws.close_op_.maybe_invoke() || - d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke()) + do_maybe_suspend: + // Maybe suspend + if(! ws_.wr_block_) { - d.step = do_maybe_suspend; - return d.ws.get_io_service().post( - std::move(*this)); + // Acquire the write block + ws_.wr_block_ = tok_; + + // Make sure the stream is open + if(ws_.failed_) + { + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + bind_handler(std::move(*this), + boost::asio::error::operation_aborted)); + goto upcall; + } } - d.ws.wr_block_ = d.tok; - goto go_nomask_frag; + else + { + // Suspend + BOOST_ASSERT(ws_.wr_block_ != tok_); + BOOST_ASIO_CORO_YIELD + ws_.wr_op_.save(std::move(*this)); - //---------------------------------------------------------------------- + // Acquire the write block + BOOST_ASSERT(! ws_.wr_block_); + ws_.wr_block_ = tok_; - case do_mask_nofrag: - { - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.remain = buffer_size(d.cb); - d.fh.fin = d.fin; - d.fh.len = d.remain; - d.fh.key = d.ws.maskgen_(); - detail::prepare_key(d.key, d.fh.key); - detail::write( - d.fh_buf, d.fh); - auto const n = - clamp(d.remain, d.ws.wr_.buf_size); - auto const b = - buffer(d.ws.wr_.buf.get(), n); - buffer_copy(b, d.cb); - detail::mask_inplace(b, d.key); - d.remain -= n; - d.ws.wr_.cont = ! d.fin; - // Send frame header and partial payload - d.step = d.remain == 0 ? - do_upcall : do_mask_nofrag + 1; - return boost::asio::async_write( - d.ws.stream_, buffer_cat(d.fh_buf.data(), - b), std::move(*this)); - } + // Resume + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post(std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); - case do_mask_nofrag + 1: - { - d.cb.consume(d.ws.wr_.buf_size); - auto const n = - clamp(d.remain, d.ws.wr_.buf_size); - auto const b = - buffer(d.ws.wr_.buf.get(), n); - buffer_copy(b, d.cb); - detail::mask_inplace(b, d.key); - d.remain -= n; - // Send partial payload - if(d.remain == 0) - d.step = do_upcall; - return boost::asio::async_write( - d.ws.stream_, b, std::move(*this)); - } + // Make sure the stream is open + if(ws_.failed_) + { + ec = boost::asio::error::operation_aborted; + goto upcall; + } + } - //---------------------------------------------------------------------- + //------------------------------------------------------------------ - go_mask_frag: - case do_mask_frag: - { - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - auto const n = clamp( - d.remain, d.ws.wr_.buf_size); - d.remain -= n; - d.fh.len = n; - d.fh.key = d.ws.maskgen_(); - d.fh.fin = d.fin ? d.remain == 0 : false; - detail::prepare_key(d.key, d.fh.key); - auto const b = buffer( - d.ws.wr_.buf.get(), n); - buffer_copy(b, d.cb); - detail::mask_inplace(b, d.key); - detail::write( - d.fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = d.remain == 0 ? - do_upcall : do_mask_frag + 1; - return boost::asio::async_write( - d.ws.stream_, buffer_cat( - d.fh_buf.data(), b), + if(how_ == do_nomask_nofrag) + { + fh_.fin = fin_; + fh_.len = buffer_size(cb_); + ws_.wr_.fb.reset(); + detail::write( + ws_.wr_.fb, fh_); + ws_.wr_.cont = ! fin_; + // Send frame + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write(ws_.stream_, + buffer_cat(ws_.wr_.fb.data(), cb_), std::move(*this)); - } - - case do_mask_frag + 1: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.wr_block_.reset(); - d.cb.consume( - bytes_transferred - d.fh_buf.size()); - d.fh_buf.consume(d.fh_buf.size()); - d.fh.op = detail::opcode::cont; - // Allow outgoing control frames to - // be sent in between message frames: - if( d.ws.close_op_.maybe_invoke() || - d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke()) - { - d.step = do_maybe_suspend; - d.ws.get_io_service().post( - std::move(*this)); - return; - } - d.ws.wr_block_ = d.tok; - goto go_mask_frag; - - //---------------------------------------------------------------------- - - go_deflate: - case do_deflate: - { - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - auto b = buffer(d.ws.wr_.buf.get(), - d.ws.wr_.buf_size); - auto const more = detail::deflate( - d.ws.pmd_->zo, b, d.cb, d.fin, ec); - d.ws.failed_ = !!ec; - if(d.ws.failed_) - goto upcall; - auto const n = buffer_size(b); - if(n == 0) - { - // The input was consumed, but there - // is no output due to compression - // latency. - BOOST_ASSERT(! d.fin); - BOOST_ASSERT(buffer_size(d.cb) == 0); - - // We can skip the dispatch if the - // asynchronous initiation function is - // not on call stack but its hard to - // figure out so be safe and dispatch. - d.step = do_upcall; - d.ws.get_io_service().post(std::move(*this)); - return; - } - if(d.fh.mask) - { - d.fh.key = d.ws.maskgen_(); - detail::prepared_key key; - detail::prepare_key(key, d.fh.key); - detail::mask_inplace(b, key); - } - d.fh.fin = ! more; - d.fh.len = n; - detail::write< - flat_static_buffer_base>(d.fh_buf, d.fh); - d.ws.wr_.cont = ! d.fin; - // Send frame - d.step = more ? - do_deflate + 1 : do_deflate + 2; - boost::asio::async_write(d.ws.stream_, - buffer_cat(d.fh_buf.data(), b), - std::move(*this)); - return; - } - - case do_deflate + 1: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.fh_buf.consume(d.fh_buf.size()); - d.ws.wr_block_.reset(); - d.fh.op = detail::opcode::cont; - d.fh.rsv1 = false; - // Allow outgoing control frames to - // be sent in between message frames: - if( d.ws.close_op_.maybe_invoke() || - d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke()) - { - d.step = do_maybe_suspend; - d.ws.get_io_service().post( - std::move(*this)); - return; - } - d.ws.wr_block_ = d.tok; - goto go_deflate; - - case do_deflate + 2: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - if(d.fh.fin && ( - (d.ws.role_ == role_type::client && - d.ws.pmd_config_.client_no_context_takeover) || - (d.ws.role_ == role_type::server && - d.ws.pmd_config_.server_no_context_takeover))) - d.ws.pmd_->zo.reset(); - goto upcall; - - //---------------------------------------------------------------------- - - case do_maybe_suspend: - if(d.ws.wr_block_) - { - // suspend - BOOST_ASSERT(d.ws.wr_block_ != d.tok); - d.step = do_maybe_suspend + 1; - d.ws.wr_op_.emplace(std::move(*this)); - return; - } - d.ws.wr_block_ = d.tok; - if(d.ws.failed_ || d.ws.wr_close_) - { - // call handler - return d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted, 0)); - } - d.step = d.entry_state; - goto loop; - - case do_maybe_suspend + 1: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = d.tok; - d.step = do_maybe_suspend + 2; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - d.ws.get_io_service().post(bind_handler( - std::move(*this), ec, 0)); - return; - - case do_maybe_suspend + 2: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - if(d.ws.failed_ || d.ws.wr_close_) - { - // call handler - ec = boost::asio::error::operation_aborted; + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ec) + ws_.failed_ = true; goto upcall; } - d.step = d.entry_state; - goto loop; - //---------------------------------------------------------------------- + //------------------------------------------------------------------ - case do_upcall: - goto upcall; + else if(how_ == do_nomask_frag) + { + for(;;) + { + fh_.len = clamp(remain_, ws_.wr_.buf_size); + remain_ -= clamp(fh_.len); + fh_.fin = fin_ ? remain_ == 0 : false; + ws_.wr_.fb.reset(); + detail::write( + ws_.wr_.fb, fh_); + ws_.wr_.cont = ! fin_; + // Send frame + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write( + ws_.stream_, buffer_cat( + ws_.wr_.fb.data(), buffer_prefix( + clamp(fh_.len), cb_)), + std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ec) + { + ws_.failed_ = true; + goto upcall; + } + if(remain_ == 0) + goto upcall; + cb_.consume( + bytes_transferred - ws_.wr_.fb.size()); + fh_.op = detail::opcode::cont; + // Allow outgoing control frames to + // be sent in between message frames + ws_.wr_block_.reset(); + if( ws_.close_op_.maybe_invoke() || + ws_.rd_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke()) + { + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + std::move(*this)); + goto do_maybe_suspend; + } + ws_.wr_block_ = tok_; + } + } + + //------------------------------------------------------------------ + + else if(how_ == do_mask_nofrag) + { + remain_ = buffer_size(cb_); + fh_.fin = fin_; + fh_.len = remain_; + fh_.key = ws_.maskgen_(); + detail::prepare_key(key_, fh_.key); + ws_.wr_.fb.reset(); + detail::write( + ws_.wr_.fb, fh_); + n = clamp(remain_, ws_.wr_.buf_size); + buffer_copy(buffer( + ws_.wr_.buf.get(), n), cb_); + detail::mask_inplace(buffer( + ws_.wr_.buf.get(), n), key_); + remain_ -= n; + ws_.wr_.cont = ! fin_; + // Send frame header and partial payload + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write( + ws_.stream_, buffer_cat(ws_.wr_.fb.data(), + buffer(ws_.wr_.buf.get(), n)), + std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ec) + { + ws_.failed_ = true; + goto upcall; + } + while(remain_ > 0) + { + cb_.consume(ws_.wr_.buf_size); + n = clamp(remain_, ws_.wr_.buf_size); + buffer_copy(buffer( + ws_.wr_.buf.get(), n), cb_); + detail::mask_inplace(buffer( + ws_.wr_.buf.get(), n), key_); + remain_ -= n; + // Send partial payload + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write(ws_.stream_, + buffer(ws_.wr_.buf.get(), n), + std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ec) + { + ws_.failed_ = true; + goto upcall; + } + } + goto upcall; + } + + //------------------------------------------------------------------ + + else if(how_ == do_mask_frag) + { + for(;;) + { + n = clamp(remain_, ws_.wr_.buf_size); + remain_ -= n; + fh_.len = n; + fh_.key = ws_.maskgen_(); + fh_.fin = fin_ ? remain_ == 0 : false; + detail::prepare_key(key_, fh_.key); + buffer_copy(buffer( + ws_.wr_.buf.get(), n), cb_); + detail::mask_inplace(buffer( + ws_.wr_.buf.get(), n), key_); + ws_.wr_.fb.reset(); + detail::write( + ws_.wr_.fb, fh_); + ws_.wr_.cont = ! fin_; + // Send frame + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write(ws_.stream_, + buffer_cat(ws_.wr_.fb.data(), + buffer(ws_.wr_.buf.get(), n)), + std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ec) + { + ws_.failed_ = true; + goto upcall; + } + if(remain_ == 0) + goto upcall; + cb_.consume( + bytes_transferred - ws_.wr_.fb.size()); + fh_.op = detail::opcode::cont; + // Allow outgoing control frames to + // be sent in between message frames: + ws_.wr_block_.reset(); + if( ws_.close_op_.maybe_invoke() || + ws_.rd_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke()) + { + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + std::move(*this)); + goto do_maybe_suspend; + } + ws_.wr_block_ = tok_; + } + } + + //------------------------------------------------------------------ + + else if(how_ == do_deflate) + { + for(;;) + { + b = buffer(ws_.wr_.buf.get(), + ws_.wr_.buf_size); + more_ = detail::deflate( + ws_.pmd_->zo, b, cb_, fin_, ec); + ws_.failed_ = !!ec; + if(ws_.failed_) + { + // Always dispatching is easiest + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + bind_handler(std::move(*this), ec)); + goto upcall; + } + n = buffer_size(b); + if(n == 0) + { + // The input was consumed, but there + // is no output due to compression + // latency. + BOOST_ASSERT(! fin_); + BOOST_ASSERT(buffer_size(cb_) == 0); + + // We can skip the dispatch if the + // asynchronous initiation function is + // not on call stack but its hard to + // figure out so be safe and dispatch. + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + std::move(*this)); + goto upcall; + } + if(fh_.mask) + { + fh_.key = ws_.maskgen_(); + detail::prepared_key key; + detail::prepare_key(key, fh_.key); + detail::mask_inplace(b, key); + } + fh_.fin = ! more_; + fh_.len = n; + ws_.wr_.fb.reset(); + detail::write< + flat_static_buffer_base>(ws_.wr_.fb, fh_); + ws_.wr_.cont = ! fin_; + // Send frame + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write(ws_.stream_, + buffer_cat(ws_.wr_.fb.data(), + mutable_buffers_1{b}), std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ec) + { + ws_.failed_ = true; + goto upcall; + } + if(more_) + { + fh_.op = detail::opcode::cont; + fh_.rsv1 = false; + // Allow outgoing control frames to + // be sent in between message frames: + ws_.wr_block_.reset(); + if( ws_.close_op_.maybe_invoke() || + ws_.rd_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke()) + { + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + std::move(*this)); + goto do_maybe_suspend; + } + ws_.wr_block_ = tok_; + } + else + { + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(fh_.fin && ( + (ws_.role_ == role_type::client && + ws_.pmd_config_.client_no_context_takeover) || + (ws_.role_ == role_type::server && + ws_.pmd_config_.server_no_context_takeover))) + ws_.pmd_->zo.reset(); + goto upcall; + } + } + } + + //-------------------------------------------------------------------------- + + upcall: + if(ws_.wr_block_ == tok_) + ws_.wr_block_.reset(); + ws_.close_op_.maybe_invoke() || + ws_.rd_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke(); + h_(ec); } -upcall: - if(d.ws.wr_block_ == d.tok) - d.ws.wr_block_.reset(); - d.ws.close_op_.maybe_invoke() || - d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke(); - d_.invoke(ec); } //------------------------------------------------------------------------------ @@ -543,6 +566,12 @@ write_some(bool fin, using boost::asio::buffer; using boost::asio::buffer_copy; using boost::asio::buffer_size; + // Make sure the stream is open + if(failed_) + { + ec = boost::asio::error::operation_aborted; + return; + } detail::frame_header fh; if(! wr_.cont) { @@ -750,7 +779,7 @@ async_write_some(bool fin, void(error_code)> init{handler}; write_some_op>{init.completion_handler, - *this, fin, bs}({}, 0, false); + *this, fin, bs}(); return init.result.get(); } @@ -804,7 +833,7 @@ async_write( void(error_code)> init{handler}; write_some_op>{init.completion_handler, - *this, true, bs}({}, 0, false); + *this, true, bs}(); return init.result.get(); } diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index 0c8ae6a9..87b31eb0 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -200,6 +200,8 @@ class stream // The buffer is allocated or reallocated at the beginning of // sending a message. std::unique_ptr buf; + + detail::fh_streambuf fb; }; // State information for the permessage-deflate extension