diff --git a/CHANGELOG.md b/CHANGELOG.md index f320e17e..07f98d1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ WebSocket: * websocket test improvements * Remove obsolete write_op * Refactor write_op +* Refactor ping_op -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/detail/pausation.hpp b/include/boost/beast/websocket/detail/pausation.hpp index 8d73cdca..3fe62ed4 100644 --- a/include/boost/beast/websocket/detail/pausation.hpp +++ b/include/boost/beast/websocket/detail/pausation.hpp @@ -11,6 +11,7 @@ #define BOOST_BEAST_WEBSOCKET_DETAIL_PAUSATION_HPP #include +#include #include #include #include @@ -68,7 +69,7 @@ class pausation } }; - struct exemplar + struct exemplar : boost::asio::coroutine { struct H { diff --git a/include/boost/beast/websocket/impl/ping.ipp b/include/boost/beast/websocket/impl/ping.ipp index b4244dcc..a4880bc9 100644 --- a/include/boost/beast/websocket/impl/ping.ipp +++ b/include/boost/beast/websocket/impl/ping.ipp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -25,53 +26,56 @@ namespace boost { namespace beast { namespace websocket { -//------------------------------------------------------------------------------ - -// write a ping frame -// +/* + This composed operation handles sending ping and pong frames. + It only sends the frames it does not make attempts to read + any frame data. +*/ template template class stream::ping_op + : public boost::asio::coroutine { - struct data : op + struct state { stream& ws; detail::frame_streambuf fb; - int state = 0; token tok; - data(Handler&, stream& ws_, - detail::opcode op_, ping_data const& payload) + state( + Handler&, + stream& ws_, + detail::opcode op, + ping_data const& payload) : ws(ws_) , tok(ws.t_.unique()) { - using boost::asio::buffer; - using boost::asio::buffer_copy; + // Serialize the control frame ws.template write_ping< - flat_static_buffer_base>(fb, op_, payload); + flat_static_buffer_base>( + fb, op, payload); } }; - handler_ptr d_; + handler_ptr d_; public: ping_op(ping_op&&) = default; ping_op(ping_op const&) = default; - template - ping_op(DeducedHandler&& h, - stream& ws, Args&&... args) + template + ping_op( + DeducedHandler&& h, + stream& ws, + detail::opcode op, + ping_data const& payload) : d_(std::forward(h), - ws, std::forward(args)...) + ws, op, payload) { } - void operator()() - { - (*this)({}); - } - - void operator()(error_code ec, + void operator()( + error_code ec = {}, std::size_t bytes_transferred = 0); friend @@ -118,109 +122,66 @@ ping_op:: operator()(error_code ec, std::size_t) { auto& d = *d_; - if(ec) + BOOST_ASIO_CORO_REENTER(*this) { - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.failed_ = true; - goto upcall; - } - switch(d.state) - { - case 0: - if(d.ws.wr_block_) + // Maybe suspend + if(! d.ws.wr_block_) { - // suspend - d.state = 1; - d.ws.ping_op_.emplace(std::move(*this)); - return; + // Acquire the write block + d.ws.wr_block_ = d.tok; + + // Make sure the stream is open + if(d.ws.failed_) + { + BOOST_ASIO_CORO_YIELD + d.ws.get_io_service().post( + bind_handler(std::move(*this), + boost::asio::error::operation_aborted)); + goto upcall; + } } - d.ws.wr_block_ = d.tok; - if(d.ws.failed_ || d.ws.wr_close_) + else { - // call handler - return d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); + // Suspend + BOOST_ASSERT(d.ws.wr_block_ != d.tok); + BOOST_ASIO_CORO_YIELD + d.ws.ping_op_.emplace(std::move(*this)); + + // Acquire the write block + BOOST_ASSERT(! d.ws.wr_block_); + d.ws.wr_block_ = d.tok; + + // Resume + BOOST_ASIO_CORO_YIELD + d.ws.get_io_service().post(std::move(*this)); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + + // Make sure the stream is open + if(d.ws.failed_) + { + ec = boost::asio::error::operation_aborted; + goto upcall; + } } - do_write: - // send ping frame - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.state = 3; + // Send ping frame + BOOST_ASIO_CORO_YIELD boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); - return; + if(ec) + d.ws.failed_ = true; - case 1: - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = d.tok; - d.state = 2; - // The current context is safe but might not be - // the same as the one for this operation (since - // we are being called from a write operation). - // Call post to make sure we are invoked the same - // way as the final handler for this operation. - d.ws.get_io_service().post( - bind_handler(std::move(*this), ec)); - return; - - case 2: + upcall: BOOST_ASSERT(d.ws.wr_block_ == d.tok); - if(d.ws.failed_ || d.ws.wr_close_) - { - // call handler - ec = boost::asio::error::operation_aborted; - goto upcall; - } - goto do_write; - - case 3: - break; + d.ws.wr_block_.reset(); + d.ws.close_op_.maybe_invoke() || + d.ws.rd_op_.maybe_invoke() || + d.ws.wr_op_.maybe_invoke(); + d_.invoke(ec); } -upcall: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.wr_block_.reset(); - d.ws.close_op_.maybe_invoke() || - d.ws.rd_op_.maybe_invoke() || - d.ws.wr_op_.maybe_invoke(); - d_.invoke(ec); } -template -template -async_return_type< - WriteHandler, void(error_code)> -stream:: -async_ping(ping_data const& payload, WriteHandler&& handler) -{ - static_assert(is_async_stream::value, - "AsyncStream requirements requirements not met"); - async_completion init{handler}; - ping_op>{ - init.completion_handler, *this, - detail::opcode::ping, payload}({}); - return init.result.get(); -} - -template -template -async_return_type< - WriteHandler, void(error_code)> -stream:: -async_pong(ping_data const& payload, WriteHandler&& handler) -{ - static_assert(is_async_stream::value, - "AsyncStream requirements requirements not met"); - async_completion init{handler}; - ping_op>{ - init.completion_handler, *this, - detail::opcode::pong, payload}({}); - return init.result.get(); -} +//------------------------------------------------------------------------------ template void @@ -238,10 +199,16 @@ void stream:: ping(ping_data const& payload, error_code& ec) { - detail::frame_streambuf db; + // Make sure the stream is open + if(failed_) + { + ec = boost::asio::error::operation_aborted; + return; + } + detail::frame_streambuf fb; write_ping( - db, detail::opcode::ping, payload); - boost::asio::write(stream_, db.data(), ec); + fb, detail::opcode::ping, payload); + boost::asio::write(stream_, fb.data(), ec); } template @@ -260,13 +227,53 @@ void stream:: pong(ping_data const& payload, error_code& ec) { - detail::frame_streambuf db; + // Make sure the stream is open + if(failed_) + { + ec = boost::asio::error::operation_aborted; + return; + } + detail::frame_streambuf fb; write_ping( - db, detail::opcode::pong, payload); - boost::asio::write(stream_, db.data(), ec); + fb, detail::opcode::pong, payload); + boost::asio::write(stream_, fb.data(), ec); } -//------------------------------------------------------------------------------ +template +template +async_return_type< + WriteHandler, void(error_code)> +stream:: +async_ping(ping_data const& payload, WriteHandler&& handler) +{ + static_assert(is_async_stream::value, + "AsyncStream requirements requirements not met"); + async_completion init{handler}; + ping_op>{ + init.completion_handler, *this, + detail::opcode::ping, payload}(); + return init.result.get(); +} + +template +template +async_return_type< + WriteHandler, void(error_code)> +stream:: +async_pong(ping_data const& payload, WriteHandler&& handler) +{ + static_assert(is_async_stream::value, + "AsyncStream requirements requirements not met"); + async_completion init{handler}; + ping_op>{ + init.completion_handler, *this, + detail::opcode::pong, payload}(); + return init.result.get(); +} } // websocket } // beast diff --git a/include/boost/beast/websocket/impl/write.ipp b/include/boost/beast/websocket/impl/write.ipp index 6c7daa10..6d7d2817 100644 --- a/include/boost/beast/websocket/impl/write.ipp +++ b/include/boost/beast/websocket/impl/write.ipp @@ -522,8 +522,8 @@ operator()(error_code ec, //-------------------------------------------------------------------------- upcall: - if(ws_.wr_block_ == tok_) - ws_.wr_block_.reset(); + BOOST_ASSERT(ws_.wr_block_ == tok_); + ws_.wr_block_.reset(); ws_.close_op_.maybe_invoke() || ws_.rd_op_.maybe_invoke() || ws_.ping_op_.maybe_invoke(); diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index 87b31eb0..eeef041a 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -154,7 +154,7 @@ class stream detail::prepared_key key; // current stateful mask key std::uint64_t size; // total size of current message so far std::uint64_t remain; // message frame bytes left in current frame - detail::frame_streambuf fb; // to write control frames + detail::frame_streambuf fb; // to write control frames (during reads) detail::utf8_checker utf8; // to validate utf8 // A small, circular buffer to read frame headers.