From 91fb1c4b53a961b728bdb5f3e66f5ebf6a0baa9c Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 13 Aug 2017 07:40:47 -0700 Subject: [PATCH] Refactor fail_op --- CHANGELOG.md | 1 + include/boost/beast/websocket/impl/fail.ipp | 213 ++++++++++---------- 2 files changed, 106 insertions(+), 108 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07f98d1d..611be666 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ WebSocket: * Remove obsolete write_op * Refactor write_op * Refactor ping_op +* Refactor fail_op -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/impl/fail.ipp b/include/boost/beast/websocket/impl/fail.ipp index 7b91d1c9..bf447d28 100644 --- a/include/boost/beast/websocket/impl/fail.ipp +++ b/include/boost/beast/websocket/impl/fail.ipp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -29,19 +30,37 @@ namespace boost { namespace beast { namespace websocket { -// _Fail the WebSocket Connection_ -// +/* + This composed operation optionally sends a close frame, + then performs the teardown operation. +*/ template template class stream::fail_op + : public boost::asio::coroutine { - Handler h_; - stream& ws_; - int step_ = 0; - bool dispatched_ = false; - std::uint16_t code_; - error_code ev_; - token tok_; + struct state + { + stream& ws; + detail::frame_streambuf fb; + std::uint16_t code; + error_code ev; + token tok; + + state( + Handler&, + stream& ws_, + std::uint16_t code_, + error_code ev_) + : ws(ws_) + , code(code_) + , ev(ev_) + , tok(ws.t_.unique()) + { + } + }; + + handler_ptr d_; public: fail_op(fail_op&&) = default; @@ -53,21 +72,13 @@ public: stream& ws, std::uint16_t code, error_code ev) - : h_(std::forward(h)) - , ws_(ws) - , code_(code) - , ev_(ev) - , tok_(ws_.t_.unique()) + : d_(std::forward(h), + ws, code, ev) { } - Handler& - handler() - { - return h_; - } - - void operator()(error_code ec = {}, + void operator()( + error_code ec = {}, std::size_t bytes_transferred = 0); friend @@ -76,7 +87,7 @@ public: { using boost::asio::asio_handler_allocate; return asio_handler_allocate( - size, std::addressof(op->h_)); + size, std::addressof(op->d_.handler())); } friend @@ -85,16 +96,15 @@ public: { using boost::asio::asio_handler_deallocate; asio_handler_deallocate( - p, size, std::addressof(op->h_)); + p, size, std::addressof(op->d_.handler())); } friend bool asio_handler_is_continuation(fail_op* op) { using boost::asio::asio_handler_is_continuation; - return op->dispatched_ || - asio_handler_is_continuation( - std::addressof(op->h_)); + return asio_handler_is_continuation( + std::addressof(op->d_.handler())); } template @@ -103,7 +113,7 @@ public: { using boost::asio::asio_handler_invoke; asio_handler_invoke(f, - std::addressof(op->h_)); + std::addressof(op->d_.handler())); } }; @@ -114,81 +124,72 @@ stream:: fail_op:: operator()(error_code ec, std::size_t) { - enum + auto& d = *d_; + BOOST_ASIO_CORO_REENTER(*this) { - do_start = 0, - do_resume = 20, - do_teardown = 40 - }; - switch(step_) - { - case do_start: - // Acquire write block - if(ws_.wr_block_) + // Maybe suspend + if(! d.ws.wr_block_) { - // suspend - BOOST_ASSERT(ws_.wr_block_ != tok_); - step_ = do_resume; - ws_.rd_op_.save(std::move(*this)); - return; + // Acquire the write block + d.ws.wr_block_ = d.tok; + + // Make sure the stream is open + if(d.ws.failed_) + { + BOOST_ASIO_CORO_YIELD + d.ws.get_io_service().post( + bind_handler(std::move(*this), + boost::asio::error::operation_aborted)); + goto upcall; + } } - ws_.wr_block_ = tok_; - goto go_write; - - case do_resume: - BOOST_ASSERT(! ws_.wr_block_); - ws_.wr_block_ = tok_; - step_ = do_resume + 1; - // We were invoked from a foreign context, so post - return ws_.get_io_service().post(std::move(*this)); - - case do_resume + 1: - BOOST_ASSERT(ws_.wr_block_ == tok_); - dispatched_ = true; - go_write: - BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.failed_) + else { - ws_.wr_block_.reset(); - ec = boost::asio::error::operation_aborted; - break; - } - if(code_ == close_code::none) - goto go_teardown; - if(ws_.wr_close_) - goto go_teardown; - // send close frame - step_ = do_teardown; - ws_.rd_.fb.consume(ws_.rd_.fb.size()); - ws_.template write_close< - flat_static_buffer_base>( - ws_.rd_.fb, code_); - ws_.wr_close_ = true; - return boost::asio::async_write( - ws_.stream_, ws_.rd_.fb.data(), - std::move(*this)); + // Suspend + BOOST_ASSERT(d.ws.wr_block_ != d.tok); + BOOST_ASIO_CORO_YIELD + d.ws.rd_op_.emplace(std::move(*this)); // VFALCO emplace to rd_op_ - case do_teardown: - BOOST_ASSERT(ws_.wr_block_ == tok_); - dispatched_ = true; - ws_.failed_ = !!ec; - if(ws_.failed_) - { - ws_.wr_block_.reset(); - break; + // Acquire the write block + BOOST_ASSERT(! d.ws.wr_block_); + d.ws.wr_block_ = d.tok; + + // Resume + BOOST_ASIO_CORO_YIELD + d.ws.get_io_service().post(std::move(*this)); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + + // Make sure the stream is open + if(d.ws.failed_) + { + ec = boost::asio::error::operation_aborted; + goto upcall; + } } - go_teardown: - BOOST_ASSERT(ws_.wr_block_ == tok_); - step_ = do_teardown + 1; + if(d.code != close_code::none && ! d.ws.wr_close_) + { + // Serialize close frame + d.ws.template write_close< + flat_static_buffer_base>( + d.fb, d.code); + // Send close frame + d.ws.wr_close_ = true; + BOOST_ASIO_CORO_YIELD + boost::asio::async_write( + d.ws.stream_, d.fb.data(), + std::move(*this)); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + d.ws.failed_ = !!ec; + if(d.ws.failed_) + goto upcall; + } + // Teardown + BOOST_ASSERT(d.ws.wr_block_ == d.tok); using beast::websocket::async_teardown; - async_teardown(ws_.role_, - ws_.stream_, std::move(*this)); - return; - - case do_teardown + 1: - BOOST_ASSERT(ws_.wr_block_ == tok_); - dispatched_ = true; - ws_.wr_block_.reset(); + BOOST_ASIO_CORO_YIELD + async_teardown(d.ws.role_, + d.ws.stream_, std::move(*this)); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); if(ec == boost::asio::error::eof) { // Rationale: @@ -196,20 +197,16 @@ operator()(error_code ec, std::size_t) ec.assign(0, ec.category()); } if(! ec) - ec = ev_; - ws_.failed_ = true; - break; + ec = d.ev; + d.ws.failed_ = true; + upcall: + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + d.ws.wr_block_.reset(); + d.ws.close_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke() || + d.ws.wr_op_.maybe_invoke(); + d_.invoke(ec, 0); } - // upcall - BOOST_ASSERT(ws_.wr_block_ != tok_); - ws_.close_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke() || - ws_.wr_op_.maybe_invoke(); - if(! dispatched_) - ws_.stream_.get_io_service().post( - bind_handler(std::move(h_), ec, 0)); - else - h_(ec, 0); } //------------------------------------------------------------------------------