Support BOOST_ASIO_NO_TS_EXECUTORS

This commit is contained in:
Richard Hodges
2020-06-25 16:03:59 +02:00
parent 5f1adfbc6d
commit 060f59685c
20 changed files with 285 additions and 92 deletions

View File

@ -1,3 +1,4 @@
* Support BOOST_ASIO_NO_TS_EXECUTORS.
* Use strand<> rather than legacy executor io_context::strand.
* Use dispatch/post free functions to be independent of executor concept.
* New name for polymorphic executor. Trait for detecting new executors.

View File

@ -240,7 +240,7 @@ int main(int argc, char** argv)
// The async operations will run on the system_executor.
// Because the main thread has nothing to do in this example, we just wait
// for the system_executor to run out of work.
net::system_executor().context().join();
net::query(net::system_executor(), net::execution::context).join();
return EXIT_SUCCESS;
}

View File

@ -356,7 +356,10 @@ int main(int argc, char* argv[])
net::io_context ioc;
// The work keeps io_context::run from returning
auto work = net::make_work_guard(ioc);
auto work = net::any_io_executor(
net::prefer(
ioc.get_executor(),
net::execution::outstanding_work.tracked));
// The report holds the aggregated statistics
crawl_report report{ioc};
@ -395,7 +398,7 @@ int main(int argc, char* argv[])
// If this is the last thread, reset the
// work object so that it can return from run.
if(i == workers.size() - 1)
work.reset();
work = {};
// Wait for the thread to exit
thread.join();

View File

@ -253,7 +253,7 @@ int main(int argc, char** argv)
// The async operations will run on the system_executor.
// Because the main thread has nothing to do in this example, we just wait
// for the system_executor to run out of work.
net::system_executor().context().join();
net::query(net::system_executor(), net::execution::context).join();
return EXIT_SUCCESS;
}

View File

