Refactor test::stream

This commit is contained in:
Vinnie Falco
2017-08-21 14:45:58 -07:00
parent 99822aebf5
commit 463accffd1
7 changed files with 731 additions and 683 deletions

View File

@@ -1,5 +1,7 @@
Version 109: Version 109:
* refactor test::stream
WebSocket: WebSocket:
* Fix async_read_some handler signature * Fix async_read_some handler signature

View File

@@ -51,7 +51,7 @@ public:
test::stream ts{ios_, fc}; test::stream ts{ios_, fc};
test_parser<isRequest> p(fc); test_parser<isRequest> p(fc);
error_code ec = test::error::fail_error; error_code ec = test::error::fail_error;
ts.remote().close(); ts.close_remote();
read(ts, b, p, ec); read(ts, b, p, ec);
if(! ec) if(! ec)
break; break;
@@ -68,7 +68,7 @@ public:
std::string(s + pre, len - pre)}; std::string(s + pre, len - pre)};
test_parser<isRequest> p(fc); test_parser<isRequest> p(fc);
error_code ec = test::error::fail_error; error_code ec = test::error::fail_error;
ts.remote().close(); ts.close_remote();
read(ts, b, p, ec); read(ts, b, p, ec);
if(! ec) if(! ec)
break; break;
@@ -83,7 +83,7 @@ public:
test::stream ts{ios_, fc}; test::stream ts{ios_, fc};
test_parser<isRequest> p(fc); test_parser<isRequest> p(fc);
error_code ec = test::error::fail_error; error_code ec = test::error::fail_error;
ts.remote().close(); ts.close_remote();
async_read(ts, b, p, do_yield[ec]); async_read(ts, b, p, do_yield[ec]);
if(! ec) if(! ec)
break; break;
@@ -100,7 +100,7 @@ public:
std::string{s + pre, len - pre}); std::string{s + pre, len - pre});
test_parser<isRequest> p(fc); test_parser<isRequest> p(fc);
error_code ec = test::error::fail_error; error_code ec = test::error::fail_error;
ts.remote().close(); ts.close_remote();
async_read(ts, b, p, do_yield[ec]); async_read(ts, b, p, do_yield[ec]);
if(! ec) if(! ec)
break; break;
@@ -114,7 +114,7 @@ public:
{ {
multi_buffer b; multi_buffer b;
test::stream c{ios_, "GET / X"}; test::stream c{ios_, "GET / X"};
c.remote().close(); c.close_remote();
request_parser<dynamic_body> p; request_parser<dynamic_body> p;
read(c, b, p); read(c, b, p);
fail(); fail();
@@ -306,7 +306,7 @@ public:
test::stream ts{ios_}; test::stream ts{ios_};
request_parser<dynamic_body> p; request_parser<dynamic_body> p;
error_code ec; error_code ec;
ts.remote().close(); ts.close_remote();
read(ts, b, p, ec); read(ts, b, p, ec);
BEAST_EXPECT(ec == http::error::end_of_stream); BEAST_EXPECT(ec == http::error::end_of_stream);
} }
@@ -315,7 +315,7 @@ public:
test::stream ts{ios_}; test::stream ts{ios_};
request_parser<dynamic_body> p; request_parser<dynamic_body> p;
error_code ec; error_code ec;
ts.remote().close(); ts.close_remote();
async_read(ts, b, p, do_yield[ec]); async_read(ts, b, p, do_yield[ec]);
BEAST_EXPECT(ec == http::error::end_of_stream); BEAST_EXPECT(ec == http::error::end_of_stream);
} }

View File

