Use executor_work_guard in composed operations:

fix #1076

As per Asio and Networking TS requirements, composed operations must
maintain an object of type executor_work_guard for the executor associated
with the I/O object, for the lifetime of the asynchronous operation.

This is in addition to the requirement for maintaining an object of type
executor_work_guard for the executor associated with the handler.
This commit is contained in:
Vinnie Falco
2018-04-09 16:22:15 -07:00
parent b4cb4f1fd3
commit 86342dd72b
18 changed files with 154 additions and 34 deletions

View File

@ -1,3 +1,9 @@
Version 168:
* Use executor_work_guard in composed operations
--------------------------------------------------------------------------------
Version 167:
* Revert: Tidy up calls to post()

View File

@ -146,6 +146,9 @@ composed operations:
__io_context__, the underlying stream may be accessed in a fashion
that violates safety guarantees.
* Forgetting to create an object of type __executor_work_guard__ with the
type of executor returned by the stream's `get_executor` member function.
* For operations which complete immediately (i.e. without calling an
intermediate initiating function), forgetting to use __post__ to
invoke the final handler. This breaks the following initiating

View File

@ -23,6 +23,7 @@
#include <boost/beast/core.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/logic/tribool.hpp>
/** Return `true` if a buffer contains a TLS/SSL client handshake.
@ -319,6 +320,14 @@ class detect_ssl_op : public boost::asio::coroutine
// be cheaply copied as needed by the implementation.
AsyncReadStream& stream_;
// Boost.Asio and the Networking TS require an object of
// type executor_work_guard<T>, where T is the type of
// executor returned by the stream's get_executor function,
// to persist for the duration of the asynchronous operation.
boost::asio::executor_work_guard<
decltype(std::declval<AsyncReadStream&>().get_executor())> work_;
DynamicBuffer& buffer_;
Handler handler_;
boost::tribool result_ = false;
@ -336,6 +345,7 @@ public:
DynamicBuffer& buffer,
DeducedHandler&& handler)
: stream_(stream)
, work_(stream.get_executor())
, buffer_(buffer)
, handler_(std::forward<DeducedHandler>(handler))
{

View File

@ -88,6 +88,13 @@ class echo_op
// The stream to read and write to
AsyncStream& stream;
// Boost.Asio and the Networking TS require an object of
// type executor_work_guard<T>, where T is the type of
// executor returned by the stream's get_executor function,
// to persist for the duration of the asynchronous operation.
boost::asio::executor_work_guard<
decltype(std::declval<AsyncStream&>().get_executor())> work;
// Indicates what step in the operation's state machine
// to perform next, starting from zero.
int step = 0;
@ -109,6 +116,7 @@ class echo_op
//
explicit state(Handler const& handler, AsyncStream& stream_)
: stream(stream_)
, work(stream.get_executor())
, buffer((std::numeric_limits<std::size_t>::max)(),
boost::asio::get_associated_allocator(handler))
{
@ -215,7 +223,6 @@ operator()(boost::beast::error_code ec, std::size_t bytes_transferred)
p.buffer.consume(bytes_transferred);
break;
}
// Invoke the final handler. The implementation of `handler_ptr`
// will deallocate the storage for the state before the handler
// is invoked. This is necessary to provide the
@ -226,6 +233,10 @@ operator()(boost::beast::error_code ec, std::size_t bytes_transferred)
// from the `state`, they would have to be moved to the stack
// first or else undefined behavior results.
//
// The work guard is moved to the stack first, otherwise it would
// be destroyed before the handler is invoked.
//
auto work = std::move(p.work);
p_.invoke(ec);
return;
}

View File

@ -18,6 +18,7 @@
#include <boost/beast/core/detail/config.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -31,10 +32,12 @@ template<class MutableBufferSequence, class Handler>
class buffered_read_stream<
Stream, DynamicBuffer>::read_some_op
{
int step_ = 0;
buffered_read_stream& s_;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
MutableBufferSequence b_;
Handler h_;
int step_ = 0;
public:
read_some_op(read_some_op&&) = default;
@ -45,6 +48,7 @@ public:
buffered_read_stream& s,
MutableBufferSequence const& b)
: s_(s)
, wg_(s_.get_executor())
, b_(b)
, h_(std::forward<DeducedHandler>(h))
{

View File

@ -20,6 +20,7 @@
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/windows/overlapped_ptr.hpp>
@ -337,6 +338,8 @@ template<
class write_some_win32_op
{
boost::asio::basic_stream_socket<Protocol>& sock_;
boost::asio::executor_work_guard<decltype(std::declval<
boost::asio::basic_stream_socket<Protocol>&>().get_executor())> wg_;
serializer<isRequest,
basic_file_body<file_win32>, Fields>& sr_;
std::size_t bytes_transferred_ = 0;
@ -354,6 +357,7 @@ public:
serializer<isRequest,
basic_file_body<file_win32>,Fields>& sr)
: sock_(s)
, wg_(sock_.get_executor())
, sr_(sr)
, h_(std::forward<DeducedHandler>(h))
{

View File

@ -21,6 +21,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -43,6 +44,8 @@ class read_some_op
: public boost::asio::coroutine
{
Stream& s_;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
DynamicBuffer& b_;
basic_parser<isRequest, Derived>& p_;
std::size_t bytes_transferred_ = 0;
@ -57,6 +60,7 @@ public:
read_some_op(DeducedHandler&& h, Stream& s,
DynamicBuffer& b, basic_parser<isRequest, Derived>& p)
: s_(s)
, wg_(s_.get_executor())
, b_(b)
, p_(p)
, h_(std::forward<DeducedHandler>(h))
@ -170,10 +174,13 @@ operator()(
upcall:
if(! cont_)
return boost::asio::post(
{
BOOST_ASIO_CORO_YIELD
boost::asio::post(
s_.get_executor(),
bind_handler(std::move(h_),
bind_handler(std::move(*this),
ec, bytes_transferred_));
}
h_(ec, bytes_transferred_);
}
}
@ -209,6 +216,8 @@ class read_op
: public boost::asio::coroutine
{
Stream& s_;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
DynamicBuffer& b_;
basic_parser<isRequest, Derived>& p_;
std::size_t bytes_transferred_ = 0;
@ -224,6 +233,7 @@ public:
DynamicBuffer& b, basic_parser<isRequest,
Derived>& p)
: s_(s)
, wg_(s_.get_executor())
, b_(b)
, p_(p)
, h_(std::forward<DeducedHandler>(h))
@ -327,6 +337,8 @@ class read_msg_op
struct data
{
Stream& s;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg;
DynamicBuffer& b;
message_type& m;
parser_type p;
@ -336,6 +348,7 @@ class read_msg_op
data(Handler const&, Stream& s_,
DynamicBuffer& b_, message_type& m_)
: s(s_)
, wg(s.get_executor())
, b(b_)
, m(m_)
, p(std::move(m))
@ -432,7 +445,10 @@ operator()(
}
upcall:
bytes_transferred = d.bytes_transferred;
d_.invoke(ec, bytes_transferred);
{
auto wg = std::move(d.wg);
d_.invoke(ec, bytes_transferred);
}
}
}

View File

@ -18,6 +18,7 @@
#include <boost/beast/core/detail/config.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -38,6 +39,8 @@ template<
class write_some_op
{
Stream& s_;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
serializer<isRequest,Body, Fields>& sr_;
Handler h_;
@ -74,6 +77,7 @@ public:
write_some_op(DeducedHandler&& h, Stream& s,
serializer<isRequest, Body, Fields>& sr)
: s_(s)
, wg_(s_.get_executor())
, sr_(sr)
, h_(std::forward<DeducedHandler>(h))
{
@ -204,11 +208,13 @@ template<
bool isRequest, class Body, class Fields>
class write_op
{
int state_ = 0;
Stream& s_;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
serializer<isRequest, Body, Fields>& sr_;
std::size_t bytes_transferred_ = 0;
Handler h_;
int state_ = 0;
public:
write_op(write_op&&) = default;
@ -218,6 +224,7 @@ public:
write_op(DeducedHandler&& h, Stream& s,
serializer<isRequest, Body, Fields>& sr)
: s_(s)
, wg_(s_.get_executor())
, sr_(sr)
, h_(std::forward<DeducedHandler>(h))
{
@ -321,11 +328,14 @@ class write_msg_op
struct data
{
Stream& s;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg;
serializer<isRequest, Body, Fields> sr;
data(Handler const&, Stream& s_, message<
isRequest, Body, Fields>& m_)
: s(s_)
, wg(s.get_executor())
, sr(m_)
{
}
@ -405,6 +415,7 @@ write_msg_op<
Stream, Handler, isRequest, Body, Fields>::
operator()(error_code ec, std::size_t bytes_transferred)
{
auto wg = std::move(d_->wg);
d_.invoke(ec, bytes_transferred);
}

View File

@ -22,6 +22,7 @@
#include <boost/asio/coroutine.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -43,6 +44,8 @@ class stream<NextLayer, deflateSupported>::response_op
struct data
{
stream<NextLayer, deflateSupported>& ws;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
error_code result;
response_type res;
@ -54,6 +57,7 @@ class stream<NextLayer, deflateSupported>::response_op
http::basic_fields<Allocator>> const& req,
Decorator const& decorator)
: ws(ws_)
, wg(ws.get_executor())
, res(ws_.build_response(req, decorator, result))
{
}
@ -137,7 +141,10 @@ operator()(
d.ws.do_pmd_config(d.res, is_deflate_supported{});
d.ws.open(role_type::server);
}
d_.invoke(ec);
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}
@ -153,6 +160,8 @@ class stream<NextLayer, deflateSupported>::accept_op
struct data
{
stream<NextLayer, deflateSupported>& ws;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
Decorator decorator;
http::request_parser<http::empty_body> p;
data(
@ -160,6 +169,7 @@ class stream<NextLayer, deflateSupported>::accept_op
stream<NextLayer, deflateSupported>& ws_,
Decorator const& decorator_)
: ws(ws_)
, wg(ws.get_executor())
, decorator(decorator_)
{
}
@ -284,6 +294,7 @@ operator()(error_code ec, std::size_t)
auto& ws = d.ws;
auto const req = d.p.release();
auto const decorator = d.decorator;
auto wg = std::move(d.wg);
#if 1
return response_op<Handler>{
d_.release_handler(),
@ -297,7 +308,10 @@ operator()(error_code ec, std::size_t)
#endif
}
}
d_.invoke(ec);
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}

View File

@ -18,6 +18,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -43,6 +44,8 @@ class stream<NextLayer, deflateSupported>::close_op
struct state
{
stream<NextLayer, deflateSupported>& ws;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
detail::frame_buffer fb;
error_code ev;
bool cont = false;
@ -52,6 +55,7 @@ class stream<NextLayer, deflateSupported>::close_op
stream<NextLayer, deflateSupported>& ws_,
close_reason const& cr)
: ws(ws_)
, wg(ws.get_executor())
{
// Serialize the close frame
ws.template write_close<
@ -303,12 +307,15 @@ operator()(
d.ws.paused_wr_.maybe_invoke();
if(! d.cont)
{
auto& ws = d.ws;
return boost::asio::post(
ws.stream_.get_executor(),
bind_handler(d_.release_handler(), ec));
BOOST_ASIO_CORO_YIELD
boost::asio::post(
d.ws.get_executor(),
bind_handler(std::move(*this), ec));
}
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
d_.invoke(ec);
}
}

View File

@ -20,6 +20,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/assert.hpp>
@ -42,6 +43,8 @@ class stream<NextLayer, deflateSupported>::handshake_op
struct data
{
stream<NextLayer, deflateSupported>& ws;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
response_type* res_p;
detail::sec_ws_key_type key;
http::request<http::empty_body> req;
@ -56,6 +59,7 @@ class stream<NextLayer, deflateSupported>::handshake_op
string_view target,
Decorator const& decorator)
: ws(ws_)
, wg(ws.get_executor())
, res_p(res_p_)
, req(ws.build_request(key,
host, target, decorator))
@ -155,7 +159,10 @@ operator()(error_code ec, std::size_t)
if(d.res_p)
swap(d.res, *d.res_p);
upcall:
d_.invoke(ec);
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}

View File

@ -18,6 +18,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -41,6 +42,8 @@ class stream<NextLayer, deflateSupported>::ping_op
struct state
{
stream<NextLayer, deflateSupported>& ws;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg;
detail::frame_buffer fb;
state(
@ -49,6 +52,7 @@ class stream<NextLayer, deflateSupported>::ping_op
detail::opcode op,
ping_data const& payload)
: ws(ws_)
, wg(ws.get_executor())
{
// Serialize the control frame
ws.template write_ping<
@ -172,7 +176,10 @@ operator()(error_code ec, std::size_t)
d.ws.paused_close_.maybe_invoke() ||
d.ws.paused_rd_.maybe_invoke() ||
d.ws.paused_wr_.maybe_invoke();
d_.invoke(ec);
{
auto wg = std::move(d.wg);
d_.invoke(ec);
}
}
}

View File

@ -21,6 +21,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -82,6 +83,8 @@ class stream<NextLayer, deflateSupported>::read_some_op
{
Handler h_;
stream<NextLayer, deflateSupported>& ws_;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
MutableBufferSequence bs_;
buffers_suffix<MutableBufferSequence> cb_;
std::size_t bytes_written_ = 0;
@ -103,6 +106,7 @@ public:
MutableBufferSequence const& bs)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, wg_(ws_.get_executor())
, bs_(bs)
, cb_(bs)
, code_(close_code::none)
@ -700,10 +704,13 @@ operator()(
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
if(! cont_)
return boost::asio::post(
ws_.stream_.get_executor(),
bind_handler(std::move(h_),
{
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(),
bind_handler(std::move(*this),
ec, bytes_written_));
}
h_(ec, bytes_written_);
}
}
@ -719,6 +726,8 @@ class stream<NextLayer, deflateSupported>::read_op
{
Handler h_;
stream<NextLayer, deflateSupported>& ws_;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
DynamicBuffer& b_;
std::size_t limit_;
std::size_t bytes_written_ = 0;
@ -740,6 +749,7 @@ public:
bool some)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, wg_(ws_.get_executor())
, b_(b)
, limit_(limit ? limit : (
std::numeric_limits<std::size_t>::max)())

View File

@ -15,6 +15,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
@ -34,6 +35,8 @@ class teardown_tcp_op : public boost::asio::coroutine
Handler h_;
socket_type& s_;
boost::asio::executor_work_guard<decltype(std::declval<
socket_type&>().get_executor())> wg_;
role_type role_;
bool nb_;
@ -48,6 +51,7 @@ public:
role_type role)
: h_(std::forward<DeducedHandler>(h))
, s_(s)
, wg_(s_.get_executor())
, role_(role)
{
}

View File

@ -22,6 +22,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/assert.hpp>
@ -141,6 +142,8 @@ class stream<NextLayer, deflateSupported>::write_some_op
{
Handler h_;
stream<NextLayer, deflateSupported>& ws_;
boost::asio::executor_work_guard<decltype(std::declval<
stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
buffers_suffix<Buffers> cb_;
detail::frame_header fh_;
detail::prepared_key key_;
@ -166,6 +169,7 @@ public:
Buffers const& bs)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, wg_(ws_.get_executor())
, cb_(bs)
, fin_(fin)
{
@ -569,9 +573,12 @@ operator()(
ws_.paused_rd_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke();
if(! cont_)
return boost::asio::post(
ws_.stream_.get_executor(),
bind_handler(std::move(h_), ec, bytes_transferred_));
{
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(),
bind_handler(std::move(*this), ec, bytes_transferred_));
}
h_(ec, bytes_transferred_);
}
}

View File

@ -17,6 +17,7 @@
#include <boost/beast/test/stream.hpp>
#include <boost/beast/test/yield_to.hpp>
#include <boost/beast/unit_test/suite.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/optional.hpp>
@ -71,9 +72,8 @@ public:
std::ostream& log_;
boost::asio::io_context ioc_;
boost::optional<
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>> work_;
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type> work_;
static_buffer<buf_size> buffer_;
test::stream ts_;
std::thread t_;
@ -115,7 +115,7 @@ public:
~echo_server()
{
work_ = boost::none;
work_.reset();
t_.join();
}

View File

@ -19,6 +19,7 @@
#include <boost/beast/test/fail_counter.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/assert.hpp>
@ -697,9 +698,8 @@ class stream::read_op_impl : public stream::read_op
state& s_;
Buffers b_;
Handler h_;
boost::optional<
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>> work_;
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type> work_;
public:
lambda(lambda&&) = default;
@ -720,7 +720,7 @@ class stream::read_op_impl : public stream::read_op
boost::asio::post(
s_.ioc.get_executor(),
std::move(*this));
work_ = boost::none;
work_.reset();
}
void

View File

@ -10,9 +10,9 @@
#ifndef BOOST_BEAST_TEST_YIELD_TO_HPP
#define BOOST_BEAST_TEST_YIELD_TO_HPP
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/optional.hpp>
#include <condition_variable>
#include <functional>
#include <mutex>
@ -35,9 +35,8 @@ protected:
boost::asio::io_context ioc_;
private:
boost::optional<
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>> work_;
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type> work_;
std::vector<std::thread> threads_;
std::mutex m_;
std::condition_variable cv_;
@ -60,7 +59,7 @@ public:
~enable_yield_to()
{
work_ = boost::none;
work_.reset();
for(auto& t : threads_)
t.join();
}