From 812a19706a5b8203462a2586f5667c4999766029 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 6 Feb 2019 14:40:48 -0800 Subject: [PATCH] Improvements to test::stream: The behavior of the test stream when either end is destroyed or closed is well-defined. --- CHANGELOG.md | 1 + .../beast/_experimental/test/impl/stream.hpp | 510 ++++++++++-------- .../boost/beast/_experimental/test/stream.hpp | 34 +- test/beast/experimental/stream.cpp | 123 +++++ 4 files changed, 423 insertions(+), 245 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7a9dade..332cb80d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ Version 211: * close_socket is in stream_traits.hpp +* Improvements to test::stream -------------------------------------------------------------------------------- diff --git a/include/boost/beast/_experimental/test/impl/stream.hpp b/include/boost/beast/_experimental/test/impl/stream.hpp index 0b6bf0ec..bba5574f 100644 --- a/include/boost/beast/_experimental/test/impl/stream.hpp +++ b/include/boost/beast/_experimental/test/impl/stream.hpp @@ -10,30 +10,146 @@ #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP #define BOOST_BEAST_TEST_IMPL_STREAM_HPP +#include #include #include +#include namespace boost { namespace beast { namespace test { +//------------------------------------------------------------------------------ + +template +class stream::read_op : public stream::read_op_base +{ + using ex1_type = + net::io_context::executor_type; + using ex2_type + = net::associated_executor_t; + + class lambda + { + state& s_; + Buffers b_; + Handler h_; + net::executor_work_guard wg2_; + + public: + lambda(lambda&&) = default; + lambda(lambda const&) = default; + + template + lambda(state& s, Buffers const& b, DeducedHandler&& h) + : s_(s) + , b_(b) + , h_(std::forward(h)) + , wg2_(net::get_associated_executor( + h_, s_.ioc.get_executor())) + { + } + + void + operator()(bool cancel) + { + error_code ec; + std::size_t bytes_transferred = 0; + if(cancel) + { + ec = net::error::operation_aborted; + } + else + { + std::lock_guard lock(s_.m); + BOOST_ASSERT(! s_.op); + if(s_.b.size() > 0) + { + bytes_transferred = + net::buffer_copy( + b_, s_.b.data(), s_.read_max); + s_.b.consume(bytes_transferred); + } + else + { + ec = net::error::eof; + } + } + auto alloc = net::get_associated_allocator(h_); + wg2_.get_executor().dispatch( + beast::bind_front_handler(std::move(h_), + ec, bytes_transferred), alloc); + wg2_.reset(); + } + }; + + lambda fn_; + net::executor_work_guard wg1_; + +public: + template + read_op(state& s, + Buffers const& b, Handler_&& h) + : fn_(s, b, std::forward(h)) + , wg1_(s.ioc.get_executor()) + { + } + + void + operator()(bool cancel) override + { + net::post( + wg1_.get_executor(), + bind_handler( + std::move(fn_), + cancel)); + wg1_.reset(); + } +}; + +//------------------------------------------------------------------------------ + +stream:: +state:: +state( + net::io_context& ioc_, + fail_count* fc_) + : ioc(ioc_) + , fc(fc_) +{ +} + +stream:: +state:: +~state() +{ + // cancel outstanding read + if(op != nullptr) + (*op)(true); +} + +void +stream:: +state:: +notify_read() +{ + if(op) + { + auto op_ = std::move(op); + op_->operator()(); + } + else + { + cv.notify_all(); + } +} + +//------------------------------------------------------------------------------ + stream:: ~stream() { - { - std::unique_lock lock{in_->m}; - in_->op.reset(); - } - auto out = out_.lock(); - if(out) - { - std::unique_lock lock{out->m}; - if(out->code == status::ok) - { - out->code = status::reset; - out->on_write(); - } - } + close(); } stream:: @@ -50,6 +166,7 @@ stream& stream:: operator=(stream&& other) { + close(); auto in = std::make_shared( other.in_->ioc, other.in_->fc); in_ = std::move(other.in_); @@ -58,6 +175,8 @@ operator=(stream&& other) return *this; } +//------------------------------------------------------------------------------ + stream:: stream(net::io_context& ioc) : in_(std::make_shared(ioc, nullptr)) @@ -103,6 +222,8 @@ connect(stream& remote) BOOST_ASSERT(! remote.out_.lock()); out_ = remote.in_; remote.out_ = in_; + in_->code = status::ok; + remote.in_->code = status::ok; } string_view @@ -138,15 +259,33 @@ void stream:: close() { - BOOST_ASSERT(! in_->op); - auto out = out_.lock(); - if(! out) - return; - std::lock_guard lock{out->m}; - if(out->code == status::ok) + // cancel outstanding read { - out->code = status::eof; - out->on_write(); + std::unique_ptr op; + { + std::lock_guard lock(in_->m); + in_->code = status::eof; + op = std::move(in_->op); + } + if(op != nullptr) + (*op)(true); + } + + // disconnect + { + auto out = out_.lock(); + out_.reset(); + + // notify peer + if(out) + { + std::lock_guard lock(out->m); + if(out->code == status::ok) + { + out->code = status::eof; + out->notify_read(); + } + } } } @@ -158,7 +297,7 @@ close_remote() if(in_->code == status::ok) { in_->code = status::eof; - in_->on_write(); + in_->notify_read(); } } @@ -186,13 +325,20 @@ read_some(MutableBufferSequence const& buffers, static_assert(net::is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence requirements not met"); + + ++in_->nread; + + // test failure if(in_->fc && in_->fc->fail(ec)) return 0; + + // 0-byte reads are no-ops if(buffer_size(buffers) == 0) { ec.clear(); return 0; } + std::unique_lock lock{in_->m}; BOOST_ASSERT(! in_->op); in_->cv.wait(lock, @@ -202,25 +348,20 @@ read_some(MutableBufferSequence const& buffers, in_->b.size() > 0 || in_->code != status::ok; }); - std::size_t bytes_transferred; + + // deliver bytes before eof if(in_->b.size() > 0) { - ec = {}; - bytes_transferred = net::buffer_copy( + auto const n = net::buffer_copy( buffers, in_->b.data(), in_->read_max); - in_->b.consume(bytes_transferred); + in_->b.consume(n); + return n; } - else - { - BOOST_ASSERT(in_->code != status::ok); - bytes_transferred = 0; - if(in_->code == status::eof) - ec = net::error::eof; - else if(in_->code == status::reset) - ec = net::error::connection_reset; - } - ++in_->nread; - return bytes_transferred; + + // deliver error + BOOST_ASSERT(in_->code != status::ok); + ec = net::error::eof; + return 0; } template @@ -234,8 +375,18 @@ async_read_some( static_assert(net::is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence requirements not met"); + BOOST_BEAST_HANDLER_INIT( ReadHandler, void(error_code, std::size_t)); + + ++in_->nread; + + std::unique_lock lock(in_->m); + if(in_->op != nullptr) + throw std::logic_error( + "in_->op != nullptr"); + + // test failure error_code ec; if(in_->fc && in_->fc->fail(ec)) { @@ -243,51 +394,55 @@ async_read_some( in_->ioc.get_executor(), beast::bind_front_handler( std::move(init.completion_handler), - ec, - 0)); + ec, std::size_t{0})); + return init.result.get(); } - else + + // 0-byte reads are no-ops + if(buffer_size(buffers) == 0) { - std::unique_lock lock{in_->m}; - BOOST_ASSERT(! in_->op); - if( buffer_size(buffers) == 0 || - buffer_size(in_->b.data()) > 0) - { - auto const bytes_transferred = net::buffer_copy( - buffers, in_->b.data(), in_->read_max); - in_->b.consume(bytes_transferred); - lock.unlock(); - ++in_->nread; - net::post( - in_->ioc.get_executor(), - beast::bind_front_handler( - std::move(init.completion_handler), - error_code{}, - bytes_transferred)); - } - else if(in_->code != status::ok) - { - lock.unlock(); - ++in_->nread; - if(in_->code == status::eof) - ec = net::error::eof; - else if(in_->code == status::reset) - ec = net::error::connection_reset; - net::post( - in_->ioc.get_executor(), - beast::bind_front_handler( - std::move(init.completion_handler), - ec, - 0)); - } - else - { - in_->op.reset(new read_op{*in_, buffers, - std::move(init.completion_handler)}); - } + lock.unlock(); + net::post( + in_->ioc.get_executor(), + beast::bind_front_handler( + std::move(init.completion_handler), + ec, std::size_t{0})); + return init.result.get(); } + + // deliver bytes before eof + if(buffer_size(in_->b.data()) > 0) + { + auto n = net::buffer_copy( + buffers, in_->b.data(), in_->read_max); + in_->b.consume(n); + lock.unlock(); + net::post( + in_->ioc.get_executor(), + beast::bind_front_handler( + std::move(init.completion_handler), + ec, n)); + return init.result.get(); + } + + // deliver error + if(in_->code != status::ok) + { + lock.unlock(); + ec = net::error::eof; + net::post( + in_->ioc.get_executor(), + beast::bind_front_handler( + std::move(init.completion_handler), + ec, std::size_t{0})); + return init.result.get(); + } + + // complete when bytes available or closed + in_->op.reset(new read_op{*in_, buffers, + std::move(init.completion_handler)}); return init.result.get(); } @@ -316,26 +471,31 @@ write_some( static_assert(net::is_const_buffer_sequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); + + ++in_->nwrite; + + // connection closed auto out = out_.lock(); if(! out) { ec = net::error::connection_reset; return 0; } - BOOST_ASSERT(out->code == status::ok); + + // test failure if(in_->fc && in_->fc->fail(ec)) return 0; - auto const n = std::min( + + // copy buffers + auto n = std::min( buffer_size(buffers), in_->write_max); - std::unique_lock lock{out->m}; - auto const bytes_transferred = - net::buffer_copy(out->b.prepare(n), buffers); - out->b.commit(bytes_transferred); - out->on_write(); - lock.unlock(); - ++in_->nwrite; - ec = {}; - return bytes_transferred; + { + std::lock_guard lock(out->m); + n = net::buffer_copy(out->b.prepare(n), buffers); + out->b.commit(n); + out->notify_read(); + } + return n; } template @@ -350,6 +510,10 @@ async_write_some(ConstBufferSequence const& buffers, "ConstBufferSequence requirements not met"); BOOST_BEAST_HANDLER_INIT( WriteHandler, void(error_code, std::size_t)); + + ++in_->nwrite; + + // connection closed auto out = out_.lock(); if(! out) { @@ -358,49 +522,46 @@ async_write_some(ConstBufferSequence const& buffers, beast::bind_front_handler( std::move(init.completion_handler), net::error::connection_reset, - 0)); + std::size_t{0})); + return init.result.get(); } - else + + // test failure + error_code ec; + if(in_->fc && in_->fc->fail(ec)) { - BOOST_ASSERT(out->code == status::ok); - error_code ec; - if(in_->fc && in_->fc->fail(ec)) - { - net::post( - in_->ioc.get_executor(), - beast::bind_front_handler( - std::move(init.completion_handler), - ec, - 0)); - } - else - { - auto const n = std::min( - buffer_size(buffers), in_->write_max); - std::unique_lock lock{out->m}; - auto const bytes_transferred = - net::buffer_copy(out->b.prepare(n), buffers); - out->b.commit(bytes_transferred); - out->on_write(); - lock.unlock(); - ++in_->nwrite; - net::post( - in_->ioc.get_executor(), - beast::bind_front_handler( - std::move(init.completion_handler), - error_code{}, - bytes_transferred)); - } + net::post( + in_->ioc.get_executor(), + beast::bind_front_handler( + std::move(init.completion_handler), + ec, + std::size_t{0})); + return init.result.get(); } + // copy buffers + auto n = std::min( + buffer_size(buffers), in_->write_max); + { + std::lock_guard lock(out->m); + n = net::buffer_copy(out->b.prepare(n), buffers); + out->b.commit(n); + out->notify_read(); + } + net::post( + in_->ioc.get_executor(), + beast::bind_front_handler( + std::move(init.completion_handler), + error_code{}, n)); + return init.result.get(); } void teardown( -websocket::role_type, -stream& s, -boost::system::error_code& ec) + websocket::role_type, + stream& s, + boost::system::error_code& ec) { if( s.in_->fc && s.in_->fc->fail(ec)) @@ -412,15 +573,15 @@ boost::system::error_code& ec) s.in_->fc->fail(ec)) ec = net::error::eof; else - ec = {}; + ec.clear(); } template void async_teardown( -websocket::role_type, -stream& s, -TeardownHandler&& handler) + websocket::role_type, + stream& s, + TeardownHandler&& handler) { error_code ec; if( s.in_->fc && @@ -434,7 +595,7 @@ TeardownHandler&& handler) s.in_->fc->fail(ec)) ec = net::error::eof; else - ec = {}; + ec.clear(); net::post( s.get_executor(), @@ -442,95 +603,6 @@ TeardownHandler&& handler) std::move(handler), ec)); } -//------------------------------------------------------------------------------ - -template -class stream::read_op : public stream::read_op_base -{ - using ex1_type = - net::io_context::executor_type; - using ex2_type - = net::associated_executor_t; - - class lambda - { - state& s_; - Buffers b_; - Handler h_; - net::executor_work_guard wg2_; - - public: - lambda(lambda&&) = default; - lambda(lambda const&) = default; - - template - lambda(state& s, Buffers const& b, DeducedHandler&& h) - : s_(s) - , b_(b) - , h_(std::forward(h)) - , wg2_(net::get_associated_executor( - h_, s_.ioc.get_executor())) - { - } - - void - operator()() - { - std::unique_lock lock{s_.m}; - error_code ec; - std::size_t bytes_transferred = 0; - BOOST_ASSERT(! s_.op); - if(s_.b.size() > 0) - { - bytes_transferred = - net::buffer_copy( - b_, s_.b.data(), s_.read_max); - s_.b.consume(bytes_transferred); - ++s_.nread; - } - else - { - BOOST_ASSERT(s_.code != status::ok); - auto& s = s_; - ++s.nread; - if(s.code == status::eof) - ec = net::error::eof; - else if(s.code == status::reset) - ec = net::error::connection_reset; - - } - lock.unlock(); - auto alloc = net::get_associated_allocator(h_); - wg2_.get_executor().dispatch( - beast::bind_front_handler( - std::move(h_), - ec, - bytes_transferred), alloc); - wg2_.reset(); - } - }; - - lambda fn_; - net::executor_work_guard wg1_; - -public: - template - read_op(state& s, Buffers const& b, DeducedHandler&& h) - : fn_(s, b, std::forward(h)) - , wg1_(s.ioc.get_executor()) - { - } - - void - operator()() override - { - net::post( - wg1_.get_executor(), - std::move(fn_)); - wg1_.reset(); - } -}; - stream connect(stream& to) { diff --git a/include/boost/beast/_experimental/test/stream.hpp b/include/boost/beast/_experimental/test/stream.hpp index 37e1a53e..d8095f66 100644 --- a/include/boost/beast/_experimental/test/stream.hpp +++ b/include/boost/beast/_experimental/test/stream.hpp @@ -98,7 +98,7 @@ class stream struct read_op_base { virtual ~read_op_base() = default; - virtual void operator()() = 0; + virtual void operator()(bool cancel = false) = 0; }; template @@ -108,7 +108,6 @@ class stream { ok, eof, - reset }; struct state @@ -129,33 +128,16 @@ class stream std::size_t write_max = (std::numeric_limits::max)(); - ~state() - { - BOOST_ASSERT(! op); - } - + BOOST_BEAST_DECL explicit - state( - net::io_context& ioc_, - fail_count* fc_) - : ioc(ioc_) - , fc(fc_) - { - } + state(net::io_context& ioc_, fail_count* fc_); + BOOST_BEAST_DECL + ~state(); + + BOOST_BEAST_DECL void - on_write() - { - if(op) - { - std::unique_ptr op_ = std::move(op); - op_->operator()(); - } - else - { - cv.notify_all(); - } - } + notify_read(); }; std::shared_ptr in_; diff --git a/test/beast/experimental/stream.cpp b/test/beast/experimental/stream.cpp index 5c9ef371..93066636 100644 --- a/test/beast/experimental/stream.cpp +++ b/test/beast/experimental/stream.cpp @@ -9,3 +9,126 @@ // Test that header file is self-contained. #include + +#include + +namespace boost { +namespace beast { + +class test_stream_test + : public unit_test::suite +{ +public: + void + testTestStream() + { + net::io_context ioc; + char buf[1] = {}; + net::mutable_buffer m0; + net::mutable_buffer m1(buf, sizeof(buf)); + + { + { + test::stream ts(ioc); + } + { + test::stream ts(ioc); + ts.close(); + } + { + test::stream t1(ioc); + auto t2 = connect(t1); + } + { + test::stream t1(ioc); + auto t2 = connect(t1); + t2.close(); + } + { +#if 0 + test::stream ts(ioc); + error_code ec; + ts.read_some(net::mutable_buffer{}, ec); + log << ec.message(); +#endif + } + { + error_code ec; + { + test::stream ts(ioc); + ts.async_read_some(m1, + [&](error_code ec_, std::size_t) + { + ec = ec_; + }); + } + ioc.run(); + ioc.restart(); + BEAST_EXPECTS( + //ec == net::error::eof, + ec == net::error::operation_aborted, + ec.message()); + } + { + error_code ec; + test::stream ts(ioc); + ts.async_read_some(m1, + [&](error_code ec_, std::size_t) + { + ec = ec_; + }); + ts.close(); + ioc.run(); + ioc.restart(); + BEAST_EXPECTS( + //ec == net::error::eof, + ec == net::error::operation_aborted, + ec.message()); + } + { + error_code ec; + test::stream t1(ioc); + auto t2 = connect(t1); + t1.async_read_some(m1, + [&](error_code ec_, std::size_t) + { + ec = ec_; + }); + t2.close(); + ioc.run(); + ioc.restart(); + BEAST_EXPECTS( + ec == net::error::eof, + ec.message()); + } + { + error_code ec; + test::stream t1(ioc); + auto t2 = connect(t1); + t1.async_read_some(m1, + [&](error_code ec_, std::size_t) + { + ec = ec_; + }); + t1.close(); + ioc.run(); + ioc.restart(); + BEAST_EXPECTS( + ec == net::error::operation_aborted, + ec.message()); + } + } + } + + void + run() override + { + testTestStream(); + pass(); + } +}; + +BEAST_DEFINE_TESTSUITE(beast,test,test_stream); + +} // beast +} // boost