More split definitions in test::stream

Signed-off-by: Damian Jarek <damian.jarek93@gmail.com>
This commit is contained in:
Damian Jarek
2019-02-23 02:30:08 +01:00
committed by Vinnie Falco
parent c82237512a
commit d048aa8e7e
4 changed files with 68 additions and 81 deletions

View File

@ -1,3 +1,9 @@
Version 219:
* More split definitions in test::stream
--------------------------------------------------------------------------------
Version 218: Version 218:
* detect_ssl, async_detect_ssl are public interfaces * detect_ssl, async_detect_ssl are public interfaces

View File

@ -54,15 +54,10 @@ class stream::read_op : public stream::read_op_base
} }
void void
operator()(bool cancel) operator()(error_code ec)
{ {
error_code ec;
std::size_t bytes_transferred = 0; std::size_t bytes_transferred = 0;
if(cancel) if (!ec)
{
ec = net::error::operation_aborted;
}
else
{ {
std::lock_guard<std::mutex> lock(s_.m); std::lock_guard<std::mutex> lock(s_.m);
BOOST_ASSERT(! s_.op); BOOST_ASSERT(! s_.op);
@ -73,11 +68,12 @@ class stream::read_op : public stream::read_op_base
b_, s_.b.data(), s_.read_max); b_, s_.b.data(), s_.read_max);
s_.b.consume(bytes_transferred); s_.b.consume(bytes_transferred);
} }
else else if (buffer_size(b_) > 0)
{ {
ec = net::error::eof; ec = net::error::eof;
} }
} }
auto alloc = net::get_associated_allocator(h_); auto alloc = net::get_associated_allocator(h_);
wg2_.get_executor().dispatch( wg2_.get_executor().dispatch(
beast::bind_front_handler(std::move(h_), beast::bind_front_handler(std::move(h_),
@ -101,13 +97,13 @@ public:
} }
void void
operator()(bool cancel) override operator()(error_code ec) override
{ {
net::post( net::post(
wg1_.get_executor(), wg1_.get_executor(),
bind_handler( bind_handler(
std::move(fn_), std::move(fn_),
cancel)); ec));
wg1_.reset(); wg1_.reset();
} }
}; };
@ -120,7 +116,7 @@ struct stream::run_read_op
void void
operator()( operator()(
ReadHandler&& h, ReadHandler&& h,
std::shared_ptr<state> in_, stream& s,
MutableBufferSequence const& buffers) MutableBufferSequence const& buffers)
{ {
// If you get an error on the following line it means // If you get an error on the following line it means
@ -132,71 +128,15 @@ struct stream::run_read_op
void(error_code, std::size_t)>::value, void(error_code, std::size_t)>::value,
"ReadHandler type requirements not met"); "ReadHandler type requirements not met");
++in_->nread; s.initiate_read(
std::unique_ptr<read_op_base>{
std::unique_lock<std::mutex> lock(in_->m);
if(in_->op != nullptr)
throw std::logic_error(
"in_->op != nullptr");
// test failure
error_code ec;
if(in_->fc && in_->fc->fail(ec))
{
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(h),
ec, std::size_t{0}));
return;
}
// A request to read 0 bytes from a stream is a no-op.
if(buffer_size(buffers) == 0)
{
lock.unlock();
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(h),
ec, std::size_t{0}));
return;
}
// deliver bytes before eof
if(buffer_size(in_->b.data()) > 0)
{
auto n = net::buffer_copy(
buffers, in_->b.data(), in_->read_max);
in_->b.consume(n);
lock.unlock();
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(h),
ec, n));
return;
}
// deliver error
if(in_->code != status::ok)
{
lock.unlock();
ec = net::error::eof;
net::post(
in_->ioc.get_executor(),
beast::bind_front_handler(
std::move(h),
ec, std::size_t{0}));
return;
}
// complete when bytes available or closed
in_->op.reset(
new read_op< new read_op<
ReadHandler, typename std::decay<ReadHandler>::type,
MutableBufferSequence>( MutableBufferSequence>(
std::move(h), *in_, buffers)); std::move(h),
*s.in_,
buffers)},
buffer_size(buffers));
} }
}; };
@ -362,7 +302,7 @@ async_read_some(
void(error_code, std::size_t)>( void(error_code, std::size_t)>(
run_read_op{}, run_read_op{},
handler, handler,
in_, *this,
buffers); buffers);
} }

View File

@ -21,6 +21,45 @@ namespace beast {
namespace test { namespace test {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void stream::initiate_read(
std::unique_ptr<stream::read_op_base>&& op,
std::size_t buf_size)
{
std::unique_lock<std::mutex> lock(in_->m);
++in_->nread;
if(in_->op != nullptr)
throw std::logic_error(
"in_->op != nullptr");
// test failure
error_code ec;
if(in_->fc && in_->fc->fail(ec))
{
lock.unlock();
(*op)(ec);
return;
}
// A request to read 0 bytes from a stream is a no-op.
if(buf_size == 0 || buffer_size(in_->b.data()) > 0)
{
lock.unlock();
(*op)(ec);
return;
}
// deliver error
if(in_->code != status::ok)
{
lock.unlock();
(*op)(net::error::eof);
return;
}
// complete when bytes available or closed
in_->op = std::move(op);
}
stream:: stream::
state:: state::
@ -38,7 +77,7 @@ state::
{ {
// cancel outstanding read // cancel outstanding read
if(op != nullptr) if(op != nullptr)
(*op)(true); (*op)(net::error::operation_aborted);
} }
void void
@ -49,7 +88,7 @@ notify_read()
if(op) if(op)
{ {
auto op_ = std::move(op); auto op_ = std::move(op);
op_->operator()(); op_->operator()(error_code{});
} }
else else
{ {
@ -58,7 +97,7 @@ notify_read()
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
stream:: stream::
~stream() ~stream()
{ {
@ -181,7 +220,7 @@ close()
op = std::move(in_->op); op = std::move(in_->op);
} }
if(op != nullptr) if(op != nullptr)
(*op)(true); (*op)(net::error::operation_aborted);
} }
// disconnect // disconnect

View File

@ -99,9 +99,11 @@ class stream
struct read_op_base struct read_op_base
{ {
virtual ~read_op_base() = default; virtual ~read_op_base() = default;
virtual void operator()(bool cancel = false) = 0; virtual void operator()(error_code ec) = 0;
}; };
BOOST_BEAST_DECL void initiate_read(std::unique_ptr<read_op_base>&& op, std::size_t buf_size);
template<class Handler, class Buffers> template<class Handler, class Buffers>
class read_op; class read_op;
@ -138,7 +140,7 @@ class stream
BOOST_BEAST_DECL BOOST_BEAST_DECL
~state(); ~state();
BOOST_BEAST_DECL BOOST_BEAST_DECL
void void
notify_read(); notify_read();