Improvements to test::stream:

The behavior of the test stream when either
end is destroyed or closed is well-defined.
This commit is contained in:
Vinnie Falco
2019-02-06 14:40:48 -08:00
parent a142082cf2
commit 812a19706a
4 changed files with 423 additions and 245 deletions

View File

@@ -1,6 +1,7 @@
Version 211:
* close_socket is in stream_traits.hpp
* Improvements to test::stream
--------------------------------------------------------------------------------

View File

@@ -10,30 +10,146 @@
#ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
#define BOOST_BEAST_TEST_IMPL_STREAM_HPP
#include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/buffer_size.hpp>
#include <boost/beast/core/buffers_prefix.hpp>
#include <stdexcept>
namespace boost {
namespace beast {
namespace test {
//------------------------------------------------------------------------------
template<class Handler, class Buffers>
class stream::read_op : public stream::read_op_base
{
using ex1_type =
net::io_context::executor_type;
using ex2_type
= net::associated_executor_t<Handler, ex1_type>;
class lambda
{
state& s_;
Buffers b_;
Handler h_;
net::executor_work_guard<ex2_type> wg2_;
public:
lambda(lambda&&) = default;
lambda(lambda const&) = default;
template<class DeducedHandler>
lambda(state& s, Buffers const& b, DeducedHandler&& h)
: s_(s)
, b_(b)
, h_(std::forward<DeducedHandler>(h))
, wg2_(net::get_associated_executor(
h_, s_.ioc.get_executor()))
{
}
void
operator()(bool cancel)
{
error_code ec;
std::size_t bytes_transferred = 0;
if(cancel)
{
ec = net::error::operation_aborted;
}
else
{
std::lock_guard<std::mutex> lock(s_.m);
BOOST_ASSERT(! s_.op);
if(s_.b.size() > 0)
{
bytes_transferred =
net::buffer_copy(
b_, s_.b.data(), s_.read_max);
s_.b.consume(bytes_transferred);
}
else
{
ec = net::error::eof;
}
}
auto alloc = net::get_associated_allocator(h_);
wg2_.get_executor().dispatch(
beast::bind_front_handler(std::move(h_),
ec, bytes_transferred), alloc);
wg2_.reset();
}
};
lambda fn_;
net::executor_work_guard<ex1_type> wg1_;
public:
template<class Handler_>
read_op(state& s,
Buffers const& b, Handler_&& h)
: fn_(s, b, std::forward<Handler_>(h))
, wg1_(s.ioc.get_executor())
{
}
void
operator()(bool cancel) override
{
net::post(
wg1_.get_executor(),
bind_handler(
std::move(fn_),
cancel));
wg1_.reset();
}
};
//------------------------------------------------------------------------------
stream::
state::
state(
net::io_context& ioc_,
fail_count* fc_)
: ioc(ioc_)
, fc(fc_)
{
}
stream::
state::
~state()
{
// cancel outstanding read
if(op != nullptr)
(*op)(true);
}
void
stream::
state::
notify_read()
{
if(op)
{
auto op_ = std::move(op);
op_->operator()();
}
else
{
cv.notify_all();
}
}
//------------------------------------------------------------------------------
stream::
~stream()
{
{
std::unique_lock<std::mutex> lock{in_->m};
in_->op.reset();
}
auto out = out_.lock();
if(out)
{
std::unique_lock<std::mutex> lock{out->m};
if(out->code == status::ok)
{
out->code = status::reset;
out->on_write();
}
}
close();
}
stream::
@@ -50,6 +166,7 @@ stream&
stream::
operator=(stream&& other)
{
close();
auto in = std::make_shared<state>(
other.in_->ioc, other.in_->fc);
in_ = std::move(other.in_);
@@ -58,6 +175,8 @@ operator=(stream&& other)
return *this;
}
//------------------------------------------------------------------------------
stream::
stream(net::io_context& ioc)
: in_(std::make_shared<state>(ioc, nullptr))
@@ -103,6 +222,8 @@ connect(stream& remote)
BOOST_ASSERT(! remote.out_.lock());
out_ = remote.in_;
remote.out_ = in_;
in_->code = status::ok;
remote.in_->code = status::ok;
}
string_view
@@ -138,15 +259,33 @@ void
stream::
close()
{
BOOST_ASSERT(! in_->op);
auto out = out_.lock();
if(! out)
return;
std::lock_guard<std::mutex> lock{out->m};
if(out->code == status::ok)
// cancel outstanding read
{
out->code = status::eof;
out->on_write();
std::unique_ptr<read_op_base> op;
{
std::lock_guard<std::mutex> lock(in_->m);
in_->code = status::eof;
op = std::move(in_->op);
}
if(op != nullptr)
(*op)(true);
}
// disconnect
{
auto out = out_.lock();
out_.reset();
// notify peer
if(out)
{
std::lock_guard<std::mutex> lock(out->m);
if(out->code == status::ok)
{
out->code = status::eof;
out->notify_read();
}
}
}
}
@@ -158,7 +297,7 @@ close_remote()
if(in_->code == status::ok)
{
in_->code = status::eof;
in_->on_write();
in_->notify_read();
}
}
@@ -186,13 +325,20 @@ read_some(MutableBufferSequence const& buffers,
static_assert(net::is_mutable_buffer_sequence<
MutableBufferSequence>::value,
"MutableBufferSequence requirements not met");
++in_->nread;
// test failure
if(in_->fc && in_->fc->fail(ec))
return 0;
// 0-byte reads are no-ops
if(buffer_size(buffers) == 0)
{
ec.clear();
return 0;
}
std::unique_lock<std::mutex> lock{in_->m};
BOOST_ASSERT(! in_->op);
in_->cv.wait(lock,
@@ -202,25 +348,20 @@ read_some(MutableBufferSequence const& buffers,
in_->b.size() > 0 ||
in_->code != status::ok;
});
std::size_t bytes_transferred;
// deliver bytes before eof
if(in_->b.size() > 0)
{
ec = {};
bytes_transferred = net::buffer_copy(
auto const n = net::buffer_copy(
buffers, in_->b.data(), in_->read_max);
in_->b.consume(bytes_transferred);
in_->b.consume(n);
return n;
}
else
{
BOOST_ASSERT(in_->code != status::ok);
bytes_transferred = 0;
if(in_->code == status::eof)
ec = net::error::eof;
else if(in_->code == status::reset)
ec = net::error::connection_reset;
}
++in_->nread;
return bytes_transferred;
// deliver error
BOOST_ASSERT(in_->code != status::ok);
ec = net::error::eof;
return 0;
}
template<class MutableBufferSequence, class ReadHandler>
@@ -234,8 +375,18 @@ async_read_some(
static_assert(net::is_mutable_buffer_sequence<
MutableBufferSequence>::value,
"MutableBufferSequence requirements not met");
BOOST_BEAST_HANDLER_INIT(
ReadHandler, void(error_code, std::size_t));
++in_->nread;
std::unique_lock<std::mutex> lock(in_->m);
if(in_->op != nullptr)
throw std::logic_error(
"in_->op != nullptr");
// test failure
error_code ec;
if(in_->fc && in_->fc->fail(ec))
{
@@ -243,51 +394,55 @@ async_read_some(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
ec,
0));
ec, std::size_t{0}));
return init.result.get();
}
else
// 0-byte reads are no-ops
if(buffer_size(buffers) == 0)
{
std::unique_lock<std::mutex> lock{in_->m};
BOOST_ASSERT(! in_->op);
if( buffer_size(buffers) == 0 ||
buffer_size(in_->b.data()) > 0)
{
auto const bytes_transferred = net::buffer_copy(
buffers, in_->b.data(), in_->read_max);
in_->b.consume(bytes_transferred);
lock.unlock();
++in_->nread;
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
error_code{},
bytes_transferred));
}
else if(in_->code != status::ok)
{
lock.unlock();
++in_->nread;
if(in_->code == status::eof)
ec = net::error::eof;
else if(in_->code == status::reset)
ec = net::error::connection_reset;
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
ec,
0));
}
else
{
in_->op.reset(new read_op<BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t)),
MutableBufferSequence>{*in_, buffers,
std::move(init.completion_handler)});
}
lock.unlock();
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
ec, std::size_t{0}));
return init.result.get();
}
// deliver bytes before eof
if(buffer_size(in_->b.data()) > 0)
{
auto n = net::buffer_copy(
buffers, in_->b.data(), in_->read_max);
in_->b.consume(n);
lock.unlock();
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
ec, n));
return init.result.get();
}
// deliver error
if(in_->code != status::ok)
{
lock.unlock();
ec = net::error::eof;
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
ec, std::size_t{0}));
return init.result.get();
}
// complete when bytes available or closed
in_->op.reset(new read_op<BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t)),
MutableBufferSequence>{*in_, buffers,
std::move(init.completion_handler)});
return init.result.get();
}
@@ -316,26 +471,31 @@ write_some(
static_assert(net::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
++in_->nwrite;
// connection closed
auto out = out_.lock();
if(! out)
{
ec = net::error::connection_reset;
return 0;
}
BOOST_ASSERT(out->code == status::ok);
// test failure
if(in_->fc && in_->fc->fail(ec))
return 0;
auto const n = std::min<std::size_t>(
// copy buffers
auto n = std::min<std::size_t>(
buffer_size(buffers), in_->write_max);
std::unique_lock<std::mutex> lock{out->m};
auto const bytes_transferred =
net::buffer_copy(out->b.prepare(n), buffers);
out->b.commit(bytes_transferred);
out->on_write();
lock.unlock();
++in_->nwrite;
ec = {};
return bytes_transferred;
{
std::lock_guard<std::mutex> lock(out->m);
n = net::buffer_copy(out->b.prepare(n), buffers);
out->b.commit(n);
out->notify_read();
}
return n;
}
template<class ConstBufferSequence, class WriteHandler>
@@ -350,6 +510,10 @@ async_write_some(ConstBufferSequence const& buffers,
"ConstBufferSequence requirements not met");
BOOST_BEAST_HANDLER_INIT(
WriteHandler, void(error_code, std::size_t));
++in_->nwrite;
// connection closed
auto out = out_.lock();
if(! out)
{
@@ -358,49 +522,46 @@ async_write_some(ConstBufferSequence const& buffers,
beast::bind_front_handler(
std::move(init.completion_handler),
net::error::connection_reset,
0));
std::size_t{0}));
return init.result.get();
}
else
// test failure
error_code ec;
if(in_->fc && in_->fc->fail(ec))
{
BOOST_ASSERT(out->code == status::ok);
error_code ec;
if(in_->fc && in_->fc->fail(ec))
{
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
ec,
0));
}
else
{
auto const n = std::min<std::size_t>(
buffer_size(buffers), in_->write_max);
std::unique_lock<std::mutex> lock{out->m};
auto const bytes_transferred =
net::buffer_copy(out->b.prepare(n), buffers);
out->b.commit(bytes_transferred);
out->on_write();
lock.unlock();
++in_->nwrite;
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
error_code{},
bytes_transferred));
}
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
ec,
std::size_t{0}));
return init.result.get();
}
// copy buffers
auto n = std::min<std::size_t>(
buffer_size(buffers), in_->write_max);
{
std::lock_guard<std::mutex> lock(out->m);
n = net::buffer_copy(out->b.prepare(n), buffers);
out->b.commit(n);
out->notify_read();
}
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(init.completion_handler),
error_code{}, n));
return init.result.get();
}
void
teardown(
websocket::role_type,
stream& s,
boost::system::error_code& ec)
websocket::role_type,
stream& s,
boost::system::error_code& ec)
{
if( s.in_->fc &&
s.in_->fc->fail(ec))
@@ -412,15 +573,15 @@ boost::system::error_code& ec)
s.in_->fc->fail(ec))
ec = net::error::eof;
else
ec = {};
ec.clear();
}
template<class TeardownHandler>
void
async_teardown(
websocket::role_type,
stream& s,
TeardownHandler&& handler)
websocket::role_type,
stream& s,
TeardownHandler&& handler)
{
error_code ec;
if( s.in_->fc &&
@@ -434,7 +595,7 @@ TeardownHandler&& handler)
s.in_->fc->fail(ec))
ec = net::error::eof;
else
ec = {};
ec.clear();
net::post(
s.get_executor(),
@@ -442,95 +603,6 @@ TeardownHandler&& handler)
std::move(handler), ec));
}
//------------------------------------------------------------------------------
template<class Handler, class Buffers>
class stream::read_op : public stream::read_op_base
{
using ex1_type =
net::io_context::executor_type;
using ex2_type
= net::associated_executor_t<Handler, ex1_type>;
class lambda
{
state& s_;
Buffers b_;
Handler h_;
net::executor_work_guard<ex2_type> wg2_;
public:
lambda(lambda&&) = default;
lambda(lambda const&) = default;
template<class DeducedHandler>
lambda(state& s, Buffers const& b, DeducedHandler&& h)
: s_(s)
, b_(b)
, h_(std::forward<DeducedHandler>(h))
, wg2_(net::get_associated_executor(
h_, s_.ioc.get_executor()))
{
}
void
operator()()
{
std::unique_lock<std::mutex> lock{s_.m};
error_code ec;
std::size_t bytes_transferred = 0;
BOOST_ASSERT(! s_.op);
if(s_.b.size() > 0)
{
bytes_transferred =
net::buffer_copy(
b_, s_.b.data(), s_.read_max);
s_.b.consume(bytes_transferred);
++s_.nread;
}
else
{
BOOST_ASSERT(s_.code != status::ok);
auto& s = s_;
++s.nread;
if(s.code == status::eof)
ec = net::error::eof;
else if(s.code == status::reset)
ec = net::error::connection_reset;
}
lock.unlock();
auto alloc = net::get_associated_allocator(h_);
wg2_.get_executor().dispatch(
beast::bind_front_handler(
std::move(h_),
ec,
bytes_transferred), alloc);
wg2_.reset();
}
};
lambda fn_;
net::executor_work_guard<ex1_type> wg1_;
public:
template<class DeducedHandler>
read_op(state& s, Buffers const& b, DeducedHandler&& h)
: fn_(s, b, std::forward<DeducedHandler>(h))
, wg1_(s.ioc.get_executor())
{
}
void
operator()() override
{
net::post(
wg1_.get_executor(),
std::move(fn_));
wg1_.reset();
}
};
stream
connect(stream& to)
{

View File

@@ -98,7 +98,7 @@ class stream
struct read_op_base
{
virtual ~read_op_base() = default;
virtual void operator()() = 0;
virtual void operator()(bool cancel = false) = 0;
};
template<class Handler, class Buffers>
@@ -108,7 +108,6 @@ class stream
{
ok,
eof,
reset
};
struct state
@@ -129,33 +128,16 @@ class stream
std::size_t write_max =
(std::numeric_limits<std::size_t>::max)();
~state()
{
BOOST_ASSERT(! op);
}
BOOST_BEAST_DECL
explicit
state(
net::io_context& ioc_,
fail_count* fc_)
: ioc(ioc_)
, fc(fc_)
{
}
state(net::io_context& ioc_, fail_count* fc_);
BOOST_BEAST_DECL
~state();
BOOST_BEAST_DECL
void
on_write()
{
if(op)
{
std::unique_ptr<read_op_base> op_ = std::move(op);
op_->operator()();
}
else
{
cv.notify_all();
}
}
notify_read();
};
std::shared_ptr<state> in_;

