Add executor rebind to test::stream

closes #2139
This commit is contained in:
Richard Hodges
2020-12-11 12:03:34 +01:00
parent a87330e53f
commit 8913a3cd21
13 changed files with 619 additions and 400 deletions
@@ -27,53 +27,19 @@ namespace test {
//------------------------------------------------------------------------------
struct stream::service_impl
{
std::mutex m_;
std::vector<state*> v_;
BOOST_BEAST_DECL
void
remove(state& impl);
};
class stream::service
: public beast::detail::service_base<service>
{
boost::shared_ptr<service_impl> sp_;
BOOST_BEAST_DECL
void
shutdown() override;
public:
BOOST_BEAST_DECL
explicit
service(net::execution_context& ctx);
BOOST_BEAST_DECL
static
auto
make_impl(
net::io_context& ctx,
test::fail_count* fc) ->
boost::shared_ptr<state>;
};
//------------------------------------------------------------------------------
template<class Executor>
template<class Handler, class Buffers>
class stream::read_op : public stream::read_op_base
class basic_stream<Executor>::read_op : public detail::stream_read_op_base
{
using ex1_type =
net::io_context::executor_type;
executor_type;
using ex2_type
= net::associated_executor_t<Handler, ex1_type>;
struct lambda
{
Handler h_;
boost::weak_ptr<state> wp_;
boost::weak_ptr<detail::stream_state> wp_;
Buffers b_;
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::any_io_executor wg2_;
@@ -87,7 +53,7 @@ class stream::read_op : public stream::read_op_base
template<class Handler_>
lambda(
Handler_&& h,
boost::shared_ptr<state> const& s,
boost::shared_ptr<detail::stream_state> const& s,
Buffers const& b)
: h_(std::forward<Handler_>(h))
, wp_(s)
@@ -95,11 +61,11 @@ class stream::read_op : public stream::read_op_base
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
, wg2_(net::prefer(
net::get_associated_executor(
h_, s->ioc.get_executor()),
h_, s->exec),
net::execution::outstanding_work.tracked))
#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
, wg2_(net::get_associated_executor(
h_, s->ioc.get_executor()))
h_, s->exec))
#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
{
}
@@ -151,43 +117,44 @@ class stream::read_op : public stream::read_op_base
};
lambda fn_;
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
net::executor_work_guard<net::any_io_executor> wg1_;
#else
net::any_io_executor wg1_;
#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::executor_work_guard<ex1_type> wg1_;
#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
#endif
public:
template<class Handler_>
read_op(
Handler_&& h,
boost::shared_ptr<state> const& s,
boost::shared_ptr<detail::stream_state> const& s,
Buffers const& b)
: fn_(std::forward<Handler_>(h), s, b)
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
, wg1_(net::prefer(s->ioc.get_executor(),
#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
, wg1_(s->exec)
#else
, wg1_(net::prefer(s->exec,
net::execution::outstanding_work.tracked))
#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
, wg1_(s->ioc.get_executor())
#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
#endif
{
}
void
operator()(error_code ec) override
{
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
net::post(wg1_, beast::bind_front_handler(std::move(fn_), ec));
wg1_ = net::any_io_executor(); // probably unnecessary
#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
net::post(wg1_.get_executor(),
beast::bind_front_handler(std::move(fn_), ec));
wg1_.reset();
#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
#else
net::post(wg1_, beast::bind_front_handler(std::move(fn_), ec));
wg1_ = net::any_io_executor(); // probably unnecessary
#endif
}
};
struct stream::run_read_op
template<class Executor>
struct basic_stream<Executor>::run_read_op
{
template<
class ReadHandler,
@@ -195,7 +162,7 @@ struct stream::run_read_op
void
operator()(
ReadHandler&& h,
boost::shared_ptr<state> const& in,
boost::shared_ptr<detail::stream_state> const& in,
MutableBufferSequence const& buffers)
{
// If you get an error on the following line it means
@@ -209,7 +176,7 @@ struct stream::run_read_op
initiate_read(
in,
std::unique_ptr<read_op_base>{
std::unique_ptr<detail::stream_read_op_base>{
new read_op<
typename std::decay<ReadHandler>::type,
MutableBufferSequence>(
@@ -220,7 +187,8 @@ struct stream::run_read_op
}
};
struct stream::run_write_op
template<class Executor>
struct basic_stream<Executor>::run_write_op
{
template<
class WriteHandler,
@@ -228,8 +196,8 @@ struct stream::run_write_op
void
operator()(
WriteHandler&& h,
boost::shared_ptr<state> in_,
boost::weak_ptr<state> out_,
boost::shared_ptr<detail::stream_state> in_,
boost::weak_ptr<detail::stream_state> out_,
ConstBufferSequence const& buffers)
{
// If you get an error on the following line it means
@@ -245,7 +213,7 @@ struct stream::run_write_op
auto const upcall = [&](error_code ec, std::size_t n)
{
net::post(
in_->ioc.get_executor(),
in_->exec,
beast::bind_front_handler(std::move(h), ec, n));
};
@@ -281,9 +249,10 @@ struct stream::run_write_op
//------------------------------------------------------------------------------
template<class Executor>
template<class MutableBufferSequence>
std::size_t
stream::
basic_stream<Executor>::
read_some(MutableBufferSequence const& buffers)
{
static_assert(net::is_mutable_buffer_sequence<
@@ -296,9 +265,10 @@ read_some(MutableBufferSequence const& buffers)
return n;
}
template<class Executor>
template<class MutableBufferSequence>
std::size_t
stream::
basic_stream<Executor>::
read_some(MutableBufferSequence const& buffers,
error_code& ec)
{
@@ -326,7 +296,7 @@ read_some(MutableBufferSequence const& buffers,
{
return
in_->b.size() > 0 ||
in_->code != status::ok;
in_->code != detail::stream_status::ok;
});
// deliver bytes before eof
@@ -340,14 +310,16 @@ read_some(MutableBufferSequence const& buffers,
}
// deliver error
BOOST_ASSERT(in_->code != status::ok);
BOOST_ASSERT(in_->code != detail::stream_status::ok);
ec = net::error::eof;
return 0;
}
template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
stream::
template<class Executor>
template<class MutableBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, void(error_code, std::size_t))
basic_stream<Executor>::
async_read_some(
MutableBufferSequence const& buffers,
ReadHandler&& handler)
@@ -365,9 +337,10 @@ async_read_some(
buffers);
}
template<class Executor>
template<class ConstBufferSequence>
std::size_t
stream::
basic_stream<Executor>::
write_some(ConstBufferSequence const& buffers)
{
static_assert(net::is_const_buffer_sequence<
@@ -381,9 +354,10 @@ write_some(ConstBufferSequence const& buffers)
return bytes_transferred;
}
template<class Executor>
template<class ConstBufferSequence>
std::size_t
stream::
basic_stream<Executor>::
write_some(
ConstBufferSequence const& buffers, error_code& ec)
{
@@ -425,9 +399,11 @@ write_some(
return n;
}
template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
stream::
template<class Executor>
template<class ConstBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) WriteHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(WriteHandler, void(error_code, std::size_t))
basic_stream<Executor>::
async_write_some(
ConstBufferSequence const& buffers,
WriteHandler&& handler)
@@ -448,11 +424,11 @@ async_write_some(
//------------------------------------------------------------------------------
template<class TeardownHandler>
template<class Executor, class TeardownHandler>
void
async_teardown(
role_type,
stream& s,
basic_stream<Executor>& s,
TeardownHandler&& handler)
{
error_code ec;
@@ -477,8 +453,8 @@ async_teardown(
//------------------------------------------------------------------------------
template<class Arg1, class... ArgN>
stream
template<class Executor, class Arg1, class... ArgN>
basic_stream<Executor>
connect(stream& to, Arg1&& arg1, ArgN&&... argn)
{
stream from{
@@ -488,6 +464,34 @@ connect(stream& to, Arg1&& arg1, ArgN&&... argn)
return from;
}
namespace detail
{
template<class To>
struct extract_executor_op
{
To operator()(net::any_io_executor& ex) const
{
assert(ex.template target<To>());
return *ex.template target<To>();
}
};
template<>
struct extract_executor_op<net::any_io_executor>
{
net::any_io_executor operator()(net::any_io_executor& ex) const
{
return ex;
}
};
}
template<class Executor>
auto basic_stream<Executor>::get_executor() noexcept -> executor_type
{
return detail::extract_executor_op<Executor>()(in_->exec);
}
} // test
} // beast
} // boost
@@ -23,62 +23,10 @@ namespace test {
//------------------------------------------------------------------------------
stream::
service::
service(net::execution_context& ctx)
: beast::detail::service_base<service>(ctx)
, sp_(boost::make_shared<service_impl>())
{
}
void
stream::
service::
shutdown()
{
std::vector<std::unique_ptr<read_op_base>> v;
std::lock_guard<std::mutex> g1(sp_->m_);
v.reserve(sp_->v_.size());
for(auto p : sp_->v_)
{
std::lock_guard<std::mutex> g2(p->m);
v.emplace_back(std::move(p->op));
p->code = status::eof;
}
}
auto
stream::
service::
make_impl(
net::io_context& ctx,
test::fail_count* fc) ->
boost::shared_ptr<state>
{
auto& svc = net::use_service<service>(ctx);
auto sp = boost::make_shared<state>(ctx, svc.sp_, fc);
std::lock_guard<std::mutex> g(svc.sp_->m_);
svc.sp_->v_.push_back(sp.get());
return sp;
}
void
stream::
service_impl::
remove(state& impl)
{
std::lock_guard<std::mutex> g(m_);
*std::find(
v_.begin(), v_.end(),
&impl) = std::move(v_.back());
v_.pop_back();
}
//------------------------------------------------------------------------------
void stream::initiate_read(
boost::shared_ptr<state> const& in_,
std::unique_ptr<stream::read_op_base>&& op,
template<class Executor>
void basic_stream<Executor>::initiate_read(
boost::shared_ptr<detail::stream_state> const& in_,
std::unique_ptr<detail::stream_read_op_base>&& op,
std::size_t buf_size)
{
std::unique_lock<std::mutex> lock(in_->m);
@@ -106,7 +54,7 @@ void stream::initiate_read(
}
// deliver error
if(in_->code != status::ok)
if(in_->code != detail::stream_status::ok)
{
lock.unlock();
(*op)(net::error::eof);
@@ -117,98 +65,36 @@ void stream::initiate_read(
in_->op = std::move(op);
}
stream::
state::
state(
net::io_context& ioc_,
boost::weak_ptr<service_impl> wp_,
fail_count* fc_)
: ioc(ioc_)
, wp(std::move(wp_))
, fc(fc_)
{
}
stream::
state::
~state()
{
// cancel outstanding read
if(op != nullptr)
(*op)(net::error::operation_aborted);
}
void
stream::
state::
remove() noexcept
{
auto sp = wp.lock();
// If this goes off, it means the lifetime of a test::stream object
// extended beyond the lifetime of the associated execution context.
BOOST_ASSERT(sp);
sp->remove(*this);
}
void
stream::
state::
notify_read()
{
if(op)
{
auto op_ = std::move(op);
op_->operator()(error_code{});
}
else
{
cv.notify_all();
}
}
void
stream::
state::
cancel_read()
{
std::unique_ptr<read_op_base> p;
{
std::lock_guard<std::mutex> lock(m);
code = status::eof;
p = std::move(op);
}
if(p != nullptr)
(*p)(net::error::operation_aborted);
}
//------------------------------------------------------------------------------
stream::
~stream()
template<class Executor>
basic_stream<Executor>::
~basic_stream()
{
close();
in_->remove();
}
stream::
stream(stream&& other)
template<class Executor>
basic_stream<Executor>::
basic_stream(basic_stream&& other)
{
auto in = service::make_impl(
other.in_->ioc, other.in_->fc);
auto in = detail::stream_service::make_impl(
other.in_->exec, other.in_->fc);
in_ = std::move(other.in_);
out_ = std::move(other.out_);
other.in_ = in;
}
stream&
stream::
operator=(stream&& other)
template<class Executor>
basic_stream<Executor>&
basic_stream<Executor>::
operator=(basic_stream&& other)
{
close();
auto in = service::make_impl(
other.in_->ioc, other.in_->fc);
auto in = detail::stream_service::make_impl(
other.in_->exec, other.in_->fc);
in_->remove();
in_ = std::move(other.in_);
out_ = std::move(other.out_);
@@ -218,46 +104,51 @@ operator=(stream&& other)
//------------------------------------------------------------------------------
stream::
stream(net::io_context& ioc)
: in_(service::make_impl(ioc, nullptr))
template<class Executor>
basic_stream<Executor>::
basic_stream(executor_type exec)
: in_(detail::stream_service::make_impl(std::move(exec), nullptr))
{
}
stream::
stream(
template<class Executor>
basic_stream<Executor>::
basic_stream(
net::io_context& ioc,
fail_count& fc)
: in_(service::make_impl(ioc, &fc))
: in_(detail::stream_service::make_impl(ioc.get_executor(), &fc))
{
}
stream::
stream(
template<class Executor>
basic_stream<Executor>::
basic_stream(
net::io_context& ioc,
string_view s)
: in_(service::make_impl(ioc, nullptr))
: in_(detail::stream_service::make_impl(ioc.get_executor(), nullptr))
{
in_->b.commit(net::buffer_copy(
in_->b.prepare(s.size()),
net::buffer(s.data(), s.size())));
}
stream::
stream(
template<class Executor>
basic_stream<Executor>::
basic_stream(
net::io_context& ioc,
fail_count& fc,
string_view s)
: in_(service::make_impl(ioc, &fc))
: in_(detail::stream_service::make_impl(ioc.get_executor(), &fc))
{
in_->b.commit(net::buffer_copy(
in_->b.prepare(s.size()),
net::buffer(s.data(), s.size())));
}
template<class Executor>
void
stream::
connect(stream& remote)
basic_stream<Executor>::
connect(basic_stream& remote)
{
BOOST_ASSERT(! out_.lock());
BOOST_ASSERT(! remote.out_.lock());
@@ -266,12 +157,13 @@ connect(stream& remote)
std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock};
out_ = remote.in_;
remote.out_ = in_;
in_->code = status::ok;
remote.in_->code = status::ok;
in_->code = detail::stream_status::ok;
remote.in_->code = detail::stream_status::ok;
}
template<class Executor>
string_view
stream::
basic_stream<Executor>::
str() const
{
auto const bs = in_->b.data();
@@ -281,8 +173,9 @@ str() const
return {static_cast<char const*>(b.data()), b.size()};
}
template<class Executor>
void
stream::
basic_stream<Executor>::
append(string_view s)
{
std::lock_guard<std::mutex> lock{in_->m};
@@ -291,16 +184,18 @@ append(string_view s)
net::buffer(s.data(), s.size())));
}
template<class Executor>
void
stream::
basic_stream<Executor>::
clear()
{
std::lock_guard<std::mutex> lock{in_->m};
in_->b.consume(in_->b.size());
}
template<class Executor>
void
stream::
basic_stream<Executor>::
close()
{
in_->cancel_read();
@@ -314,31 +209,33 @@ close()
if(out)
{
std::lock_guard<std::mutex> lock(out->m);
if(out->code == status::ok)
if(out->code == detail::stream_status::ok)
{
out->code = status::eof;
out->code = detail::stream_status::eof;
out->notify_read();
}
}
}
}
template<class Executor>
void
stream::
basic_stream<Executor>::
close_remote()
{
std::lock_guard<std::mutex> lock{in_->m};
if(in_->code == status::ok)
if(in_->code == detail::stream_status::ok)
{
in_->code = status::eof;
in_->code = detail::stream_status::eof;
in_->notify_read();
}
}
template<class Executor>
void
teardown(
role_type,
stream& s,
basic_stream<Executor>& s,
boost::system::error_code& ec)
{
if( s.in_->fc &&
@@ -356,20 +253,18 @@ teardown(
//------------------------------------------------------------------------------
stream
connect(stream& to)
template<class Executor>
basic_stream<Executor>
connect(basic_stream<Executor>& to)
{
#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
stream from{net::query(to.get_executor(), net::execution::context)};
#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
stream from{to.get_executor().context()};
#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
basic_stream<Executor> from(to.get_executor());
from.connect(to);
return from;
}
template<class Executor>
void
connect(stream& s1, stream& s2)
connect(basic_stream<Executor>& s1, basic_stream<Executor>& s2)
{
s1.connect(s2);
}