From 36521377184f87423b96fbdfcb783df41516fff5 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 13 Aug 2017 09:45:49 -0700 Subject: [PATCH] Refactor close_op --- CHANGELOG.md | 1 + include/boost/beast/websocket/impl/close.ipp | 196 +++++++++---------- 2 files changed, 98 insertions(+), 99 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3010924..269e10dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ WebSocket: * Refactor ping_op * Refactor fail_op * Refactor read_op +* Refactor close_op -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index e18d489c..a40d30fd 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -26,47 +27,50 @@ namespace websocket { //------------------------------------------------------------------------------ -// send the close message and wait for the response -// +/* Close the WebSocket Connection + + This composed operation sends the close frame if it hasn't already + been sent, then reads and discards frames until receiving a close + frame. Finally it invokes the teardown operation to shut down the + underlying connection. +*/ template template class stream::close_op + : public boost::asio::coroutine { - struct data : op - { - stream& ws; - close_reason cr; - detail::frame_streambuf fb; - int state = 0; - token tok; - - data(Handler&, stream& ws_, - close_reason const& cr_) - : ws(ws_) - , cr(cr_) - , tok(ws.t_.unique()) - { - ws.template write_close< - flat_static_buffer_base>(fb, cr); - } - }; - - handler_ptr d_; + Handler h_; + stream& ws_; + token tok_; public: close_op(close_op&&) = default; close_op(close_op const&) = default; - template - close_op(DeducedHandler&& h, - stream& ws, Args&&... args) - : d_(std::forward(h), - ws, std::forward(args)...) + template + close_op( + DeducedHandler&& h, + stream& ws, + close_reason const& cr) + : h_(std::forward(h)) + , ws_(ws) { + // Serialize the close frame + ws_.wr_.fb.reset(); + ws_.template write_close< + flat_static_buffer_base>( + ws_.wr_.fb, cr); + } + + Handler& + handler() + { + return h_; } void - operator()(error_code ec = {}, + operator()( + error_code ec = {}, std::size_t bytes_transferred = 0); friend @@ -75,7 +79,7 @@ public: { using boost::asio::asio_handler_allocate; return asio_handler_allocate( - size, std::addressof(op->d_.handler())); + size, std::addressof(op->h_)); } friend @@ -84,7 +88,7 @@ public: { using boost::asio::asio_handler_deallocate; asio_handler_deallocate( - p, size, std::addressof(op->d_.handler())); + p, size, std::addressof(op->h_)); } friend @@ -92,7 +96,7 @@ public: { using boost::asio::asio_handler_is_continuation; return asio_handler_is_continuation( - std::addressof(op->d_.handler())); + std::addressof(op->h_)); } template @@ -101,7 +105,7 @@ public: { using boost::asio::asio_handler_invoke; asio_handler_invoke( - f, std::addressof(op->d_.handler())); + f, std::addressof(op->h_)); } }; @@ -111,75 +115,63 @@ void stream::close_op:: operator()(error_code ec, std::size_t) { - auto& d = *d_; - if(ec) + BOOST_ASIO_CORO_REENTER(*this) { - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.failed_ = true; - goto upcall; + // Maybe suspend + if(! ws_.wr_block_) + { + // Acquire the write block + ws_.wr_block_ = tok_; + + // Make sure the stream is open + if(ws_.failed_) + { + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post( + bind_handler(std::move(*this), + boost::asio::error::operation_aborted)); + goto upcall; + } + } + else + { + // Suspend + BOOST_ASSERT(ws_.wr_block_ != tok_); + BOOST_ASIO_CORO_YIELD + ws_.close_op_.save(std::move(*this)); + + // Acquire the write block + BOOST_ASSERT(! ws_.wr_block_); + ws_.wr_block_ = tok_; + + // Resume + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post(std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + + // Make sure the stream is open + if(ws_.failed_) + { + ec = boost::asio::error::operation_aborted; + goto upcall; + } + } + + // Send close frame + BOOST_ASIO_CORO_YIELD + boost::asio::async_write(ws_.stream_, + ws_.wr_.fb.data(), std::move(*this)); + if(ec) + ws_.failed_ = true; + + upcall: + BOOST_ASSERT(ws_.wr_block_ == tok_); + ws_.wr_block_.reset(); + ws_.rd_op_.maybe_invoke() || + ws_.ping_op_.maybe_invoke() || + ws_.wr_op_.maybe_invoke(); + h_(ec); } - switch(d.state) - { - case 0: - if(d.ws.wr_block_) - { - // suspend - d.state = 1; - d.ws.close_op_.emplace(std::move(*this)); - return; - } - d.ws.wr_block_ = d.tok; - if(d.ws.failed_ || d.ws.wr_close_) - { - // call handler - d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); - return; - } - - do_write: - // send close frame - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.state = 3; - d.ws.wr_close_ = true; - boost::asio::async_write(d.ws.stream_, - d.fb.data(), std::move(*this)); - return; - - case 1: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = d.tok; - d.state = 2; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - d.ws.get_io_service().post( - bind_handler(std::move(*this), ec)); - return; - - case 2: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - if(d.ws.failed_ || d.ws.wr_close_) - { - // call handler - ec = boost::asio::error::operation_aborted; - goto upcall; - } - goto do_write; - - case 3: - break; - } -upcall: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.wr_block_.reset(); - d.ws.rd_op_.maybe_invoke() || - d.ws.ping_op_.maybe_invoke() || - d.ws.wr_op_.maybe_invoke(); - d_.invoke(ec); } //------------------------------------------------------------------------------ @@ -205,6 +197,12 @@ close(close_reason const& cr, error_code& ec) static_assert(is_sync_stream::value, "SyncStream requirements not met"); using beast::detail::clamp; + // Make sure the stream is open + if(failed_) + { + ec = boost::asio::error::operation_aborted; + return; + } // If rd_close_ is set then we already sent a close BOOST_ASSERT(! rd_close_); if(wr_close_)