View File

@@ -9,3 +9,126 @@
// Test that header file is self-contained.
#include <boost/beast/_experimental/test/stream.hpp>
#include <boost/beast/_experimental/unit_test/suite.hpp>
namespace boost {
namespace beast {
class test_stream_test
: public unit_test::suite
{
public:
void
testTestStream()
{
net::io_context ioc;
char buf[1] = {};
net::mutable_buffer m0;
net::mutable_buffer m1(buf, sizeof(buf));
{
{
test::stream ts(ioc);
}
{
test::stream ts(ioc);
ts.close();
}
{
test::stream t1(ioc);
auto t2 = connect(t1);
}
{
test::stream t1(ioc);
auto t2 = connect(t1);
t2.close();
}
{
#if 0
test::stream ts(ioc);
error_code ec;
ts.read_some(net::mutable_buffer{}, ec);
log << ec.message();
#endif
}
{
error_code ec;
{
test::stream ts(ioc);
ts.async_read_some(m1,
[&](error_code ec_, std::size_t)
{
ec = ec_;
});
}
ioc.run();
ioc.restart();
BEAST_EXPECTS(
//ec == net::error::eof,
ec == net::error::operation_aborted,
ec.message());
}
{
error_code ec;
test::stream ts(ioc);
ts.async_read_some(m1,
[&](error_code ec_, std::size_t)
{
ec = ec_;
});
ts.close();
ioc.run();
ioc.restart();
BEAST_EXPECTS(
//ec == net::error::eof,
ec == net::error::operation_aborted,
ec.message());
}
{
error_code ec;
test::stream t1(ioc);
auto t2 = connect(t1);
t1.async_read_some(m1,
[&](error_code ec_, std::size_t)
{
ec = ec_;
});
t2.close();
ioc.run();
ioc.restart();
BEAST_EXPECTS(
ec == net::error::eof,
ec.message());
}
{
error_code ec;
test::stream t1(ioc);
auto t2 = connect(t1);
t1.async_read_some(m1,
[&](error_code ec_, std::size_t)
{
ec = ec_;
});
t1.close();
ioc.run();
ioc.restart();
BEAST_EXPECTS(
ec == net::error::operation_aborted,
ec.message());
}
}
}
void
run() override
{
testTestStream();
pass();
}
};
BEAST_DEFINE_TESTSUITE(beast,test,test_stream);
} // beast
} // boost