diff --git a/CHANGELOG.md b/CHANGELOG.md index 17deee17..efac9c57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ WebSocket: * Refactor read_op * Refactor close_op * Refactor read_op + fail_op +* Websocket close will automatically drain -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index da751d6c..47f49e46 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -10,6 +10,7 @@ #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP +#include #include #include #include @@ -39,9 +40,26 @@ template class stream::close_op : public boost::asio::coroutine { - Handler h_; - stream& ws_; - token tok_; + struct state + { + stream& ws; + token tok; + detail::frame_streambuf fb; + + state( + Handler&, + stream& ws_, + close_reason const& cr) + : ws(ws_) + , tok(ws.t_.unique()) + { + // Serialize the close frame + ws.template write_close< + flat_static_buffer_base>(fb, cr); + } + }; + + handler_ptr d_; public: close_op(close_op&&) = default; @@ -52,20 +70,8 @@ public: DeducedHandler&& h, stream& ws, close_reason const& cr) - : h_(std::forward(h)) - , ws_(ws) + : d_(std::forward(h), ws, cr) { - // Serialize the close frame - ws_.wr_.fb.reset(); - ws_.template write_close< - flat_static_buffer_base>( - ws_.wr_.fb, cr); - } - - Handler& - handler() - { - return h_; } void @@ -79,7 +85,7 @@ public: { using boost::asio::asio_handler_allocate; return asio_handler_allocate( - size, std::addressof(op->h_)); + size, std::addressof(op->d_.handler())); } friend @@ -88,7 +94,7 @@ public: { using boost::asio::asio_handler_deallocate; asio_handler_deallocate( - p, size, std::addressof(op->h_)); + p, size, std::addressof(op->d_.handler())); } friend @@ -96,7 +102,7 @@ public: { using boost::asio::asio_handler_is_continuation; return asio_handler_is_continuation( - std::addressof(op->h_)); + std::addressof(op->d_.handler())); } template @@ -105,7 +111,7 @@ public: { using boost::asio::asio_handler_invoke; asio_handler_invoke( - f, std::addressof(op->h_)); + f, std::addressof(op->d_.handler())); } }; @@ -113,21 +119,24 @@ template template void stream::close_op:: -operator()(error_code ec, std::size_t) +operator()(error_code ec, std::size_t bytes_transferred) { + using beast::detail::clamp; + auto& d = *d_; + close_code code{}; BOOST_ASIO_CORO_REENTER(*this) { // Maybe suspend - if(! ws_.wr_block_) + if(! d.ws.wr_block_) { // Acquire the write block - ws_.wr_block_ = tok_; + d.ws.wr_block_ = d.tok; // Make sure the stream is open - if(ws_.failed_) + if(d.ws.failed_) { BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( + d.ws.get_io_service().post( bind_handler(std::move(*this), boost::asio::error::operation_aborted)); goto upcall; @@ -136,21 +145,21 @@ operator()(error_code ec, std::size_t) else { // Suspend - BOOST_ASSERT(ws_.wr_block_ != tok_); + BOOST_ASSERT(d.ws.wr_block_ != d.tok); BOOST_ASIO_CORO_YIELD - ws_.close_op_.save(std::move(*this)); + d.ws.close_op_.emplace(std::move(*this)); // Acquire the write block - BOOST_ASSERT(! ws_.wr_block_); - ws_.wr_block_ = tok_; + BOOST_ASSERT(! d.ws.wr_block_); + d.ws.wr_block_ = d.tok; // Resume BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post(std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); + d.ws.get_io_service().post(std::move(*this)); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); // Make sure the stream is open - if(ws_.failed_) + if(d.ws.failed_) { ec = boost::asio::error::operation_aborted; goto upcall; @@ -158,19 +167,158 @@ operator()(error_code ec, std::size_t) } // Send close frame + BOOST_ASSERT(! d.ws.wr_close_); + d.ws.wr_close_ = true; BOOST_ASIO_CORO_YIELD - boost::asio::async_write(ws_.stream_, - ws_.wr_.fb.data(), std::move(*this)); + boost::asio::async_write(d.ws.stream_, + d.fb.data(), std::move(*this)); if(ec) - ws_.failed_ = true; + { + d.ws.failed_ = true; + goto upcall; + } + + if(d.ws.rd_close_) + { + // This happens when the read_op gets a close frame + // at the same time we are sending the close frame. The + // read_op will be suspended on the write block. + goto teardown; + } + + if(! d.ws.rd_block_) + { + // Acquire the read block + d.ws.rd_block_ = d.tok; + } + else + { + // The read_op is currently running so it will see + // the close frame and call teardown. We will suspend + // to cause async_read to return error::closed, before + // we return error::success. + + // Suspend + BOOST_ASSERT(d.ws.rd_block_ != d.tok); + BOOST_ASIO_CORO_YIELD + d.ws.rd_op_.emplace(std::move(*this)); + + // Acquire the read block + BOOST_ASSERT(! d.ws.rd_block_); + d.ws.rd_block_ = d.tok; + + // Resume + BOOST_ASIO_CORO_YIELD + d.ws.get_io_service().post(std::move(*this)); + BOOST_ASSERT(d.ws.rd_block_ == d.tok); + + // Handle the stream closing while suspended + if(d.ws.failed_) + { + ec = boost::asio::error::operation_aborted; + goto upcall; + } + } + + // Drain + if(d.ws.rd_.remain > 0) + goto read_payload; + for(;;) + { + // Read frame header + while(! d.ws.parse_fh( + d.ws.rd_.fh, d.ws.rd_.buf, code)) + { + if(code != close_code::none) + break; + BOOST_ASIO_CORO_YIELD + d.ws.stream_.async_read_some( + d.ws.rd_.buf.prepare(read_size(d.ws.rd_.buf, + d.ws.rd_.buf.max_size())), + std::move(*this)); + if(ec) + { + d.ws.failed_ = true; + goto upcall; + } + d.ws.rd_.buf.commit(bytes_transferred); + } + if(detail::is_control(d.ws.rd_.fh.op)) + { + // Process control frame + if(d.ws.rd_.fh.op == detail::opcode::close) + { + BOOST_ASSERT(! d.ws.rd_close_); + d.ws.rd_close_ = true; + auto const mb = buffer_prefix( + clamp(d.ws.rd_.fh.len), + d.ws.rd_.buf.data()); + if(d.ws.rd_.fh.len > 0 && d.ws.rd_.fh.mask) + detail::mask_inplace(mb, d.ws.rd_.key); + detail::read_close(d.ws.cr_, mb, code); + if(code != close_code::none) + { + // Protocol error + goto upcall; + } + d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len)); + break; + } + d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len)); + } + else + { + read_payload: + while(d.ws.rd_.buf.size() < d.ws.rd_.remain) + { + d.ws.rd_.remain -= d.ws.rd_.buf.size(); + d.ws.rd_.buf.consume(d.ws.rd_.buf.size()); + BOOST_ASIO_CORO_YIELD + d.ws.stream_.async_read_some( + d.ws.rd_.buf.prepare(read_size(d.ws.rd_.buf, + d.ws.rd_.buf.max_size())), + std::move(*this)); + if(ec) + { + d.ws.failed_ = true; + goto upcall; + } + d.ws.rd_.buf.commit(bytes_transferred); + } + BOOST_ASSERT(d.ws.rd_.buf.size() >= d.ws.rd_.remain); + d.ws.rd_.buf.consume(clamp(d.ws.rd_.remain)); + d.ws.rd_.remain = 0; + } + } + + teardown: + // Teardown + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + using beast::websocket::async_teardown; + BOOST_ASIO_CORO_YIELD + async_teardown(d.ws.role_, + d.ws.stream_, std::move(*this)); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + if(ec == boost::asio::error::eof) + { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec.assign(0, ec.category()); + } + d.ws.failed_ = true; upcall: - BOOST_ASSERT(ws_.wr_block_ == tok_); - ws_.wr_block_.reset(); - ws_.rd_op_.maybe_invoke() || - ws_.ping_op_.maybe_invoke() || - ws_.wr_op_.maybe_invoke(); - h_(ec); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + d.ws.wr_block_.reset(); + if(d.ws.rd_block_) + { + BOOST_ASSERT(d.ws.rd_block_ = d.tok); + d.ws.r_rd_op_.maybe_invoke(); + } + d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke() || + d.ws.wr_op_.maybe_invoke(); + d_.invoke(ec); } } @@ -205,13 +353,7 @@ close(close_reason const& cr, error_code& ec) } // If rd_close_ is set then we already sent a close BOOST_ASSERT(! rd_close_); - if(wr_close_) - { - // Can't call close twice, abort operation - BOOST_ASSERT(! wr_close_); - ec = boost::asio::error::operation_aborted; - return; - } + BOOST_ASSERT(! wr_close_); wr_close_ = true; { detail::frame_streambuf fb; diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index 7f7c5c69..dec46d89 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -560,6 +560,7 @@ operator()( upcall: BOOST_ASSERT(ws_.rd_block_ == tok_); ws_.rd_block_.reset(); + ws_.r_close_op_.maybe_invoke(); ws_.close_op_.maybe_invoke() || ws_.ping_op_.maybe_invoke() || ws_.wr_op_.maybe_invoke();