Add test::stream::service

This commit is contained in:
Vinnie Falco
2019-02-26 18:55:23 -08:00
parent ad48837872
commit c510662c67
6 changed files with 318 additions and 111 deletions

View File

@@ -1,3 +1,9 @@
Version 223:
* Add test::stream::service
--------------------------------------------------------------------------------
Version 222: Version 222:
* stream_base::timeout::suggested is a nested function * stream_base::timeout::suggested is a nested function

View File

@@ -13,7 +13,10 @@
#include <boost/beast/core/bind_handler.hpp> #include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/buffer_size.hpp> #include <boost/beast/core/buffer_size.hpp>
#include <boost/beast/core/buffers_prefix.hpp> #include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/detail/service_base.hpp>
#include <mutex>
#include <stdexcept> #include <stdexcept>
#include <vector>
namespace boost { namespace boost {
namespace beast { namespace beast {
@@ -21,6 +24,41 @@ namespace test {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
struct stream::service_impl
{
std::mutex m_;
std::vector<state*> v_;
BOOST_BEAST_DECL
void
remove(state& impl);
};
class stream::service
: public beast::detail::service_base<service>
{
boost::shared_ptr<service_impl> sp_;
BOOST_BEAST_DECL
void
shutdown() override;
public:
BOOST_BEAST_DECL
explicit
service(net::execution_context& ctx);
BOOST_BEAST_DECL
static
auto
make_impl(
net::io_context& ctx,
test::fail_count* fc) ->
boost::shared_ptr<state>;
};
//------------------------------------------------------------------------------
template<class Handler, class Buffers> template<class Handler, class Buffers>
class stream::read_op : public stream::read_op_base class stream::read_op : public stream::read_op_base
{ {
@@ -32,7 +70,7 @@ class stream::read_op : public stream::read_op_base
struct lambda struct lambda
{ {
Handler h_; Handler h_;
state& s_; boost::weak_ptr<state> wp_;
Buffers b_; Buffers b_;
net::executor_work_guard<ex2_type> wg2_; net::executor_work_guard<ex2_type> wg2_;
@@ -42,13 +80,13 @@ class stream::read_op : public stream::read_op_base
template<class Handler_> template<class Handler_>
lambda( lambda(
Handler_&& h, Handler_&& h,
state& s, boost::shared_ptr<state> const& s,
Buffers const& b) Buffers const& b)
: h_(std::forward<Handler_>(h)) : h_(std::forward<Handler_>(h))
, s_(s) , wp_(s)
, b_(b) , b_(b)
, wg2_(net::get_associated_executor( , wg2_(net::get_associated_executor(
h_, s_.ioc.get_executor())) h_, s->ioc.get_executor()))
{ {
} }
@@ -56,16 +94,19 @@ class stream::read_op : public stream::read_op_base
operator()(error_code ec) operator()(error_code ec)
{ {
std::size_t bytes_transferred = 0; std::size_t bytes_transferred = 0;
if (!ec) auto sp = wp_.lock();
if(! sp)
ec = net::error::operation_aborted;
if(! ec)
{ {
std::lock_guard<std::mutex> lock(s_.m); std::lock_guard<std::mutex> lock(sp->m);
BOOST_ASSERT(! s_.op); BOOST_ASSERT(! sp->op);
if(s_.b.size() > 0) if(sp->b.size() > 0)
{ {
bytes_transferred = bytes_transferred =
net::buffer_copy( net::buffer_copy(
b_, s_.b.data(), s_.read_max); b_, sp->b.data(), sp->read_max);
s_.b.consume(bytes_transferred); sp->b.consume(bytes_transferred);
} }
else if (buffer_size(b_) > 0) else if (buffer_size(b_) > 0)
{ {
@@ -88,10 +129,10 @@ public:
template<class Handler_> template<class Handler_>
read_op( read_op(
Handler_&& h, Handler_&& h,
state& s, boost::shared_ptr<state> const& s,
Buffers const& b) Buffers const& b)
: fn_(std::forward<Handler_>(h), s, b) : fn_(std::forward<Handler_>(h), s, b)
, wg1_(s.ioc.get_executor()) , wg1_(s->ioc.get_executor())
{ {
} }
@@ -114,7 +155,7 @@ struct stream::run_read_op
void void
operator()( operator()(
ReadHandler&& h, ReadHandler&& h,
std::shared_ptr<state> const& in, boost::shared_ptr<state> const& in,
MutableBufferSequence const& buffers) MutableBufferSequence const& buffers)
{ {
// If you get an error on the following line it means // If you get an error on the following line it means
@@ -133,7 +174,7 @@ struct stream::run_read_op
typename std::decay<ReadHandler>::type, typename std::decay<ReadHandler>::type,
MutableBufferSequence>( MutableBufferSequence>(
std::move(h), std::move(h),
*in, in,
buffers)}, buffers)},
buffer_size(buffers)); buffer_size(buffers));
} }
@@ -147,8 +188,8 @@ struct stream::run_write_op
void void
operator()( operator()(
WriteHandler&& h, WriteHandler&& h,
std::shared_ptr<state> in_, boost::shared_ptr<state> in_,
std::weak_ptr<state> out_, boost::weak_ptr<state> out_,
ConstBufferSequence const& buffers) ConstBufferSequence const& buffers)
{ {
// If you get an error on the following line it means // If you get an error on the following line it means

View File

@@ -14,7 +14,9 @@
#include <boost/beast/core/bind_handler.hpp> #include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/buffer_size.hpp> #include <boost/beast/core/buffer_size.hpp>
#include <boost/beast/core/buffers_prefix.hpp> #include <boost/beast/core/buffers_prefix.hpp>
#include <boost/make_shared.hpp>
#include <stdexcept> #include <stdexcept>
#include <vector>
namespace boost { namespace boost {
namespace beast { namespace beast {
@@ -22,8 +24,61 @@ namespace test {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
stream::
service::
service(net::execution_context& ctx)
: beast::detail::service_base<service>(ctx)
, sp_(boost::make_shared<service_impl>())
{
}
void
stream::
service::
shutdown()
{
std::vector<std::unique_ptr<read_op_base>> v;
std::lock_guard<std::mutex> g1(sp_->m_);
v.reserve(sp_->v_.size());
for(auto p : sp_->v_)
{
std::lock_guard<std::mutex> g2(p->m);
v.emplace_back(std::move(p->op));
p->code = status::eof;
}
}
auto
stream::
service::
make_impl(
net::io_context& ctx,
test::fail_count* fc) ->
boost::shared_ptr<state>
{
auto& svc = net::use_service<service>(ctx);
auto sp = boost::make_shared<state>(ctx, svc.sp_, fc);
std::lock_guard<std::mutex> g(svc.sp_->m_);
svc.sp_->v_.push_back(sp.get());
return sp;
}
void
stream::
service_impl::
remove(state& impl)
{
std::lock_guard<std::mutex> g(m_);
*std::find(
v_.begin(), v_.end(),
&impl) = std::move(v_.back());
v_.pop_back();
}
//------------------------------------------------------------------------------
void stream::initiate_read( void stream::initiate_read(
std::shared_ptr<state> const& in_, boost::shared_ptr<state> const& in_,
std::unique_ptr<stream::read_op_base>&& op, std::unique_ptr<stream::read_op_base>&& op,
std::size_t buf_size) std::size_t buf_size)
{ {
@@ -67,8 +122,10 @@ stream::
state:: state::
state( state(
net::io_context& ioc_, net::io_context& ioc_,
boost::weak_ptr<service_impl> wp_,
fail_count* fc_) fail_count* fc_)
: ioc(ioc_) : ioc(ioc_)
, wp(std::move(wp_))
, fc(fc_) , fc(fc_)
{ {
} }
@@ -82,6 +139,20 @@ state::
(*op)(net::error::operation_aborted); (*op)(net::error::operation_aborted);
} }
void
stream::
state::
remove() noexcept
{
auto sp = wp.lock();
// If this goes off, it means the lifetime of a test::stream object
// extended beyond the lifetime of the associated execution context.
BOOST_ASSERT(sp);
sp->remove(*this);
}
void void
stream:: stream::
state:: state::
@@ -98,18 +169,34 @@ notify_read()
} }
} }
void
stream::
state::
cancel_read()
{
std::unique_ptr<read_op_base> p;
{
std::lock_guard<std::mutex> lock(m);
code = status::eof;
p = std::move(op);
}
if(p != nullptr)
(*p)(net::error::operation_aborted);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
stream:: stream::
~stream() ~stream()
{ {
close(); close();
in_->remove();
} }
stream:: stream::
stream(stream&& other) stream(stream&& other)
{ {
auto in = std::make_shared<state>( auto in = service::make_impl(
other.in_->ioc, other.in_->fc); other.in_->ioc, other.in_->fc);
in_ = std::move(other.in_); in_ = std::move(other.in_);
out_ = std::move(other.out_); out_ = std::move(other.out_);
@@ -121,8 +208,9 @@ stream::
operator=(stream&& other) operator=(stream&& other)
{ {
close(); close();
auto in = std::make_shared<state>( auto in = service::make_impl(
other.in_->ioc, other.in_->fc); other.in_->ioc, other.in_->fc);
in_->remove();
in_ = std::move(other.in_); in_ = std::move(other.in_);
out_ = std::move(other.out_); out_ = std::move(other.out_);
other.in_ = in; other.in_ = in;
@@ -133,7 +221,7 @@ operator=(stream&& other)
stream:: stream::
stream(net::io_context& ioc) stream(net::io_context& ioc)
: in_(std::make_shared<state>(ioc, nullptr)) : in_(service::make_impl(ioc, nullptr))
{ {
} }
@@ -141,7 +229,7 @@ stream::
stream( stream(
net::io_context& ioc, net::io_context& ioc,
fail_count& fc) fail_count& fc)
: in_(std::make_shared<state>(ioc, &fc)) : in_(service::make_impl(ioc, &fc))
{ {
} }
@@ -149,7 +237,7 @@ stream::
stream( stream(
net::io_context& ioc, net::io_context& ioc,
string_view s) string_view s)
: in_(std::make_shared<state>(ioc, nullptr)) : in_(service::make_impl(ioc, nullptr))
{ {
in_->b.commit(net::buffer_copy( in_->b.commit(net::buffer_copy(
in_->b.prepare(s.size()), in_->b.prepare(s.size()),
@@ -161,7 +249,7 @@ stream(
net::io_context& ioc, net::io_context& ioc,
fail_count& fc, fail_count& fc,
string_view s) string_view s)
: in_(std::make_shared<state>(ioc, &fc)) : in_(service::make_impl(ioc, &fc))
{ {
in_->b.commit(net::buffer_copy( in_->b.commit(net::buffer_copy(
in_->b.prepare(s.size()), in_->b.prepare(s.size()),
@@ -213,17 +301,7 @@ void
stream:: stream::
close() close()
{ {
// cancel outstanding read in_->cancel_read();
{
std::unique_ptr<read_op_base> op;
{
std::lock_guard<std::mutex> lock(in_->m);
in_->code = status::eof;
op = std::move(in_->op);
}
if(op != nullptr)
(*op)(net::error::operation_aborted);
}
// disconnect // disconnect
{ {

View File

@@ -23,6 +23,8 @@
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
#include <condition_variable> #include <condition_variable>
#include <limits> #include <limits>
@@ -98,8 +100,8 @@ class stream
{ {
struct state; struct state;
std::shared_ptr<state> in_; boost::shared_ptr<state> in_;
std::weak_ptr<state> out_; boost::weak_ptr<state> out_;
enum class status enum class status
{ {
@@ -107,6 +109,9 @@ class stream
eof, eof,
}; };
class service;
struct service_impl;
struct read_op_base struct read_op_base
{ {
virtual ~read_op_base() = default; virtual ~read_op_base() = default;
@@ -117,11 +122,12 @@ class stream
{ {
friend class stream; friend class stream;
net::io_context& ioc;
boost::weak_ptr<service_impl> wp;
std::mutex m; std::mutex m;
flat_buffer b; flat_buffer b;
std::condition_variable cv; std::condition_variable cv;
std::unique_ptr<read_op_base> op; std::unique_ptr<read_op_base> op;
net::io_context& ioc;
status code = status::ok; status code = status::ok;
fail_count* fc = nullptr; fail_count* fc = nullptr;
std::size_t nread = 0; std::size_t nread = 0;
@@ -132,15 +138,26 @@ class stream
(std::numeric_limits<std::size_t>::max)(); (std::numeric_limits<std::size_t>::max)();
BOOST_BEAST_DECL BOOST_BEAST_DECL
explicit state(
state(net::io_context& ioc_, fail_count* fc_); net::io_context& ioc_,
boost::weak_ptr<service_impl> wp_,
fail_count* fc_);
BOOST_BEAST_DECL BOOST_BEAST_DECL
~state(); ~state();
BOOST_BEAST_DECL
void
remove() noexcept;
BOOST_BEAST_DECL BOOST_BEAST_DECL
void void
notify_read(); notify_read();
BOOST_BEAST_DECL
void
cancel_read();
}; };
template<class Handler, class Buffers> template<class Handler, class Buffers>
@@ -153,7 +170,7 @@ class stream
static static
void void
initiate_read( initiate_read(
std::shared_ptr<state> const& in, boost::shared_ptr<state> const& in,
std::unique_ptr<read_op_base>&& op, std::unique_ptr<read_op_base>&& op,
std::size_t buf_size); std::size_t buf_size);

View File

@@ -0,0 +1,43 @@
//
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_DETAIL_SERVICE_BASE_HPP
#define BOOST_BEAST_DETAIL_SERVICE_BASE_HPP
#include <boost/asio/execution_context.hpp>
namespace boost {
namespace beast {
namespace detail {
template<class T>
struct service_id : net::execution_context::id
{
};
template<class T>
struct service_base : net::execution_context::service
{
static service_id<T> id;
explicit
service_base(net::execution_context& ctx)
: net::execution_context::service(ctx)
{
}
};
template<class T>
service_id<T> service_base<T>::id;
} // detail
} // beast
} // boost
#endif

View File

@@ -1,4 +1,4 @@
// //
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -11,23 +11,25 @@
#include <boost/beast/_experimental/test/stream.hpp> #include <boost/beast/_experimental/test/stream.hpp>
#include <boost/beast/_experimental/unit_test/suite.hpp> #include <boost/beast/_experimental/unit_test/suite.hpp>
#include <boost/beast/_experimental/test/handler.hpp>
namespace boost { namespace boost {
namespace beast { namespace beast {
class test_stream_test class stream_test
: public unit_test::suite : public unit_test::suite
{ {
public: public:
void void
testTestStream() testTestStream()
{ {
net::io_context ioc;
char buf[1] = {}; char buf[1] = {};
net::mutable_buffer m0; net::mutable_buffer m0;
net::mutable_buffer m1(buf, sizeof(buf)); net::mutable_buffer m1(buf, sizeof(buf));
{ {
net::io_context ioc;
{ {
test::stream ts(ioc); test::stream ts(ioc);
} }
@@ -44,91 +46,111 @@ public:
auto t2 = connect(t1); auto t2 = connect(t1);
t2.close(); t2.close();
} }
{
#if 0
test::stream ts(ioc);
error_code ec;
ts.read_some(net::mutable_buffer{}, ec);
log << ec.message();
#endif
} }
{ {
error_code ec; // abandon
net::io_context ioc;
test::stream ts(ioc);
ts.async_read_some(m1,
[](error_code, std::size_t)
{
BEAST_FAIL();
});
}
//---
{
net::io_context ioc;
{ {
test::stream ts(ioc); test::stream ts(ioc);
ts.async_read_some(m1, ts.async_read_some(m1,
[&](error_code ec_, std::size_t) test::fail_handler(
{ net::error::operation_aborted));
ec = ec_;
});
} }
ioc.run(); test::run(ioc);
ioc.restart();
BEAST_EXPECTS(
//ec == net::error::eof,
ec == net::error::operation_aborted,
ec.message());
} }
{ {
error_code ec; net::io_context ioc;
test::stream ts(ioc); test::stream ts(ioc);
ts.async_read_some(m1, ts.async_read_some(m1,
[&](error_code ec_, std::size_t) test::fail_handler(
{ net::error::operation_aborted));
ec = ec_;
});
ts.close(); ts.close();
ioc.run(); test::run(ioc);
ioc.restart();
BEAST_EXPECTS(
//ec == net::error::eof,
ec == net::error::operation_aborted,
ec.message());
} }
{ {
error_code ec; net::io_context ioc;
test::stream t1(ioc); test::stream t1(ioc);
auto t2 = connect(t1); auto t2 = connect(t1);
t1.async_read_some(m1, t1.async_read_some(m1,
[&](error_code ec_, std::size_t) test::fail_handler(
{ net::error::eof));
ec = ec_;
});
t2.close(); t2.close();
ioc.run(); test::run(ioc);
ioc.restart();
BEAST_EXPECTS(
ec == net::error::eof,
ec.message());
} }
{ {
error_code ec; net::io_context ioc;
test::stream t1(ioc); test::stream t1(ioc);
auto t2 = connect(t1); auto t2 = connect(t1);
t1.async_read_some(m1, t1.async_read_some(m1,
[&](error_code ec_, std::size_t) test::fail_handler(
{ net::error::operation_aborted));
ec = ec_;
});
t1.close(); t1.close();
ioc.run(); test::run(ioc);
ioc.restart();
BEAST_EXPECTS(
ec == net::error::operation_aborted,
ec.message());
} }
} }
void
testSharedAbandon()
{
struct handler
{
std::shared_ptr<test::stream> ts_;
void
operator()(error_code, std::size_t)
{
}
};
char buf[1] = {};
net::mutable_buffer m1(buf, sizeof(buf));
std::weak_ptr<test::stream> wp;
{
net::io_context ioc;
{
auto sp = std::make_shared<test::stream>(ioc);
sp->async_read_some(m1, handler{sp});
wp = sp;
}
}
BEAST_EXPECT(! wp.lock());
}
void
testLifetimeViolation()
{
// This should assert
std::shared_ptr<test::stream> sp;
{
net::io_context ioc;
sp = std::make_shared<test::stream>(ioc);
}
sp.reset();
} }
void void
run() override run() override
{ {
testTestStream(); testTestStream();
pass(); testSharedAbandon();
//testLifetimeViolation();
} }
}; };
BEAST_DEFINE_TESTSUITE(beast,test,test_stream); BEAST_DEFINE_TESTSUITE(beast,test,stream);
} // beast } // beast
} // boost } // boost