@ -14,10 +14,10 @@
#include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/detail/allocator.hpp>
#include <boost/beast/core/detail/async_base.hpp>
#include <boost/beast/core/detail/work_guard.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_alloc_hook.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
@ -186,7 +186,27 @@ class async_base
"Executor type requirements not met");
Handler h_;
net::executor_work_guard<Executor1> wg1_;
detail::select_work_guard_t<Executor1> wg1_;
public:
/** The type of executor associated with this object.
If a class derived from @ref async_base is a completion
handler, then the associated executor of the derived class will
be this type.
*/
using executor_type =
#if BOOST_BEAST_DOXYGEN
__implementation_defined__;
#else
typename
net::associated_executor<
Handler,
typename detail::select_work_guard_t<Executor1>::executor_type
>::type;
#endif
private:
virtual
void
@ -229,7 +249,7 @@ public:
Handler_&& handler,
Executor1 const& ex1)
: h_(std::forward<Handler_>(handler))
, wg1_(ex1)
, wg1_(detail::make_work_guard(ex1))
{
}
@ -262,15 +282,6 @@ public:
using allocator_type =
net::associated_allocator_t<Handler, Allocator>;
/** The type of executor associated with this object.
If a class derived from @ref async_base is a completion
handler, then the associated executor of the derived class will
be this type.
*/
using executor_type =
net::associated_executor_t<Handler, Executor1>;
/** Returns the allocator associated with this object.
If a class derived from @ref async_base is a completion
@ -293,8 +304,7 @@ public:
executor_type
get_executor() const noexcept
{
return net::get_associated_executor(
h_, wg1_.get_executor());
return net::get_associated_executor(h_, wg1_.get_executor());
}
/// Returns the handler associated with this object

View File

@ -11,6 +11,9 @@
#define BOOST_BEAST_DETAIL_GET_IO_CONTEXT_HPP
#include <boost/beast/core/stream_traits.hpp>
#ifdef BOOST_ASIO_NO_TS_EXECUTORS
#include <boost/asio/execution.hpp>
#endif
#include <boost/asio/executor.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>

View File

@ -0,0 +1,93 @@
#ifndef BOOST_BEAST_CORE_DETAIL_WORK_GUARD_HPP
#define BOOST_BEAST_CORE_DETAIL_WORK_GUARD_HPP
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/execution.hpp>
#include <boost/optional.hpp>
namespace boost {
namespace beast {
namespace detail {
template<class Executor, class Enable = void>
struct select_work_guard;
template<class Executor>
using select_work_guard_t = typename
select_work_guard<Executor>::type;
#if !defined(BOOST_ASIO_NO_TS_EXECUTORS)
template<class Executor>
struct select_work_guard
<
Executor,
typename std::enable_if
<
net::is_executor<Executor>::value
>::type
>
{
using type = net::executor_work_guard<Executor>;
};
#endif
template<class Executor>
struct execution_work_guard
{
using executor_type = decltype(
net::prefer(std::declval<Executor const&>(),
net::execution::outstanding_work.tracked));
execution_work_guard(Executor const& exec)
: ex_(net::prefer(exec, net::execution::outstanding_work.tracked))
{
}
executor_type
get_executor() const noexcept
{
BOOST_ASSERT(ex_.has_value());
return *ex_;
}
void reset() noexcept
{
ex_.reset();
}
private:
boost::optional<executor_type> ex_;
};
template<class Executor>
struct select_work_guard
<
Executor,
typename std::enable_if
<
net::execution::is_executor<Executor>::value
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
|| net::is_executor<Executor>::value
#else
&& !net::is_executor<Executor>::value
#endif
>::type
>
{
using type = execution_work_guard<Executor>;
};
template<class Executor>
select_work_guard_t<Executor>
make_work_guard(Executor const& exec) noexcept
{
return select_work_guard_t<Executor>(exec);
}
}
}
}
#endif // BOOST_BEAST_CORE_DETAIL_WORK_GUARD_HPP

View File

@ -64,9 +64,10 @@ class saved_handler::impl final : public base
ebo_pair v_;
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
std::decay_t<decltype(net::prefer(
std::declval<net::associated_executor_t<Handler>>(),
net::execution::outstanding_work.tracked))> wg2_;
typename std::decay<decltype(net::prefer(std::declval<
net::associated_executor_t<Handler>>(),
net::execution::outstanding_work.tracked))>::type
wg2_;
#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::executor_work_guard<
net::associated_executor_t<Handler>> wg2_;

View File

@ -128,7 +128,8 @@ struct stream<NextLayer, deflateSupported>::impl_type
boost::empty_init_t{},
std::forward<Args>(args)...)
, detail::service::impl_type(
this->boost::empty_value<NextLayer>::get().get_executor().context())
net::query(this->boost::empty_value<NextLayer>::get().get_executor(),
net::execution::context))
, timer(this->boost::empty_value<NextLayer>::get().get_executor())
{
timeout_opt.handshake_timeout = none();

View File

@ -25,6 +25,7 @@
#include <boost/core/ignore_unused.hpp>
#include <stdexcept>
//------------------------------------------------------------------------------
namespace boost {
@ -32,6 +33,52 @@ namespace beast {
namespace {
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
struct ex1_type
{
net::execution_context &
query(net::execution::context_t c) const noexcept
{ return *reinterpret_cast<net::execution_context *>(0); }
net::execution::blocking_t
query(net::execution::blocking_t) const noexcept
{ return net::execution::blocking; };
net::execution::outstanding_work_t
query(net::execution::outstanding_work_t w) const noexcept
{ return net::execution::outstanding_work; }
ex1_type
require(net::execution::blocking_t::possibly_t b) const
{ return *this; }
ex1_type
require(net::execution::blocking_t::never_t b) const
{ return *this; };
ex1_type
prefer(net::execution::outstanding_work_t::untracked_t w) const
{ return *this; };
ex1_type
prefer(net::execution::outstanding_work_t::tracked_t w) const
{ return *this; };
template<class F>
void
execute(F &&) const
{}
bool
operator==(ex1_type const &) const noexcept
{ return true; }
bool
operator!=(ex1_type const &) const noexcept
{ return false; }
};
BOOST_STATIC_ASSERT(net::execution::is_executor<ex1_type>::value);
#else
struct ex1_type
{
void* context() { return nullptr; }
@ -41,6 +88,9 @@ struct ex1_type
template<class F> void post(F&&) {}
template<class F> void defer(F&&) {}
};
BOOST_STATIC_ASSERT(net::is_executor<ex1_type>::value);
#endif
struct no_alloc
{
@ -430,11 +480,13 @@ public:
}
{
net::io_context ioc;
async_base<
test::handler,
net::io_context::executor_type> op(
test::any_handler(), ioc.get_executor());
op.complete(false);
auto op = new
async_base<
test::handler,
net::io_context::executor_type>(
test::any_handler(), ioc.get_executor());
op->complete(false);
delete op;
ioc.run();
}
{
@ -506,12 +558,13 @@ public:
net::io_context ioc1;
net::io_context ioc2;
auto h = net::bind_executor(ioc2, test::any_handler());
stable_async_base<
auto op = new stable_async_base<
decltype(h),
net::io_context::executor_type> op(
net::io_context::executor_type>(
std::move(h),
ioc1.get_executor());
op.complete(false);
op->complete(false);
delete op;
BEAST_EXPECT(ioc1.run() == 0);
BEAST_EXPECT(ioc2.run() == 1);
}
@ -678,7 +731,9 @@ public:
: base_type(std::move(handler), stream.get_executor())
, stream_(stream)
, repeats_(repeats)
, data_(allocate_stable<temporary_data>(*this, std::move(message), stream.get_executor().context()))
, data_(allocate_stable<temporary_data>(*this,
std::move(message),
net::query(stream.get_executor(), net::execution::context)))
{
(*this)(); // start the operation
}

View File

@ -1226,6 +1226,7 @@ public:
{
return {};
}
void process_http_1 (tcp_stream& stream, net::yield_context yield)
{
flat_buffer buffer;

View File

@ -80,11 +80,9 @@ public:
class test_executor
{
bind_handler_test& s_;
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::any_io_executor ex_;
#else
net::io_context::executor_type ex_;
#endif
// Storing the blocking property as a member is not strictly necessary,
// as we could simply forward the calls
@ -97,6 +95,9 @@ public:
// outstanding_work property.
net::execution::blocking_t blocking_;
#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::io_context::executor_type ex_;
#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
public:
test_executor(
test_executor const&) = default;
@ -106,7 +107,9 @@ public:
net::io_context& ioc)
: s_(s)
, ex_(ioc.get_executor())
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
, blocking_(net::execution::blocking.possibly)
#endif
{
}
@ -123,6 +126,7 @@ public:
}
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::execution_context& query(net::execution::context_t c) const noexcept
{
return net::query(ex_, c);
@ -182,7 +186,6 @@ public:
}
}
#endif
#if !defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::execution_context&
context() const noexcept
@ -247,7 +250,7 @@ public:
BOOST_STATIC_ASSERT(std::is_nothrow_destructible<T>::value);
BOOST_STATIC_ASSERT(boost::asio::traits::equality_comparable<T>::is_valid);
BOOST_STATIC_ASSERT(boost::asio::traits::equality_comparable<T>::is_noexcept);
BOOST_STATIC_ASSERT(net::execution::is_executor_v<test_executor>);
BOOST_STATIC_ASSERT(net::execution::is_executor<test_executor>::value);
#else
BOOST_STATIC_ASSERT(net::is_executor<test_executor>::value);
#endif

View File

@ -18,7 +18,6 @@
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/optional.hpp>
#if BOOST_ASIO_HAS_CO_AWAIT
@ -42,9 +41,10 @@ public:
buffered_read_stream<test::stream, multi_buffer> srs(ioc);
buffered_read_stream<test::stream, multi_buffer> srs2(std::move(srs));
srs = std::move(srs2);
BEAST_EXPECT(&srs.get_executor().context() == &ioc);
BEAST_EXPECT(&net::query(srs.get_executor(), net::execution::context) == &ioc);
BEAST_EXPECT(
&srs.get_executor().context() == &srs2.get_executor().context());
&net::query(srs.get_executor(), net::execution::context) ==
&net::query(srs2.get_executor(), net::execution::context));
}
{
test::stream ts{ioc};
@ -238,7 +238,6 @@ public:
{
testRead(yield);
});
testAsyncLoop();
#if BOOST_ASIO_HAS_CO_AWAIT

View File

@ -14,7 +14,6 @@
#include <boost/beast/core/error.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>
namespace boost {

View File

@ -57,7 +57,7 @@ class simple_executor
std::size_t id_;
public:
simple_executor()
simple_executor() noexcept
: id_([]
{
static std::size_t n = 0;
@ -66,12 +66,19 @@ public:
{
}
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
void* query(net::execution::context_t) const { return nullptr; }
template<class F>
void execute(F&&) const {}
simple_executor prefer(net::execution::outstanding_work_t::tracked_t) const { return *this; }
#else
void* context() { return nullptr; }
void on_work_started() {}
void on_work_finished() {}
template<class F> void dispatch(F&&) {}
template<class F> void post(F&&) {}
template<class F> void defer(F&&) {}
#endif
friend
bool operator==(
@ -90,6 +97,15 @@ public:
}
};
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
static_assert(net::execution::can_execute<simple_executor, net::execution::invocable_archetype>::value, "");
static_assert(std::is_nothrow_copy_constructible<simple_executor>::value, "");
static_assert(std::is_nothrow_destructible<simple_executor>::value, "");
static_assert(net::traits::equality_comparable<simple_executor>::is_valid, "");
static_assert(net::traits::equality_comparable<simple_executor>::is_noexcept, "");
static_assert(net::execution::is_executor<simple_executor>::value, "");
#endif
// A move-only handler
struct move_only_handler
{

View File

@ -72,8 +72,7 @@ public:
std::ostream& log_;
net::io_context ioc_;
net::executor_work_guard<
net::io_context::executor_type> work_;
net::any_io_executor work_;
static_buffer<buf_size> buffer_;
test::stream ts_;
std::thread t_;
@ -86,7 +85,7 @@ public:
std::ostream& log,
kind k = kind::sync)
: log_(log)
, work_(ioc_.get_executor())
, work_(net::prefer(ioc_.get_executor(), net::execution::outstanding_work.tracked))
, ts_(ioc_)
, ws_(ts_)
{
@ -115,7 +114,7 @@ public:
~echo_server()
{
work_.reset();
work_ = {};
t_.join();
}
@ -1249,8 +1248,8 @@ public:
{
error_code ec;
ws.async_accept(handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1266,8 +1265,8 @@ public:
{
error_code ec;
ws.async_accept(buffers, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1280,8 +1279,8 @@ public:
{
error_code ec;
ws.async_accept(req, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1296,8 +1295,8 @@ public:
{
error_code ec;
ws.async_accept_ex(d, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1314,8 +1313,8 @@ public:
{
error_code ec;
ws.async_accept_ex(buffers, d, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1331,8 +1330,8 @@ public:
{
error_code ec;
ws.async_accept_ex(req, d, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1350,8 +1349,8 @@ public:
error_code ec;
ws.async_accept_ex(
req, buffers, d, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1367,8 +1366,8 @@ public:
error_code ec;
ws.async_handshake(
uri, path, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1384,8 +1383,8 @@ public:
error_code ec;
ws.async_handshake(
res, uri, path, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1403,8 +1402,8 @@ public:
error_code ec;
ws.async_handshake_ex(
uri, path, d, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1423,8 +1422,8 @@ public:
error_code ec;
ws.async_handshake_ex(
res, uri, path, d, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1437,8 +1436,8 @@ public:
{
error_code ec;
ws.async_ping(payload, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1451,8 +1450,8 @@ public:
{
error_code ec;
ws.async_pong(payload, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1465,8 +1464,8 @@ public:
{
error_code ec;
ws.async_close(cr, handler(ec));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
}
@ -1482,8 +1481,8 @@ public:
error_code ec;
std::size_t n;
ws.async_read(buffer, handler(ec, n));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
return n;
@ -1501,8 +1500,8 @@ public:
error_code ec;
std::size_t n;
ws.async_read_some(buffer, limit, handler(ec, n));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
return n;
@ -1519,8 +1518,8 @@ public:
error_code ec;
std::size_t n;
ws.async_read_some(buffers, handler(ec, n));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
return n;
@ -1537,8 +1536,8 @@ public:
error_code ec;
std::size_t n;
ws.async_write(buffers, handler(ec, n));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
return n;
@ -1556,8 +1555,8 @@ public:
error_code ec;
std::size_t n;
ws.async_write_some(fin, buffers, handler(ec, n));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
return n;
@ -1575,8 +1574,8 @@ public:
std::size_t n;
net::async_write(ws.next_layer(),
buffers, handler(ec, n));
ws.get_executor().context().run();
ws.get_executor().context().restart();
net::query(ws.get_executor(), net::execution::context).run();
net::query(ws.get_executor(), net::execution::context).restart();
if(ec)
throw system_error{ec};
return n;

View File

@ -38,7 +38,9 @@ namespace ssl = boost::asio::ssl;
using tcp = net::ip::tcp;
net::io_context ioc;
auto work = net::make_work_guard(ioc);
auto work = net::any_io_executor(
net::prefer(ioc.get_executor(),
net::execution::outstanding_work.tracked));
std::thread t{[&](){ ioc.run(); }};
error_code ec;

View File

@ -37,7 +37,10 @@ net::const_buffer get_next_chunk_body()
void fxx() {
net::io_context ioc;
auto work = net::make_work_guard(ioc);
auto work = net::any_io_executor(
net::prefer(
ioc.get_executor(),
net::execution::outstanding_work.tracked));
std::thread t{[&](){ ioc.run(); }};
net::ip::tcp::socket sock{ioc};

View File

@ -17,7 +17,10 @@ using tcp = net::ip::tcp;
error_code ec;
net::io_context ioc;
auto work = net::make_work_guard(ioc);
auto work = net::any_io_executor(
net::prefer(
ioc.get_executor(),
net::execution::outstanding_work.tracked));
std::thread t{[&](){ ioc.run(); }};
tcp::socket sock(ioc);

View File

@ -35,8 +35,8 @@ protected:
net::io_context ioc_;
private:
net::executor_work_guard<
net::io_context::executor_type> work_;
detail::select_work_guard_t<net::io_context::executor_type>
work_;
std::vector<std::thread> threads_;
std::mutex m_;
std::condition_variable cv_;
@ -49,7 +49,7 @@ public:
explicit
enable_yield_to(std::size_t concurrency = 1)
: work_(ioc_.get_executor())
: work_(detail::make_work_guard(ioc_.get_executor()))
{
threads_.reserve(concurrency);
while(concurrency--)
@ -136,4 +136,5 @@ spawn(F0&& f, FN&&... fn)
} // beast
} // boost
#endif