@@ -284,10 +284,11 @@ public:
bool bool
equal_body(string_view sv, string_view body) equal_body(string_view sv, string_view body)
{ {
test::stream ts{ios_, sv}; test::stream ts{ios_, sv}, tr{ios_};
ts.connect(tr);
message<isRequest, string_body, fields> m; message<isRequest, string_body, fields> m;
multi_buffer b; multi_buffer b;
ts.remote().close(); ts.close_remote();
try try
{ {
read(ts, b, m); read(ts, b, m);
@@ -304,12 +305,13 @@ public:
std::string std::string
str(message<isRequest, Body, Fields> const& m) str(message<isRequest, Body, Fields> const& m)
{ {
test::stream ts(ios_); test::stream ts{ios_}, tr{ios_};
ts.connect(tr);
error_code ec; error_code ec;
write(ts, m, ec); write(ts, m, ec);
if(ec && ec != error::end_of_stream) if(ec && ec != error::end_of_stream)
BOOST_THROW_EXCEPTION(system_error{ec}); BOOST_THROW_EXCEPTION(system_error{ec});
return ts.remote().str().to_string(); return tr.str().to_string();
} }
void void
@@ -323,10 +325,11 @@ public:
m.set(field::content_length, "5"); m.set(field::content_length, "5");
m.body = "*****"; m.body = "*****";
error_code ec; error_code ec;
test::stream ts{ios_}; test::stream ts{ios_}, tr{ios_};
ts.connect(tr);
async_write(ts, m, do_yield[ec]); async_write(ts, m, do_yield[ec]);
if(BEAST_EXPECTS(ec == error::end_of_stream, ec.message())) if(BEAST_EXPECTS(ec == error::end_of_stream, ec.message()))
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"HTTP/1.0 200 OK\r\n" "HTTP/1.0 200 OK\r\n"
"Server: test\r\n" "Server: test\r\n"
"Content-Length: 5\r\n" "Content-Length: 5\r\n"
@@ -341,10 +344,11 @@ public:
m.set(field::transfer_encoding, "chunked"); m.set(field::transfer_encoding, "chunked");
m.body = "*****"; m.body = "*****";
error_code ec; error_code ec;
test::stream ts{ios_}; test::stream ts{ios_}, tr{ios_};
ts.connect(tr);
async_write(ts, m, do_yield[ec]); async_write(ts, m, do_yield[ec]);
if(BEAST_EXPECTS(! ec, ec.message())) if(BEAST_EXPECTS(! ec, ec.message()))
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"HTTP/1.1 200 OK\r\n" "HTTP/1.1 200 OK\r\n"
"Server: test\r\n" "Server: test\r\n"
"Transfer-Encoding: chunked\r\n" "Transfer-Encoding: chunked\r\n"
@@ -364,7 +368,8 @@ public:
for(n = 0; n < limit; ++n) for(n = 0; n < limit; ++n)
{ {
test::fail_counter fc(n); test::fail_counter fc(n);
test::stream ts(ios_, fc); test::stream ts{ios_, fc}, tr{ios_};
ts.connect(tr);
request<fail_body> m(verb::get, "/", 10, fc); request<fail_body> m(verb::get, "/", 10, fc);
m.set(field::user_agent, "test"); m.set(field::user_agent, "test");
m.set(field::connection, "keep-alive"); m.set(field::connection, "keep-alive");
@@ -373,7 +378,7 @@ public:
try try
{ {
write(ts, m); write(ts, m);
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"GET / HTTP/1.0\r\n" "GET / HTTP/1.0\r\n"
"User-Agent: test\r\n" "User-Agent: test\r\n"
"Connection: keep-alive\r\n" "Connection: keep-alive\r\n"
@@ -393,7 +398,8 @@ public:
for(n = 0; n < limit; ++n) for(n = 0; n < limit; ++n)
{ {
test::fail_counter fc(n); test::fail_counter fc(n);
test::stream ts(ios_, fc); test::stream ts{ios_, fc}, tr{ios_};
ts.connect(tr);
request<fail_body> m{verb::get, "/", 10, fc}; request<fail_body> m{verb::get, "/", 10, fc};
m.set(field::user_agent, "test"); m.set(field::user_agent, "test");
m.set(field::transfer_encoding, "chunked"); m.set(field::transfer_encoding, "chunked");
@@ -402,7 +408,7 @@ public:
write(ts, m, ec); write(ts, m, ec);
if(ec == error::end_of_stream) if(ec == error::end_of_stream)
{ {
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"GET / HTTP/1.0\r\n" "GET / HTTP/1.0\r\n"
"User-Agent: test\r\n" "User-Agent: test\r\n"
"Transfer-Encoding: chunked\r\n" "Transfer-Encoding: chunked\r\n"
@@ -422,7 +428,8 @@ public:
for(n = 0; n < limit; ++n) for(n = 0; n < limit; ++n)
{ {
test::fail_counter fc(n); test::fail_counter fc(n);
test::stream ts(ios_, fc); test::stream ts{ios_, fc}, tr{ios_};
ts.connect(tr);
request<fail_body> m{verb::get, "/", 10, fc}; request<fail_body> m{verb::get, "/", 10, fc};
m.set(field::user_agent, "test"); m.set(field::user_agent, "test");
m.set(field::transfer_encoding, "chunked"); m.set(field::transfer_encoding, "chunked");
@@ -431,7 +438,7 @@ public:
async_write(ts, m, do_yield[ec]); async_write(ts, m, do_yield[ec]);
if(ec == error::end_of_stream) if(ec == error::end_of_stream)
{ {
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"GET / HTTP/1.0\r\n" "GET / HTTP/1.0\r\n"
"User-Agent: test\r\n" "User-Agent: test\r\n"
"Transfer-Encoding: chunked\r\n" "Transfer-Encoding: chunked\r\n"
@@ -451,7 +458,8 @@ public:
for(n = 0; n < limit; ++n) for(n = 0; n < limit; ++n)
{ {
test::fail_counter fc(n); test::fail_counter fc(n);
test::stream ts(ios_, fc); test::stream ts{ios_, fc}, tr{ios_};
ts.connect(tr);
request<fail_body> m{verb::get, "/", 10, fc}; request<fail_body> m{verb::get, "/", 10, fc};
m.set(field::user_agent, "test"); m.set(field::user_agent, "test");
m.set(field::connection, "keep-alive"); m.set(field::connection, "keep-alive");
@@ -461,7 +469,7 @@ public:
write(ts, m, ec); write(ts, m, ec);
if(! ec) if(! ec)
{ {
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"GET / HTTP/1.0\r\n" "GET / HTTP/1.0\r\n"
"User-Agent: test\r\n" "User-Agent: test\r\n"
"Connection: keep-alive\r\n" "Connection: keep-alive\r\n"
@@ -477,7 +485,8 @@ public:
for(n = 0; n < limit; ++n) for(n = 0; n < limit; ++n)
{ {
test::fail_counter fc(n); test::fail_counter fc(n);
test::stream ts(ios_, fc); test::stream ts{ios_, fc}, tr{ios_};
ts.connect(tr);
request<fail_body> m{verb::get, "/", 10, fc}; request<fail_body> m{verb::get, "/", 10, fc};
m.set(field::user_agent, "test"); m.set(field::user_agent, "test");
m.set(field::connection, "keep-alive"); m.set(field::connection, "keep-alive");
@@ -487,7 +496,7 @@ public:
async_write(ts, m, do_yield[ec]); async_write(ts, m, do_yield[ec]);
if(! ec) if(! ec)
{ {
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"GET / HTTP/1.0\r\n" "GET / HTTP/1.0\r\n"
"User-Agent: test\r\n" "User-Agent: test\r\n"
"Connection: keep-alive\r\n" "Connection: keep-alive\r\n"
@@ -530,11 +539,12 @@ public:
m.set(field::user_agent, "test"); m.set(field::user_agent, "test");
m.body = "*"; m.body = "*";
m.prepare_payload(); m.prepare_payload();
test::stream ts(ios_); test::stream ts{ios_}, tr{ios_};
ts.connect(tr);
error_code ec; error_code ec;
write(ts, m, ec); write(ts, m, ec);
BEAST_EXPECT(ec == error::end_of_stream); BEAST_EXPECT(ec == error::end_of_stream);
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"GET / HTTP/1.0\r\n" "GET / HTTP/1.0\r\n"
"User-Agent: test\r\n" "User-Agent: test\r\n"
"\r\n" "\r\n"
@@ -567,10 +577,11 @@ public:
m.set(field::user_agent, "test"); m.set(field::user_agent, "test");
m.body = "*"; m.body = "*";
m.prepare_payload(); m.prepare_payload();
test::stream ts(ios_); test::stream ts{ios_}, tr{ios_};
ts.connect(tr);
error_code ec; error_code ec;
write(ts, m, ec); write(ts, m, ec);
BEAST_EXPECT(ts.remote().str() == BEAST_EXPECT(tr.str() ==
"GET / HTTP/1.1\r\n" "GET / HTTP/1.1\r\n"
"User-Agent: test\r\n" "User-Agent: test\r\n"
"Transfer-Encoding: chunked\r\n" "Transfer-Encoding: chunked\r\n"
@@ -635,7 +646,8 @@ public:
// destroyed when calling ~io_service // destroyed when calling ~io_service
{ {
boost::asio::io_service ios; boost::asio::io_service ios;
test::stream ts{ios}; test::stream ts{ios}, tr{ios};
ts.connect(tr);
BEAST_EXPECT(handler::count() == 0); BEAST_EXPECT(handler::count() == 0);
request<string_body> m; request<string_body> m;
m.method(verb::get); m.method(verb::get);
@@ -695,8 +707,8 @@ public:
void void
testWriteStream(boost::asio::yield_context yield) testWriteStream(boost::asio::yield_context yield)
{ {
test::stream ts{ios_}; test::stream ts{ios_}, tr{ios_};
auto tr = ts.remote(); ts.connect(tr);
ts.write_size(3); ts.write_size(3);
response<Body> m0; response<Body> m0;
@@ -812,8 +824,8 @@ public:
testIssue655() testIssue655()
{ {
boost::asio::io_service ios; boost::asio::io_service ios;
test::stream ts{ios}; test::stream ts{ios}, tr{ios};
ts.connect(tr);
response<empty_body> res; response<empty_body> res;
res.chunked(true); res.chunked(true);
response_serializer<empty_body> sr{res}; response_serializer<empty_body> sr{res};
@@ -821,11 +833,11 @@ public:
[&](const error_code&) [&](const error_code&)
{ {
}); });
ios.run(); ios.run();
} }
void run() override void
run() override
{ {
testIssue655(); testIssue655();
yield_to( yield_to(

File diff suppressed because it is too large Load Diff

View File

@@ -50,8 +50,7 @@ public:
testRead() testRead()
{ {
{ {
test::stream ts(ios_, test::stream ts{ios_, "\x16***"};
"\x16***");
error_code ec; error_code ec;
flat_buffer b; flat_buffer b;
auto const result = detect_ssl(ts, b, ec); auto const result = detect_ssl(ts, b, ec);
@@ -61,8 +60,7 @@ public:
yield_to( yield_to(
[&](yield_context yield) [&](yield_context yield)
{ {
test::stream ts(ios_, test::stream ts{ios_, "\x16***"};
"\x16***");
error_code ec; error_code ec;
flat_buffer b; flat_buffer b;
auto const result = auto const result =

View File

@@ -74,7 +74,7 @@ public:
test::stream ts{ios_, sv}; test::stream ts{ios_, sv};
message<isRequest, string_body, fields> m; message<isRequest, string_body, fields> m;
multi_buffer b; multi_buffer b;
ts.remote().close(); ts.close_remote();
try try
{ {
read(ts, b, m); read(ts, b, m);
@@ -90,8 +90,8 @@ public:
void void
doExpect100Continue() doExpect100Continue()
{ {
test::stream ts{ios_}; test::stream ts{ios_}, tr{ios_};
auto tr = ts.remote(); ts.connect(tr);
yield_to( yield_to(
[&](yield_context) [&](yield_context)
{ {
@@ -125,12 +125,13 @@ public:
std::string const s = "Hello, world!"; std::string const s = "Hello, world!";
test::stream t0{ios_, s}; test::stream t0{ios_, s};
t0.read_size(3); t0.read_size(3);
t0.remote().close(); t0.close_remote();
test::stream t1{ios_}; test::stream t1{ios_}, t1r{ios_};
t1.connect(t1r);
error_code ec; error_code ec;
send_cgi_response(t0, t1, ec); send_cgi_response(t0, t1, ec);
BEAST_EXPECTS(! ec, ec.message()); BEAST_EXPECTS(! ec, ec.message());
BEAST_EXPECT(equal_body<false>(t1.remote().str(), s)); BEAST_EXPECT(equal_body<false>(t1r.str(), s));
} }
void void
@@ -144,10 +145,11 @@ public:
req.body = "Hello, world!"; req.body = "Hello, world!";
req.prepare_payload(); req.prepare_payload();
test::stream ds{ios_}; test::stream ds{ios_}, dsr{ios_};
auto dsr = ds.remote(); ds.connect(dsr);
dsr.read_size(3); dsr.read_size(3);
test::stream us{ios_}; test::stream us{ios_}, usr{ios_};
us.connect(usr);
us.write_size(3); us.write_size(3);
error_code ec; error_code ec;
@@ -165,7 +167,7 @@ public:
}); });
BEAST_EXPECTS(! ec, ec.message()); BEAST_EXPECTS(! ec, ec.message());
BEAST_EXPECT(equal_body<true>( BEAST_EXPECT(equal_body<true>(
us.remote().str(), req.body)); usr.str(), req.body));
} }
void void
@@ -239,8 +241,8 @@ public:
void void
doHEAD() doHEAD()
{ {
test::stream ts{ios_}; test::stream ts{ios_}, tr{ios_};
auto tr = ts.remote(); ts.connect(tr);
yield_to( yield_to(
[&](yield_context) [&](yield_context)
{ {
@@ -324,7 +326,8 @@ public:
return boost::asio::const_buffers_1{ return boost::asio::const_buffers_1{
s.data(), s.size()}; s.data(), s.size()};
}; };
test::stream ts{ios_}; test::stream ts{ios_}, tr{ios_};
ts.connect(tr);
response<empty_body> res{status::ok, 11}; response<empty_body> res{status::ok, 11};
res.set(field::server, "test"); res.set(field::server, "test");
@@ -366,7 +369,7 @@ public:
std::allocator<double>{} std::allocator<double>{}
), ec); ), ec);
BEAST_EXPECT( BEAST_EXPECT(
to_string(ts.remote().buffer().data()) == to_string(tr.buffer().data()) ==
"HTTP/1.1 200 OK\r\n" "HTTP/1.1 200 OK\r\n"
"Server: test\r\n" "Server: test\r\n"
"Accept: Expires, Content-MD5\r\n" "Accept: Expires, Content-MD5\r\n"

View File

@@ -32,16 +32,18 @@ namespace boost {
namespace beast { namespace beast {
namespace test { namespace test {
class stream; /** A bidirectional in-memory communication channel
namespace detail { An instance of this class provides a client and server
endpoint that are automatically connected to each other
similarly to a connected socket.
class stream_impl Test pipes are used to facilitate writing unit tests
where the behavior of the transport is tightly controlled
to help illuminate all code paths (for code coverage)
*/
class stream
{ {
friend class boost::beast::test::stream;
using buffer_type = flat_buffer;
struct read_op struct read_op
{ {
virtual ~read_op() = default; virtual ~read_op() = default;
@@ -63,7 +65,7 @@ class stream_impl
friend class stream; friend class stream;
std::mutex m; std::mutex m;
buffer_type b; flat_buffer b;
std::condition_variable cv; std::condition_variable cv;
std::unique_ptr<read_op> op; std::unique_ptr<read_op> op;
boost::asio::io_service& ios; boost::asio::io_service& ios;
@@ -105,218 +107,61 @@ class stream_impl
} }
}; };
state s0_; std::shared_ptr<state> in_;
state s1_; std::weak_ptr<state> out_;
public:
stream_impl(
boost::asio::io_service& ios,
fail_counter* fc)
: s0_(ios, fc)
, s1_(ios, nullptr)
{
}
stream_impl(
boost::asio::io_service& ios0,
boost::asio::io_service& ios1)
: s0_(ios0, nullptr)
, s1_(ios1, nullptr)
{
}
~stream_impl()
{
BOOST_ASSERT(! s0_.op);
BOOST_ASSERT(! s1_.op);
}
};
template<class Handler, class Buffers>
class stream_impl::read_op_impl : public stream_impl::read_op
{
class lambda
{
state& s_;
Buffers b_;
Handler h_;
boost::optional<
boost::asio::io_service::work> work_;
public:
lambda(lambda&&) = default;
lambda(lambda const&) = default;
lambda(state& s, Buffers const& b, Handler&& h)
: s_(s)
, b_(b)
, h_(std::move(h))
, work_(s_.ios)
{
}
lambda(state& s, Buffers const& b, Handler const& h)
: s_(s)
, b_(b)
, h_(h)
, work_(s_.ios)
{
}
void
post()
{
s_.ios.post(std::move(*this));
work_ = boost::none;
}
void
operator()()
{
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
std::unique_lock<std::mutex> lock{s_.m};
BOOST_ASSERT(! s_.op);
if(s_.b.size() > 0)
{
auto const bytes_transferred = buffer_copy(
b_, s_.b.data(), s_.read_max);
s_.b.consume(bytes_transferred);
auto& s = s_;
Handler h{std::move(h_)};
lock.unlock();
++s.nread;
s.ios.post(bind_handler(std::move(h),
error_code{}, bytes_transferred));
}
else
{
BOOST_ASSERT(s_.code != status::ok);
auto& s = s_;
Handler h{std::move(h_)};
lock.unlock();
++s.nread;
error_code ec;
if(s.code == status::eof)
ec = boost::asio::error::eof;
else if(s.code == status::reset)
ec = boost::asio::error::connection_reset;
s.ios.post(bind_handler(std::move(h), ec, 0));
}
}
};
lambda fn_;
public:
read_op_impl(state& s, Buffers const& b, Handler&& h)
: fn_(s, b, std::move(h))
{
}
read_op_impl(state& s, Buffers const& b, Handler const& h)
: fn_(s, b, h)
{
}
void
operator()() override
{
fn_.post();
}
};
} // detail
//------------------------------------------------------------------------------
/** A bidirectional in-memory communication channel
An instance of this class provides a client and server
endpoint that are automatically connected to each other
similarly to a connected socket.
Test pipes are used to facilitate writing unit tests
where the behavior of the transport is tightly controlled
to help illuminate all code paths (for code coverage)
*/
class stream
{
using status = detail::stream_impl::status;
std::shared_ptr<detail::stream_impl> impl_;
detail::stream_impl::state* in_;
detail::stream_impl::state* out_;
explicit
stream(std::shared_ptr<
detail::stream_impl> const& impl)
: impl_(impl)
, in_(&impl_->s1_)
, out_(&impl_->s0_)
{
}
public: public:
using buffer_type = flat_buffer; using buffer_type = flat_buffer;
/// Assignment
stream& operator=(stream&&) = default;
/// Destructor /// Destructor
~stream() ~stream()
{ {
if(! impl_) auto out = out_.lock();
return; if(out)
std::unique_lock<std::mutex> lock{out_->m};
if(out_->code == status::ok)
{ {
out_->code = status::reset; std::unique_lock<std::mutex> lock{out->m};
out_->on_write(); if(out->code == status::ok)
{
out->code = status::reset;
out->on_write();
}
} }
lock.unlock();
} }
/// Constructor /// Constructor
stream(stream&& other) stream(stream&& other)
: impl_(std::move(other.impl_))
, in_(other.in_)
, out_(other.out_)
{ {
auto in = std::make_shared<state>(
other.in_->ios, other.in_->fc);
in_ = std::move(other.in_);
out_ = std::move(other.out_);
other.in_ = in;
}
/// Assignment
stream&
operator=(stream&& other)
{
auto in = std::make_shared<state>(
other.in_->ios, other.in_->fc);
in_ = std::move(other.in_);
out_ = std::move(other.out_);
other.in_ = in;
return *this;
} }
/// Constructor /// Constructor
explicit explicit
stream( stream(boost::asio::io_service& ios)
boost::asio::io_service& ios) : in_(std::make_shared<state>(ios, nullptr))
: impl_(std::make_shared<
detail::stream_impl>(ios, nullptr))
, in_(&impl_->s0_)
, out_(&impl_->s1_)
{ {
} }
/// Constructor /// Constructor
explicit
stream(
boost::asio::io_service& ios0,
boost::asio::io_service& ios1)
: impl_(std::make_shared<
detail::stream_impl>(ios0, ios1))
, in_(&impl_->s0_)
, out_(&impl_->s1_)
{
}
/// Constructor
explicit
stream( stream(
boost::asio::io_service& ios, boost::asio::io_service& ios,
fail_counter& fc) fail_counter& fc)
: impl_(std::make_shared< : in_(std::make_shared<state>(ios, &fc))
detail::stream_impl>(ios, &fc))
, in_(&impl_->s0_)
, out_(&impl_->s1_)
{ {
} }
@@ -324,10 +169,7 @@ public:
stream( stream(
boost::asio::io_service& ios, boost::asio::io_service& ios,
string_view s) string_view s)
: impl_(std::make_shared< : in_(std::make_shared<state>(ios, nullptr))
detail::stream_impl>(ios, nullptr))
, in_(&impl_->s0_)
, out_(&impl_->s1_)
{ {
using boost::asio::buffer; using boost::asio::buffer;
using boost::asio::buffer_copy; using boost::asio::buffer_copy;
@@ -341,10 +183,7 @@ public:
boost::asio::io_service& ios, boost::asio::io_service& ios,
fail_counter& fc, fail_counter& fc,
string_view s) string_view s)
: impl_(std::make_shared< : in_(std::make_shared<state>(ios, &fc))
detail::stream_impl>(ios, &fc))
, in_(&impl_->s0_)
, out_(&impl_->s1_)
{ {
using boost::asio::buffer; using boost::asio::buffer;
using boost::asio::buffer_copy; using boost::asio::buffer_copy;
@@ -353,15 +192,17 @@ public:
buffer(s.data(), s.size()))); buffer(s.data(), s.size())));
} }
/// Return the other end of the connection /// Establish a connection
stream void
remote() connect(stream& remote)
{ {
BOOST_ASSERT(in_ == &impl_->s0_); BOOST_ASSERT(! out_.lock());
return stream{impl_}; BOOST_ASSERT(! remote.out_.lock());
out_ = remote.in_;
remote.out_ = in_;
} }
/// Return the `io_service` associated with the object /// Return the `io_service` associated with the stream
boost::asio::io_service& boost::asio::io_service&
get_io_service() get_io_service()
{ {
@@ -379,7 +220,7 @@ public:
void void
write_size(std::size_t n) write_size(std::size_t n)
{ {
out_->write_max = n; in_->write_max = n;
} }
/// Direct input buffer access /// Direct input buffer access
@@ -402,7 +243,7 @@ public:
/// Appends a string to the pending input data /// Appends a string to the pending input data
void void
str(string_view s) append(string_view s)
{ {
using boost::asio::buffer; using boost::asio::buffer;
using boost::asio::buffer_copy; using boost::asio::buffer_copy;
@@ -431,18 +272,25 @@ public:
std::size_t std::size_t
nwrite() const nwrite() const
{ {
return out_->nwrite; return in_->nwrite;
} }
/** Close the stream. /** Close the stream.
The other end of the pipe will see `error::eof` The other end of the connection will see
after reading all the data from the buffer. `error::eof` after reading all the remaining data.
*/ */
template<class = void>
void void
close(); close();
/** Close the other end of the stream.
This end of the connection will see
`error::eof` after reading all the remaining data.
*/
void
close_remote();
template<class MutableBufferSequence> template<class MutableBufferSequence>
std::size_t std::size_t
read_some(MutableBufferSequence const& buffers); read_some(MutableBufferSequence const& buffers);
@@ -487,6 +335,36 @@ public:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
inline
void
stream::
close()
{
BOOST_ASSERT(! in_->op);
auto out = out_.lock();
if(! out)
return;
std::lock_guard<std::mutex> lock{out->m};
if(out->code == status::ok)
{
out->code = status::eof;
out->on_write();
}
}
inline
void
stream::
close_remote()
{
std::lock_guard<std::mutex> lock{in_->m};
if(in_->code == status::ok)
{
in_->code = status::eof;
in_->on_write();
}
}
template<class MutableBufferSequence> template<class MutableBufferSequence>
std::size_t std::size_t
stream:: stream::
@@ -597,8 +475,7 @@ async_read_some(
} }
else else
{ {
in_->op.reset(new in_->op.reset(new read_op_impl<handler_type<
detail::stream_impl::read_op_impl<handler_type<
ReadHandler, void(error_code, std::size_t)>, ReadHandler, void(error_code, std::size_t)>,
MutableBufferSequence>{*in_, buffers, MutableBufferSequence>{*in_, buffers,
init.completion_handler}); init.completion_handler});
@@ -615,7 +492,6 @@ write_some(ConstBufferSequence const& buffers)
static_assert(is_const_buffer_sequence< static_assert(is_const_buffer_sequence<
ConstBufferSequence>::value, ConstBufferSequence>::value,
"ConstBufferSequence requirements not met"); "ConstBufferSequence requirements not met");
BOOST_ASSERT(out_->code == status::ok);
error_code ec; error_code ec;
auto const bytes_transferred = auto const bytes_transferred =
write_some(buffers, ec); write_some(buffers, ec);
@@ -635,18 +511,24 @@ write_some(
"ConstBufferSequence requirements not met"); "ConstBufferSequence requirements not met");
using boost::asio::buffer_copy; using boost::asio::buffer_copy;
using boost::asio::buffer_size; using boost::asio::buffer_size;
BOOST_ASSERT(out_->code == status::ok); auto out = out_.lock();
if(! out)
{
ec = boost::asio::error::connection_reset;
return 0;
}
BOOST_ASSERT(out->code == status::ok);
if(in_->fc && in_->fc->fail(ec)) if(in_->fc && in_->fc->fail(ec))
return 0; return 0;
auto const n = (std::min)( auto const n = (std::min)(
buffer_size(buffers), out_->write_max); buffer_size(buffers), in_->write_max);
std::unique_lock<std::mutex> lock{out_->m}; std::unique_lock<std::mutex> lock{out->m};
auto const bytes_transferred = auto const bytes_transferred =
buffer_copy(out_->b.prepare(n), buffers); buffer_copy(out->b.prepare(n), buffers);
out_->b.commit(bytes_transferred); out->b.commit(bytes_transferred);
out_->on_write(); out->on_write();
lock.unlock(); lock.unlock();
++out_->nwrite; ++in_->nwrite;
ec.assign(0, ec.category()); ec.assign(0, ec.category());
return bytes_transferred; return bytes_transferred;
} }
@@ -663,9 +545,14 @@ async_write_some(ConstBufferSequence const& buffers,
"ConstBufferSequence requirements not met"); "ConstBufferSequence requirements not met");
using boost::asio::buffer_copy; using boost::asio::buffer_copy;
using boost::asio::buffer_size; using boost::asio::buffer_size;
BOOST_ASSERT(out_->code == status::ok);
async_completion<WriteHandler, async_completion<WriteHandler,
void(error_code, std::size_t)> init{handler}; void(error_code, std::size_t)> init{handler};
auto out = out_.lock();
if(! out)
return in_->ios.post(
bind_handler(init.completion_handler,
boost::asio::error::connection_reset, 0));
BOOST_ASSERT(out->code == status::ok);
if(in_->fc) if(in_->fc)
{ {
error_code ec; error_code ec;
@@ -674,14 +561,14 @@ async_write_some(ConstBufferSequence const& buffers,
init.completion_handler, ec, 0)); init.completion_handler, ec, 0));
} }
auto const n = auto const n =
(std::min)(buffer_size(buffers), out_->write_max); (std::min)(buffer_size(buffers), in_->write_max);
std::unique_lock<std::mutex> lock{out_->m}; std::unique_lock<std::mutex> lock{out->m};
auto const bytes_transferred = auto const bytes_transferred =
buffer_copy(out_->b.prepare(n), buffers); buffer_copy(out->b.prepare(n), buffers);
out_->b.commit(bytes_transferred); out->b.commit(bytes_transferred);
out_->on_write(); out->on_write();
lock.unlock(); lock.unlock();
++out_->nwrite; ++in_->nwrite;
in_->ios.post(bind_handler(init.completion_handler, in_->ios.post(bind_handler(init.completion_handler,
error_code{}, bytes_transferred)); error_code{}, bytes_transferred));
return init.result.get(); return init.result.get();
@@ -689,8 +576,10 @@ async_write_some(ConstBufferSequence const& buffers,
inline inline
void void
teardown(websocket::role_type, teardown(
stream& s, boost::system::error_code& ec) websocket::role_type,
stream& s,
boost::system::error_code& ec)
{ {
if(s.in_->fc) if(s.in_->fc)
{ {
@@ -707,8 +596,10 @@ teardown(websocket::role_type,
template<class TeardownHandler> template<class TeardownHandler>
inline inline
void void
async_teardown(websocket::role_type, async_teardown(
stream& s, TeardownHandler&& handler) websocket::role_type,
stream& s,
TeardownHandler&& handler)
{ {
error_code ec; error_code ec;
if(s.in_->fc && s.in_->fc->fail(ec)) if(s.in_->fc && s.in_->fc->fail(ec))
@@ -719,18 +610,122 @@ async_teardown(websocket::role_type,
bind_handler(std::move(handler), ec)); bind_handler(std::move(handler), ec));
} }
template<class> //------------------------------------------------------------------------------
void
stream:: template<class Handler, class Buffers>
close() class stream::read_op_impl : public stream::read_op
{ {
BOOST_ASSERT(! in_->op); class lambda
std::lock_guard<std::mutex> lock{out_->m}; {
if(out_->code == status::ok) state& s_;
Buffers b_;
Handler h_;
boost::optional<
boost::asio::io_service::work> work_;
public:
lambda(lambda&&) = default;
lambda(lambda const&) = default;
lambda(state& s, Buffers const& b, Handler&& h)
: s_(s)
, b_(b)
, h_(std::move(h))
, work_(s_.ios)
{
}
lambda(state& s, Buffers const& b, Handler const& h)
: s_(s)
, b_(b)
, h_(h)
, work_(s_.ios)
{
}
void
post()
{
s_.ios.post(std::move(*this));
work_ = boost::none;
}
void
operator()()
{
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
std::unique_lock<std::mutex> lock{s_.m};
BOOST_ASSERT(! s_.op);
if(s_.b.size() > 0)
{
auto const bytes_transferred = buffer_copy(
b_, s_.b.data(), s_.read_max);
s_.b.consume(bytes_transferred);
auto& s = s_;
Handler h{std::move(h_)};
lock.unlock();
++s.nread;
s.ios.post(bind_handler(std::move(h),
error_code{}, bytes_transferred));
}
else
{
BOOST_ASSERT(s_.code != status::ok);
auto& s = s_;
Handler h{std::move(h_)};
lock.unlock();
++s.nread;
error_code ec;
if(s.code == status::eof)
ec = boost::asio::error::eof;
else if(s.code == status::reset)
ec = boost::asio::error::connection_reset;
s.ios.post(bind_handler(std::move(h), ec, 0));
}
}
};
lambda fn_;
public:
read_op_impl(state& s, Buffers const& b, Handler&& h)
: fn_(s, b, std::move(h))
{ {
out_->code = status::eof;
out_->on_write();
} }
read_op_impl(state& s, Buffers const& b, Handler const& h)
: fn_(s, b, h)
{
}
void
operator()() override
{
fn_.post();
}
};
/// Create and return a connected stream
inline
stream
connect(stream& to)
{
stream from{to.get_io_service()};
from.connect(to);
return from;
}
/// Create and return a connected stream
template<class Arg1, class... ArgN>
stream
connect(stream& to, Arg1&& arg1, ArgN&&... argn)
{
stream from{
std::forward<Arg1>(arg1),
std::forward<ArgN>(argn)...};
from.connect(to);
return from;
} }
} // test } // test