diff --git a/CHANGELOG.md b/CHANGELOG.md index bf481e2c..9a4dcfd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +Version 168: + +* Use executor_work_guard in composed operations + +-------------------------------------------------------------------------------- + Version 167: * Revert: Tidy up calls to post() diff --git a/doc/qbk/03_core/5_composed.qbk b/doc/qbk/03_core/5_composed.qbk index 2eec4abd..cd1933e9 100644 --- a/doc/qbk/03_core/5_composed.qbk +++ b/doc/qbk/03_core/5_composed.qbk @@ -146,6 +146,9 @@ composed operations: __io_context__, the underlying stream may be accessed in a fashion that violates safety guarantees. +* Forgetting to create an object of type __executor_work_guard__ with the + type of executor returned by the stream's `get_executor` member function. + * For operations which complete immediately (i.e. without calling an intermediate initiating function), forgetting to use __post__ to invoke the final handler. This breaks the following initiating diff --git a/example/common/detect_ssl.hpp b/example/common/detect_ssl.hpp index 6fd1d654..8be32a99 100644 --- a/example/common/detect_ssl.hpp +++ b/example/common/detect_ssl.hpp @@ -23,6 +23,7 @@ #include #include +#include #include /** Return `true` if a buffer contains a TLS/SSL client handshake. @@ -319,6 +320,14 @@ class detect_ssl_op : public boost::asio::coroutine // be cheaply copied as needed by the implementation. AsyncReadStream& stream_; + + // Boost.Asio and the Networking TS require an object of + // type executor_work_guard, where T is the type of + // executor returned by the stream's get_executor function, + // to persist for the duration of the asynchronous operation. + boost::asio::executor_work_guard< + decltype(std::declval().get_executor())> work_; + DynamicBuffer& buffer_; Handler handler_; boost::tribool result_ = false; @@ -336,6 +345,7 @@ public: DynamicBuffer& buffer, DeducedHandler&& handler) : stream_(stream) + , work_(stream.get_executor()) , buffer_(buffer) , handler_(std::forward(handler)) { diff --git a/example/echo-op/echo_op.cpp b/example/echo-op/echo_op.cpp index 0d73da87..a26f3a06 100644 --- a/example/echo-op/echo_op.cpp +++ b/example/echo-op/echo_op.cpp @@ -88,6 +88,13 @@ class echo_op // The stream to read and write to AsyncStream& stream; + // Boost.Asio and the Networking TS require an object of + // type executor_work_guard, where T is the type of + // executor returned by the stream's get_executor function, + // to persist for the duration of the asynchronous operation. + boost::asio::executor_work_guard< + decltype(std::declval().get_executor())> work; + // Indicates what step in the operation's state machine // to perform next, starting from zero. int step = 0; @@ -109,6 +116,7 @@ class echo_op // explicit state(Handler const& handler, AsyncStream& stream_) : stream(stream_) + , work(stream.get_executor()) , buffer((std::numeric_limits::max)(), boost::asio::get_associated_allocator(handler)) { @@ -215,7 +223,6 @@ operator()(boost::beast::error_code ec, std::size_t bytes_transferred) p.buffer.consume(bytes_transferred); break; } - // Invoke the final handler. The implementation of `handler_ptr` // will deallocate the storage for the state before the handler // is invoked. This is necessary to provide the @@ -226,6 +233,10 @@ operator()(boost::beast::error_code ec, std::size_t bytes_transferred) // from the `state`, they would have to be moved to the stack // first or else undefined behavior results. // + // The work guard is moved to the stack first, otherwise it would + // be destroyed before the handler is invoked. + // + auto work = std::move(p.work); p_.invoke(ec); return; } diff --git a/include/boost/beast/core/impl/buffered_read_stream.ipp b/include/boost/beast/core/impl/buffered_read_stream.ipp index 1ae76b4d..b199ea06 100644 --- a/include/boost/beast/core/impl/buffered_read_stream.ipp +++ b/include/boost/beast/core/impl/buffered_read_stream.ipp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -31,10 +32,12 @@ template class buffered_read_stream< Stream, DynamicBuffer>::read_some_op { - int step_ = 0; buffered_read_stream& s_; + boost::asio::executor_work_guard().get_executor())> wg_; MutableBufferSequence b_; Handler h_; + int step_ = 0; public: read_some_op(read_some_op&&) = default; @@ -45,6 +48,7 @@ public: buffered_read_stream& s, MutableBufferSequence const& b) : s_(s) + , wg_(s_.get_executor()) , b_(b) , h_(std::forward(h)) { diff --git a/include/boost/beast/http/impl/file_body_win32.ipp b/include/boost/beast/http/impl/file_body_win32.ipp index 627401f5..6357cf49 100644 --- a/include/boost/beast/http/impl/file_body_win32.ipp +++ b/include/boost/beast/http/impl/file_body_win32.ipp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -337,6 +338,8 @@ template< class write_some_win32_op { boost::asio::basic_stream_socket& sock_; + boost::asio::executor_work_guard&>().get_executor())> wg_; serializer, Fields>& sr_; std::size_t bytes_transferred_ = 0; @@ -354,6 +357,7 @@ public: serializer,Fields>& sr) : sock_(s) + , wg_(sock_.get_executor()) , sr_(sr) , h_(std::forward(h)) { diff --git a/include/boost/beast/http/impl/read.ipp b/include/boost/beast/http/impl/read.ipp index 28ff0a3c..2ff0c3b1 100644 --- a/include/boost/beast/http/impl/read.ipp +++ b/include/boost/beast/http/impl/read.ipp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,8 @@ class read_some_op : public boost::asio::coroutine { Stream& s_; + boost::asio::executor_work_guard().get_executor())> wg_; DynamicBuffer& b_; basic_parser& p_; std::size_t bytes_transferred_ = 0; @@ -57,6 +60,7 @@ public: read_some_op(DeducedHandler&& h, Stream& s, DynamicBuffer& b, basic_parser& p) : s_(s) + , wg_(s_.get_executor()) , b_(b) , p_(p) , h_(std::forward(h)) @@ -170,10 +174,13 @@ operator()( upcall: if(! cont_) - return boost::asio::post( + { + BOOST_ASIO_CORO_YIELD + boost::asio::post( s_.get_executor(), - bind_handler(std::move(h_), + bind_handler(std::move(*this), ec, bytes_transferred_)); + } h_(ec, bytes_transferred_); } } @@ -209,6 +216,8 @@ class read_op : public boost::asio::coroutine { Stream& s_; + boost::asio::executor_work_guard().get_executor())> wg_; DynamicBuffer& b_; basic_parser& p_; std::size_t bytes_transferred_ = 0; @@ -224,6 +233,7 @@ public: DynamicBuffer& b, basic_parser& p) : s_(s) + , wg_(s_.get_executor()) , b_(b) , p_(p) , h_(std::forward(h)) @@ -327,6 +337,8 @@ class read_msg_op struct data { Stream& s; + boost::asio::executor_work_guard().get_executor())> wg; DynamicBuffer& b; message_type& m; parser_type p; @@ -336,6 +348,7 @@ class read_msg_op data(Handler const&, Stream& s_, DynamicBuffer& b_, message_type& m_) : s(s_) + , wg(s.get_executor()) , b(b_) , m(m_) , p(std::move(m)) @@ -432,7 +445,10 @@ operator()( } upcall: bytes_transferred = d.bytes_transferred; - d_.invoke(ec, bytes_transferred); + { + auto wg = std::move(d.wg); + d_.invoke(ec, bytes_transferred); + } } } diff --git a/include/boost/beast/http/impl/write.ipp b/include/boost/beast/http/impl/write.ipp index af31cd9b..2f7d6218 100644 --- a/include/boost/beast/http/impl/write.ipp +++ b/include/boost/beast/http/impl/write.ipp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -38,6 +39,8 @@ template< class write_some_op { Stream& s_; + boost::asio::executor_work_guard().get_executor())> wg_; serializer& sr_; Handler h_; @@ -74,6 +77,7 @@ public: write_some_op(DeducedHandler&& h, Stream& s, serializer& sr) : s_(s) + , wg_(s_.get_executor()) , sr_(sr) , h_(std::forward(h)) { @@ -204,11 +208,13 @@ template< bool isRequest, class Body, class Fields> class write_op { - int state_ = 0; Stream& s_; + boost::asio::executor_work_guard().get_executor())> wg_; serializer& sr_; std::size_t bytes_transferred_ = 0; Handler h_; + int state_ = 0; public: write_op(write_op&&) = default; @@ -218,6 +224,7 @@ public: write_op(DeducedHandler&& h, Stream& s, serializer& sr) : s_(s) + , wg_(s_.get_executor()) , sr_(sr) , h_(std::forward(h)) { @@ -321,11 +328,14 @@ class write_msg_op struct data { Stream& s; + boost::asio::executor_work_guard().get_executor())> wg; serializer sr; data(Handler const&, Stream& s_, message< isRequest, Body, Fields>& m_) : s(s_) + , wg(s.get_executor()) , sr(m_) { } @@ -405,6 +415,7 @@ write_msg_op< Stream, Handler, isRequest, Body, Fields>:: operator()(error_code ec, std::size_t bytes_transferred) { + auto wg = std::move(d_->wg); d_.invoke(ec, bytes_transferred); } diff --git a/include/boost/beast/websocket/impl/accept.ipp b/include/boost/beast/websocket/impl/accept.ipp index 9daf7b31..c93e933b 100644 --- a/include/boost/beast/websocket/impl/accept.ipp +++ b/include/boost/beast/websocket/impl/accept.ipp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,8 @@ class stream::response_op struct data { stream& ws; + boost::asio::executor_work_guard&>().get_executor())> wg; error_code result; response_type res; @@ -54,6 +57,7 @@ class stream::response_op http::basic_fields> const& req, Decorator const& decorator) : ws(ws_) + , wg(ws.get_executor()) , res(ws_.build_response(req, decorator, result)) { } @@ -137,7 +141,10 @@ operator()( d.ws.do_pmd_config(d.res, is_deflate_supported{}); d.ws.open(role_type::server); } - d_.invoke(ec); + { + auto wg = std::move(d.wg); + d_.invoke(ec); + } } } @@ -153,6 +160,8 @@ class stream::accept_op struct data { stream& ws; + boost::asio::executor_work_guard&>().get_executor())> wg; Decorator decorator; http::request_parser p; data( @@ -160,6 +169,7 @@ class stream::accept_op stream& ws_, Decorator const& decorator_) : ws(ws_) + , wg(ws.get_executor()) , decorator(decorator_) { } @@ -284,6 +294,7 @@ operator()(error_code ec, std::size_t) auto& ws = d.ws; auto const req = d.p.release(); auto const decorator = d.decorator; + auto wg = std::move(d.wg); #if 1 return response_op{ d_.release_handler(), @@ -297,7 +308,10 @@ operator()(error_code ec, std::size_t) #endif } } - d_.invoke(ec); + { + auto wg = std::move(d.wg); + d_.invoke(ec); + } } } diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index 7b0e1ff6..ef078ddd 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,8 @@ class stream::close_op struct state { stream& ws; + boost::asio::executor_work_guard&>().get_executor())> wg; detail::frame_buffer fb; error_code ev; bool cont = false; @@ -52,6 +55,7 @@ class stream::close_op stream& ws_, close_reason const& cr) : ws(ws_) + , wg(ws.get_executor()) { // Serialize the close frame ws.template write_close< @@ -303,12 +307,15 @@ operator()( d.ws.paused_wr_.maybe_invoke(); if(! d.cont) { - auto& ws = d.ws; - return boost::asio::post( - ws.stream_.get_executor(), - bind_handler(d_.release_handler(), ec)); + BOOST_ASIO_CORO_YIELD + boost::asio::post( + d.ws.get_executor(), + bind_handler(std::move(*this), ec)); + } + { + auto wg = std::move(d.wg); + d_.invoke(ec); } - d_.invoke(ec); } } diff --git a/include/boost/beast/websocket/impl/handshake.ipp b/include/boost/beast/websocket/impl/handshake.ipp index 8e33e1a8..a1a826be 100644 --- a/include/boost/beast/websocket/impl/handshake.ipp +++ b/include/boost/beast/websocket/impl/handshake.ipp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,8 @@ class stream::handshake_op struct data { stream& ws; + boost::asio::executor_work_guard&>().get_executor())> wg; response_type* res_p; detail::sec_ws_key_type key; http::request req; @@ -56,6 +59,7 @@ class stream::handshake_op string_view target, Decorator const& decorator) : ws(ws_) + , wg(ws.get_executor()) , res_p(res_p_) , req(ws.build_request(key, host, target, decorator)) @@ -155,7 +159,10 @@ operator()(error_code ec, std::size_t) if(d.res_p) swap(d.res, *d.res_p); upcall: - d_.invoke(ec); + { + auto wg = std::move(d.wg); + d_.invoke(ec); + } } } diff --git a/include/boost/beast/websocket/impl/ping.ipp b/include/boost/beast/websocket/impl/ping.ipp index c7deb9c3..6ceaaeab 100644 --- a/include/boost/beast/websocket/impl/ping.ipp +++ b/include/boost/beast/websocket/impl/ping.ipp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,8 @@ class stream::ping_op struct state { stream& ws; + boost::asio::executor_work_guard&>().get_executor())> wg; detail::frame_buffer fb; state( @@ -49,6 +52,7 @@ class stream::ping_op detail::opcode op, ping_data const& payload) : ws(ws_) + , wg(ws.get_executor()) { // Serialize the control frame ws.template write_ping< @@ -172,7 +176,10 @@ operator()(error_code ec, std::size_t) d.ws.paused_close_.maybe_invoke() || d.ws.paused_rd_.maybe_invoke() || d.ws.paused_wr_.maybe_invoke(); - d_.invoke(ec); + { + auto wg = std::move(d.wg); + d_.invoke(ec); + } } } diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index 1dfbd01e..73ea62d1 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -82,6 +83,8 @@ class stream::read_some_op { Handler h_; stream& ws_; + boost::asio::executor_work_guard&>().get_executor())> wg_; MutableBufferSequence bs_; buffers_suffix cb_; std::size_t bytes_written_ = 0; @@ -103,6 +106,7 @@ public: MutableBufferSequence const& bs) : h_(std::forward(h)) , ws_(ws) + , wg_(ws_.get_executor()) , bs_(bs) , cb_(bs) , code_(close_code::none) @@ -700,10 +704,13 @@ operator()( ws_.paused_ping_.maybe_invoke() || ws_.paused_wr_.maybe_invoke(); if(! cont_) - return boost::asio::post( - ws_.stream_.get_executor(), - bind_handler(std::move(h_), + { + BOOST_ASIO_CORO_YIELD + boost::asio::post( + ws_.get_executor(), + bind_handler(std::move(*this), ec, bytes_written_)); + } h_(ec, bytes_written_); } } @@ -719,6 +726,8 @@ class stream::read_op { Handler h_; stream& ws_; + boost::asio::executor_work_guard&>().get_executor())> wg_; DynamicBuffer& b_; std::size_t limit_; std::size_t bytes_written_ = 0; @@ -740,6 +749,7 @@ public: bool some) : h_(std::forward(h)) , ws_(ws) + , wg_(ws_.get_executor()) , b_(b) , limit_(limit ? limit : ( std::numeric_limits::max)()) diff --git a/include/boost/beast/websocket/impl/teardown.ipp b/include/boost/beast/websocket/impl/teardown.ipp index add6b277..fe94d552 100644 --- a/include/boost/beast/websocket/impl/teardown.ipp +++ b/include/boost/beast/websocket/impl/teardown.ipp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,8 @@ class teardown_tcp_op : public boost::asio::coroutine Handler h_; socket_type& s_; + boost::asio::executor_work_guard().get_executor())> wg_; role_type role_; bool nb_; @@ -48,6 +51,7 @@ public: role_type role) : h_(std::forward(h)) , s_(s) + , wg_(s_.get_executor()) , role_(role) { } diff --git a/include/boost/beast/websocket/impl/write.ipp b/include/boost/beast/websocket/impl/write.ipp index d12f2f9e..ee2bc8d4 100644 --- a/include/boost/beast/websocket/impl/write.ipp +++ b/include/boost/beast/websocket/impl/write.ipp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -141,6 +142,8 @@ class stream::write_some_op { Handler h_; stream& ws_; + boost::asio::executor_work_guard&>().get_executor())> wg_; buffers_suffix cb_; detail::frame_header fh_; detail::prepared_key key_; @@ -166,6 +169,7 @@ public: Buffers const& bs) : h_(std::forward(h)) , ws_(ws) + , wg_(ws_.get_executor()) , cb_(bs) , fin_(fin) { @@ -569,9 +573,12 @@ operator()( ws_.paused_rd_.maybe_invoke() || ws_.paused_ping_.maybe_invoke(); if(! cont_) - return boost::asio::post( - ws_.stream_.get_executor(), - bind_handler(std::move(h_), ec, bytes_transferred_)); + { + BOOST_ASIO_CORO_YIELD + boost::asio::post( + ws_.get_executor(), + bind_handler(std::move(*this), ec, bytes_transferred_)); + } h_(ec, bytes_transferred_); } } diff --git a/test/beast/websocket/test.hpp b/test/beast/websocket/test.hpp index c9f16b43..2cb6c09d 100644 --- a/test/beast/websocket/test.hpp +++ b/test/beast/websocket/test.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -71,9 +72,8 @@ public: std::ostream& log_; boost::asio::io_context ioc_; - boost::optional< - boost::asio::executor_work_guard< - boost::asio::io_context::executor_type>> work_; + boost::asio::executor_work_guard< + boost::asio::io_context::executor_type> work_; static_buffer buffer_; test::stream ts_; std::thread t_; @@ -115,7 +115,7 @@ public: ~echo_server() { - work_ = boost::none; + work_.reset(); t_.join(); } diff --git a/test/extras/include/boost/beast/test/stream.hpp b/test/extras/include/boost/beast/test/stream.hpp index 58c0ef43..ba2c4b3d 100644 --- a/test/extras/include/boost/beast/test/stream.hpp +++ b/test/extras/include/boost/beast/test/stream.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -697,9 +698,8 @@ class stream::read_op_impl : public stream::read_op state& s_; Buffers b_; Handler h_; - boost::optional< - boost::asio::executor_work_guard< - boost::asio::io_context::executor_type>> work_; + boost::asio::executor_work_guard< + boost::asio::io_context::executor_type> work_; public: lambda(lambda&&) = default; @@ -720,7 +720,7 @@ class stream::read_op_impl : public stream::read_op boost::asio::post( s_.ioc.get_executor(), std::move(*this)); - work_ = boost::none; + work_.reset(); } void diff --git a/test/extras/include/boost/beast/test/yield_to.hpp b/test/extras/include/boost/beast/test/yield_to.hpp index 1fb3738f..2893b547 100644 --- a/test/extras/include/boost/beast/test/yield_to.hpp +++ b/test/extras/include/boost/beast/test/yield_to.hpp @@ -10,9 +10,9 @@ #ifndef BOOST_BEAST_TEST_YIELD_TO_HPP #define BOOST_BEAST_TEST_YIELD_TO_HPP +#include #include #include -#include #include #include #include @@ -35,9 +35,8 @@ protected: boost::asio::io_context ioc_; private: - boost::optional< - boost::asio::executor_work_guard< - boost::asio::io_context::executor_type>> work_; + boost::asio::executor_work_guard< + boost::asio::io_context::executor_type> work_; std::vector threads_; std::mutex m_; std::condition_variable cv_; @@ -60,7 +59,7 @@ public: ~enable_yield_to() { - work_ = boost::none; + work_.reset(); for(auto& t : threads_) t.join(); }