From 6e3c63e685cddb8d629619a171561dd1492fa8d0 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sat, 29 Jul 2017 17:47:04 -0700 Subject: [PATCH] Add test::stream --- CHANGELOG.md | 1 + extras/boost/beast/test/pipe_stream.hpp | 5 +- extras/boost/beast/test/stream.hpp | 625 ++++++++++++++++++++++++ test/beast/http/read.cpp | 32 +- test/beast/websocket/stream.cpp | 6 + 5 files changed, 653 insertions(+), 16 deletions(-) create mode 100644 extras/boost/beast/test/stream.hpp diff --git a/CHANGELOG.md b/CHANGELOG.md index c28b7f9f..85503869 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Version 95: * Move scripts to build/ * Fix race in test::pipe * close on test::pipe teardown +* Add test::stream -------------------------------------------------------------------------------- diff --git a/extras/boost/beast/test/pipe_stream.hpp b/extras/boost/beast/test/pipe_stream.hpp index d7defc9e..e58b53d9 100644 --- a/extras/boost/beast/test/pipe_stream.hpp +++ b/extras/boost/beast/test/pipe_stream.hpp @@ -109,12 +109,13 @@ public: stream(stream&&) = default; /// Set the fail counter on the object +#if 0 void fail(fail_counter& fc) { fc_ = &fc; } - +#endif /// Return the `io_service` associated with the object boost::asio::io_service& get_io_service() @@ -253,7 +254,7 @@ async_teardown(websocket::teardown_tag, if(s.fc_ && s.fc_->fail(ec)) return s.get_io_service().post( bind_handler(std::move(handler), ec)); - close(); + s.close(); s.get_io_service().post( bind_handler(std::move(handler), ec)); } diff --git a/extras/boost/beast/test/stream.hpp b/extras/boost/beast/test/stream.hpp new file mode 100644 index 00000000..9f09c48d --- /dev/null +++ b/extras/boost/beast/test/stream.hpp @@ -0,0 +1,625 @@ +// +// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_TEST_STREAM_HPP +#define BOOST_BEAST_TEST_STREAM_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace beast { +namespace test { + +class stream; + +namespace detail { + +class stream_impl +{ + friend class boost::beast::test::stream; + + using buffer_type = flat_buffer; + + struct read_op + { + virtual ~read_op() = default; + virtual void operator()() = 0; + }; + + template + class read_op_impl; + + struct state + { + std::mutex m; + buffer_type b; + std::condition_variable cv; + std::unique_ptr op; + boost::asio::io_service& ios; + bool eof = false; + fail_counter* fc = nullptr; + std::size_t nread = 0; + std::size_t nwrite = 0; + std::size_t read_max = + (std::numeric_limits::max)(); + std::size_t write_max = + (std::numeric_limits::max)(); + + explicit + state( + boost::asio::io_service& ios_, + fail_counter* fc_) + : ios(ios_) + , fc(fc_) + { + } + + friend class stream; + }; + + state s0_; + state s1_; + +public: + stream_impl( + boost::asio::io_service& ios, + fail_counter* fc) + : s0_(ios, fc) + , s1_(ios, fc) + { + } +}; + +template +class stream_impl::read_op_impl : public stream_impl::read_op +{ + state& s_; + Buffers b_; + Handler h_; + +public: + read_op_impl(state& s, + Buffers const& b, Handler&& h) + : s_(s) + , b_(b) + , h_(std::move(h)) + { + } + + read_op_impl(state& s, + Buffers const& b, Handler const& h) + : s_(s) + , b_(b) + , h_(h) + { + } + + void + operator()() override; +}; + +template +void +stream_impl:: +read_op_impl:: +operator()() +{ + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + s_.ios.post( + [&]() + { + BOOST_ASSERT(s_.op); + std::unique_lock lock{s_.m}; + if(s_.b.size() > 0) + { + auto const bytes_transferred = buffer_copy( + b_, s_.b.data(), s_.read_max); + s_.b.consume(bytes_transferred); + auto& s = s_; + Handler h{std::move(h_)}; + s.op.reset(nullptr); + lock.unlock(); + ++s.nread; + s.ios.post(bind_handler(std::move(h), + error_code{}, bytes_transferred)); + } + else + { + BOOST_ASSERT(s_.eof); + auto& s = s_; + Handler h{std::move(h_)}; + s.op.reset(nullptr); + lock.unlock(); + ++s.nread; + s.ios.post(bind_handler(std::move(h), + boost::asio::error::eof, 0)); + } + }); +} + +} // detail + +//------------------------------------------------------------------------------ + +/** A bidirectional in-memory communication channel + + An instance of this class provides a client and server + endpoint that are automatically connected to each other + similarly to a connected socket. + + Test pipes are used to facilitate writing unit tests + where the behavior of the transport is tightly controlled + to help illuminate all code paths (for code coverage) +*/ +class stream +{ + std::shared_ptr impl_; + detail::stream_impl::state& in_; + detail::stream_impl::state& out_; + + explicit + stream(std::shared_ptr< + detail::stream_impl> const& impl) + : impl_(impl) + , in_(impl_->s1_) + , out_(impl_->s0_) + { + } + +public: + using buffer_type = flat_buffer; + + ~stream() = default; + stream(stream&&) = default; + stream& operator=(stream const&) = delete; + + /// Constructor + explicit + stream( + boost::asio::io_service& ios) + : impl_(std::make_shared< + detail::stream_impl>(ios, nullptr)) + , in_(impl_->s0_) + , out_(impl_->s1_) + { + } + + /// Constructor + explicit + stream( + boost::asio::io_service& ios, + fail_counter& fc) + : impl_(std::make_shared< + detail::stream_impl>(ios, &fc)) + , in_(impl_->s0_) + , out_(impl_->s1_) + { + } + + /// Constructor + stream( + boost::asio::io_service& ios, + string_view s) + : impl_(std::make_shared< + detail::stream_impl>(ios, nullptr)) + , in_(impl_->s0_) + , out_(impl_->s1_) + { + using boost::asio::buffer; + using boost::asio::buffer_copy; + in_.b.commit(buffer_copy( + in_.b.prepare(s.size()), + buffer(s.data(), s.size()))); + } + + /// Constructor + stream( + boost::asio::io_service& ios, + fail_counter& fc, + string_view s) + : impl_(std::make_shared< + detail::stream_impl>(ios, &fc)) + , in_(impl_->s0_) + , out_(impl_->s1_) + { + using boost::asio::buffer; + using boost::asio::buffer_copy; + in_.b.commit(buffer_copy( + in_.b.prepare(s.size()), + buffer(s.data(), s.size()))); + } + + /// Return the other end of the connection + stream + remote() + { + BOOST_ASSERT(&in_ == &impl_->s0_); + return stream{impl_}; + } + + /// Return the `io_service` associated with the object + boost::asio::io_service& + get_io_service() + { + return in_.ios; + } + + /// Set the maximum number of bytes returned by read_some + void + read_size(std::size_t n) + { + in_.read_max = n; + } + + /// Set the maximum number of bytes returned by write_some + void + write_size(std::size_t n) + { + out_.write_max = n; + } + + /// Direct input buffer access + buffer_type& + buffer() + { + return in_.b; + } + + /// Returns a string view representing the pending input data + string_view + str() const + { + using boost::asio::buffer_cast; + using boost::asio::buffer_size; + return { + buffer_cast(*in_.b.data().begin()), + buffer_size(*in_.b.data().begin())}; + } + + /// Clear the buffer holding the input data + /* + void + clear() + { + in_.b.consume((std::numeric_limits< + std::size_t>::max)()); + } + */ + + /// Return the number of reads + std::size_t + nread() const + { + return in_.nread; + } + + /// Return the number of writes + std::size_t + nwrite() const + { + return out_.nwrite; + } + + /** Close the stream. + + The other end of the pipe will see `error::eof` + after reading all the data from the buffer. + */ + template + void + close(); + + template + std::size_t + read_some(MutableBufferSequence const& buffers); + + template + std::size_t + read_some(MutableBufferSequence const& buffers, + error_code& ec); + + template + async_return_type< + ReadHandler, void(error_code, std::size_t)> + async_read_some(MutableBufferSequence const& buffers, + ReadHandler&& handler); + + template + std::size_t + write_some(ConstBufferSequence const& buffers); + + template + std::size_t + write_some( + ConstBufferSequence const& buffers, error_code&); + + template + async_return_type< + WriteHandler, void(error_code, std::size_t)> + async_write_some(ConstBufferSequence const& buffers, + WriteHandler&& handler); + + friend + void + teardown(websocket::teardown_tag, + stream& s, boost::system::error_code& ec); + + template + friend + void + async_teardown(websocket::teardown_tag, + stream& s, TeardownHandler&& handler); +}; + +//------------------------------------------------------------------------------ + +template +std::size_t +stream:: +read_some(MutableBufferSequence const& buffers) +{ + static_assert(is_mutable_buffer_sequence< + MutableBufferSequence>::value, + "MutableBufferSequence requirements not met"); + error_code ec; + auto const n = read_some(buffers, ec); + if(ec) + BOOST_THROW_EXCEPTION(system_error{ec}); + return n; +} + +template +std::size_t +stream:: +read_some(MutableBufferSequence const& buffers, + error_code& ec) +{ + static_assert(is_mutable_buffer_sequence< + MutableBufferSequence>::value, + "MutableBufferSequence requirements not met"); + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + BOOST_ASSERT(! in_.op); + BOOST_ASSERT(buffer_size(buffers) > 0); + if(in_.fc && in_.fc->fail(ec)) + return 0; + std::unique_lock lock{in_.m}; + in_.cv.wait(lock, + [&]() + { + return in_.b.size() > 0 || in_.eof; + }); + std::size_t bytes_transferred; + if(in_.b.size() > 0) + { + ec.assign(0, ec.category()); + bytes_transferred = buffer_copy( + buffers, in_.b.data(), in_.read_max); + in_.b.consume(bytes_transferred); + } + else + { + BOOST_ASSERT(in_.eof); + bytes_transferred = 0; + ec = boost::asio::error::eof; + } + ++in_.nread; + return bytes_transferred; +} + +template +async_return_type< + ReadHandler, void(error_code, std::size_t)> +stream:: +async_read_some(MutableBufferSequence const& buffers, + ReadHandler&& handler) +{ + static_assert(is_mutable_buffer_sequence< + MutableBufferSequence>::value, + "MutableBufferSequence requirements not met"); + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + BOOST_ASSERT(! in_.op); + BOOST_ASSERT(buffer_size(buffers) > 0); + async_completion init{handler}; + if(in_.fc) + { + error_code ec; + if(in_.fc->fail(ec)) + return in_.ios.post(bind_handler( + init.completion_handler, ec, 0)); + } + { + std::unique_lock lock{in_.m}; + if(in_.eof) + { + lock.unlock(); + ++in_.nread; + in_.ios.post(bind_handler(init.completion_handler, + boost::asio::error::eof, 0)); + } + else if(buffer_size(buffers) == 0 || + buffer_size(in_.b.data()) > 0) + { + auto const bytes_transferred = buffer_copy( + buffers, in_.b.data(), in_.read_max); + in_.b.consume(bytes_transferred); + lock.unlock(); + ++in_.nread; + in_.ios.post(bind_handler(init.completion_handler, + error_code{}, bytes_transferred)); + } + else + { + in_.op.reset(new + detail::stream_impl::read_op_impl, + MutableBufferSequence>{in_, buffers, + init.completion_handler}); + } + } + return init.result.get(); +} + +template +std::size_t +stream:: +write_some(ConstBufferSequence const& buffers) +{ + static_assert(is_const_buffer_sequence< + ConstBufferSequence>::value, + "ConstBufferSequence requirements not met"); + BOOST_ASSERT(! out_.eof); + error_code ec; + auto const bytes_transferred = + write_some(buffers, ec); + if(ec) + BOOST_THROW_EXCEPTION(system_error{ec}); + return bytes_transferred; +} + +template +std::size_t +stream:: +write_some( + ConstBufferSequence const& buffers, error_code& ec) +{ + static_assert(is_const_buffer_sequence< + ConstBufferSequence>::value, + "ConstBufferSequence requirements not met"); + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + BOOST_ASSERT(! out_.eof); + if(in_.fc && in_.fc->fail(ec)) + return 0; + auto const n = (std::min)( + buffer_size(buffers), out_.write_max); + std::unique_lock lock{out_.m}; + auto const bytes_transferred = + buffer_copy(out_.b.prepare(n), buffers); + out_.b.commit(bytes_transferred); + if(out_.op) + out_.op.get()->operator()(); + else + out_.cv.notify_all(); + lock.unlock(); + ++out_.nwrite; + ec.assign(0, ec.category()); + return bytes_transferred; +} + +template +async_return_type< + WriteHandler, void(error_code, std::size_t)> +stream:: +async_write_some(ConstBufferSequence const& buffers, + WriteHandler&& handler) +{ + static_assert(is_const_buffer_sequence< + ConstBufferSequence>::value, + "ConstBufferSequence requirements not met"); + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + BOOST_ASSERT(! out_.eof); + async_completion init{handler}; + if(in_.fc) + { + error_code ec; + if(in_.fc->fail(ec)) + return in_.ios.post(bind_handler( + init.completion_handler, ec, 0)); + } + auto const n = + (std::min)(buffer_size(buffers), out_.write_max); + std::unique_lock lock{out_.m}; + auto const bytes_transferred = + buffer_copy(out_.b.prepare(n), buffers); + out_.b.commit(bytes_transferred); + if(out_.op) + out_.op.get()->operator()(); + else + out_.cv.notify_all(); + lock.unlock(); + ++out_.nwrite; + in_.ios.post(bind_handler(init.completion_handler, + error_code{}, bytes_transferred)); + return init.result.get(); +} + +inline +void +teardown(websocket::teardown_tag, + stream& s, boost::system::error_code& ec) +{ + if(s.in_.fc) + { + if(s.in_.fc->fail(ec)) + return; + } + else + { + s.close(); + ec.assign(0, ec.category()); + } +} + +template +inline +void +async_teardown(websocket::teardown_tag, + stream& s, TeardownHandler&& handler) +{ + error_code ec; + if(s.in_.fc && s.in_.fc->fail(ec)) + return s.get_io_service().post( + bind_handler(std::move(handler), ec)); + s.close(); + s.get_io_service().post( + bind_handler(std::move(handler), ec)); +} + +template +void +stream:: +close() +{ + std::lock_guard lock{out_.m}; + if(! out_.eof) + { + out_.eof = true; + if(out_.op) + out_.op.get()->operator()(); + else + out_.cv.notify_all(); + } +} + +} // test +} // beast +} // boost + +#endif diff --git a/test/beast/http/read.cpp b/test/beast/http/read.cpp index 245efb94..e9961c24 100644 --- a/test/beast/http/read.cpp +++ b/test/beast/http/read.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -113,9 +114,10 @@ public: try { multi_buffer b; - test::string_istream ss(ios_, "GET / X"); + test::stream c{ios_, "GET / X"}; + c.remote().close(); request_parser p; - read(ss, b, p); + read(c, b, p); fail(); } catch(std::exception const&) @@ -128,8 +130,8 @@ public: testBufferOverflow() { { - test::pipe p{ios_}; - ostream(p.server.buffer) << + test::stream c{ios_}; + ostream(c.buffer()) << "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "User-Agent: test\r\n" @@ -142,7 +144,7 @@ public: request req; try { - read(p.server, b, req); + read(c, b, req); pass(); } catch(std::exception const& e) @@ -151,8 +153,8 @@ public: } } { - test::pipe p{ios_}; - ostream(p.server.buffer) << + test::stream c{ios_}; + ostream(c.buffer()) << "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "User-Agent: test\r\n" @@ -164,7 +166,7 @@ public: error_code ec = test::error::fail_error; flat_static_buffer<10> b; request req; - read(p.server, b, req, ec); + read(c, b, req, ec); BEAST_EXPECTS(ec == error::buffer_overflow, ec.message()); } @@ -239,18 +241,19 @@ public: for(n = 0; n < limit; ++n) { - test::fail_stream fs(n, ios_, + test::fail_counter fc{n}; + test::stream c{ios_, fc, "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "User-Agent: test\r\n" "Content-Length: 0\r\n" "\r\n" - ); + }; request m; try { multi_buffer b; - read(fs, b, m); + read(c, b, m); break; } catch(std::exception const&) @@ -279,17 +282,18 @@ public: for(n = 0; n < limit; ++n) { - test::fail_stream fs(n, ios_, + test::fail_counter fc{n}; + test::stream c{ios_, fc, "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "User-Agent: test\r\n" "Content-Length: 0\r\n" "\r\n" - ); + }; request m; error_code ec = test::error::fail_error; multi_buffer b; - async_read(fs, b, m, do_yield[ec]); + async_read(c, b, m, do_yield[ec]); if(! ec) break; } diff --git a/test/beast/websocket/stream.cpp b/test/beast/websocket/stream.cpp index cdcf786d..93bb7c6e 100644 --- a/test/beast/websocket/stream.cpp +++ b/test/beast/websocket/stream.cpp @@ -1838,6 +1838,12 @@ public: BEAST_EXPECT(n < limit); } + void + testPipe() + { + //test::stream + } + void run() override {