From c510662c67003b3447c57b08281ba54d058f3179 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 26 Feb 2019 18:55:23 -0800 Subject: [PATCH] Add test::stream::service --- CHANGELOG.md | 6 + .../beast/_experimental/test/impl/stream.hpp | 73 ++++++-- .../beast/_experimental/test/impl/stream.ipp | 114 +++++++++++-- .../boost/beast/_experimental/test/stream.hpp | 33 +++- .../boost/beast/core/detail/service_base.hpp | 43 +++++ test/beast/_experimental/stream.cpp | 160 ++++++++++-------- 6 files changed, 318 insertions(+), 111 deletions(-) create mode 100644 include/boost/beast/core/detail/service_base.hpp diff --git a/CHANGELOG.md b/CHANGELOG.md index 2507f90c..fa019f52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +Version 223: + +* Add test::stream::service + +-------------------------------------------------------------------------------- + Version 222: * stream_base::timeout::suggested is a nested function diff --git a/include/boost/beast/_experimental/test/impl/stream.hpp b/include/boost/beast/_experimental/test/impl/stream.hpp index 65a48138..3cc5dfce 100644 --- a/include/boost/beast/_experimental/test/impl/stream.hpp +++ b/include/boost/beast/_experimental/test/impl/stream.hpp @@ -13,7 +13,10 @@ #include #include #include +#include +#include #include +#include namespace boost { namespace beast { @@ -21,6 +24,41 @@ namespace test { //------------------------------------------------------------------------------ +struct stream::service_impl +{ + std::mutex m_; + std::vector v_; + + BOOST_BEAST_DECL + void + remove(state& impl); +}; + +class stream::service + : public beast::detail::service_base +{ + boost::shared_ptr sp_; + + BOOST_BEAST_DECL + void + shutdown() override; + +public: + BOOST_BEAST_DECL + explicit + service(net::execution_context& ctx); + + BOOST_BEAST_DECL + static + auto + make_impl( + net::io_context& ctx, + test::fail_count* fc) -> + boost::shared_ptr; +}; + +//------------------------------------------------------------------------------ + template class stream::read_op : public stream::read_op_base { @@ -32,7 +70,7 @@ class stream::read_op : public stream::read_op_base struct lambda { Handler h_; - state& s_; + boost::weak_ptr wp_; Buffers b_; net::executor_work_guard wg2_; @@ -42,13 +80,13 @@ class stream::read_op : public stream::read_op_base template lambda( Handler_&& h, - state& s, + boost::shared_ptr const& s, Buffers const& b) : h_(std::forward(h)) - , s_(s) + , wp_(s) , b_(b) , wg2_(net::get_associated_executor( - h_, s_.ioc.get_executor())) + h_, s->ioc.get_executor())) { } @@ -56,16 +94,19 @@ class stream::read_op : public stream::read_op_base operator()(error_code ec) { std::size_t bytes_transferred = 0; - if (!ec) + auto sp = wp_.lock(); + if(! sp) + ec = net::error::operation_aborted; + if(! ec) { - std::lock_guard lock(s_.m); - BOOST_ASSERT(! s_.op); - if(s_.b.size() > 0) + std::lock_guard lock(sp->m); + BOOST_ASSERT(! sp->op); + if(sp->b.size() > 0) { bytes_transferred = net::buffer_copy( - b_, s_.b.data(), s_.read_max); - s_.b.consume(bytes_transferred); + b_, sp->b.data(), sp->read_max); + sp->b.consume(bytes_transferred); } else if (buffer_size(b_) > 0) { @@ -88,10 +129,10 @@ public: template read_op( Handler_&& h, - state& s, + boost::shared_ptr const& s, Buffers const& b) : fn_(std::forward(h), s, b) - , wg1_(s.ioc.get_executor()) + , wg1_(s->ioc.get_executor()) { } @@ -114,7 +155,7 @@ struct stream::run_read_op void operator()( ReadHandler&& h, - std::shared_ptr const& in, + boost::shared_ptr const& in, MutableBufferSequence const& buffers) { // If you get an error on the following line it means @@ -133,7 +174,7 @@ struct stream::run_read_op typename std::decay::type, MutableBufferSequence>( std::move(h), - *in, + in, buffers)}, buffer_size(buffers)); } @@ -147,8 +188,8 @@ struct stream::run_write_op void operator()( WriteHandler&& h, - std::shared_ptr in_, - std::weak_ptr out_, + boost::shared_ptr in_, + boost::weak_ptr out_, ConstBufferSequence const& buffers) { // If you get an error on the following line it means diff --git a/include/boost/beast/_experimental/test/impl/stream.ipp b/include/boost/beast/_experimental/test/impl/stream.ipp index b5ebf614..b1dec0c4 100644 --- a/include/boost/beast/_experimental/test/impl/stream.ipp +++ b/include/boost/beast/_experimental/test/impl/stream.ipp @@ -14,7 +14,9 @@ #include #include #include +#include #include +#include namespace boost { namespace beast { @@ -22,8 +24,61 @@ namespace test { //------------------------------------------------------------------------------ +stream:: +service:: +service(net::execution_context& ctx) + : beast::detail::service_base(ctx) + , sp_(boost::make_shared()) +{ +} + +void +stream:: +service:: +shutdown() +{ + std::vector> v; + std::lock_guard g1(sp_->m_); + v.reserve(sp_->v_.size()); + for(auto p : sp_->v_) + { + std::lock_guard g2(p->m); + v.emplace_back(std::move(p->op)); + p->code = status::eof; + } +} + +auto +stream:: +service:: +make_impl( + net::io_context& ctx, + test::fail_count* fc) -> + boost::shared_ptr +{ + auto& svc = net::use_service(ctx); + auto sp = boost::make_shared(ctx, svc.sp_, fc); + std::lock_guard g(svc.sp_->m_); + svc.sp_->v_.push_back(sp.get()); + return sp; +} + +void +stream:: +service_impl:: +remove(state& impl) +{ + std::lock_guard g(m_); + *std::find( + v_.begin(), v_.end(), + &impl) = std::move(v_.back()); + v_.pop_back(); +} + +//------------------------------------------------------------------------------ + void stream::initiate_read( - std::shared_ptr const& in_, + boost::shared_ptr const& in_, std::unique_ptr&& op, std::size_t buf_size) { @@ -67,8 +122,10 @@ stream:: state:: state( net::io_context& ioc_, + boost::weak_ptr wp_, fail_count* fc_) : ioc(ioc_) + , wp(std::move(wp_)) , fc(fc_) { } @@ -82,6 +139,20 @@ state:: (*op)(net::error::operation_aborted); } +void +stream:: +state:: +remove() noexcept +{ + auto sp = wp.lock(); + + // If this goes off, it means the lifetime of a test::stream object + // extended beyond the lifetime of the associated execution context. + BOOST_ASSERT(sp); + + sp->remove(*this); +} + void stream:: state:: @@ -98,18 +169,34 @@ notify_read() } } +void +stream:: +state:: +cancel_read() +{ + std::unique_ptr p; + { + std::lock_guard lock(m); + code = status::eof; + p = std::move(op); + } + if(p != nullptr) + (*p)(net::error::operation_aborted); +} + //------------------------------------------------------------------------------ stream:: ~stream() { close(); + in_->remove(); } stream:: stream(stream&& other) { - auto in = std::make_shared( + auto in = service::make_impl( other.in_->ioc, other.in_->fc); in_ = std::move(other.in_); out_ = std::move(other.out_); @@ -121,8 +208,9 @@ stream:: operator=(stream&& other) { close(); - auto in = std::make_shared( + auto in = service::make_impl( other.in_->ioc, other.in_->fc); + in_->remove(); in_ = std::move(other.in_); out_ = std::move(other.out_); other.in_ = in; @@ -133,7 +221,7 @@ operator=(stream&& other) stream:: stream(net::io_context& ioc) - : in_(std::make_shared(ioc, nullptr)) + : in_(service::make_impl(ioc, nullptr)) { } @@ -141,7 +229,7 @@ stream:: stream( net::io_context& ioc, fail_count& fc) - : in_(std::make_shared(ioc, &fc)) + : in_(service::make_impl(ioc, &fc)) { } @@ -149,7 +237,7 @@ stream:: stream( net::io_context& ioc, string_view s) - : in_(std::make_shared(ioc, nullptr)) + : in_(service::make_impl(ioc, nullptr)) { in_->b.commit(net::buffer_copy( in_->b.prepare(s.size()), @@ -161,7 +249,7 @@ stream( net::io_context& ioc, fail_count& fc, string_view s) - : in_(std::make_shared(ioc, &fc)) + : in_(service::make_impl(ioc, &fc)) { in_->b.commit(net::buffer_copy( in_->b.prepare(s.size()), @@ -213,17 +301,7 @@ void stream:: close() { - // cancel outstanding read - { - std::unique_ptr op; - { - std::lock_guard lock(in_->m); - in_->code = status::eof; - op = std::move(in_->op); - } - if(op != nullptr) - (*op)(net::error::operation_aborted); - } + in_->cancel_read(); // disconnect { diff --git a/include/boost/beast/_experimental/test/stream.hpp b/include/boost/beast/_experimental/test/stream.hpp index 0c1132a6..7eb71ba7 100644 --- a/include/boost/beast/_experimental/test/stream.hpp +++ b/include/boost/beast/_experimental/test/stream.hpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -98,8 +100,8 @@ class stream { struct state; - std::shared_ptr in_; - std::weak_ptr out_; + boost::shared_ptr in_; + boost::weak_ptr out_; enum class status { @@ -107,6 +109,9 @@ class stream eof, }; + class service; + struct service_impl; + struct read_op_base { virtual ~read_op_base() = default; @@ -117,11 +122,12 @@ class stream { friend class stream; + net::io_context& ioc; + boost::weak_ptr wp; std::mutex m; flat_buffer b; std::condition_variable cv; std::unique_ptr op; - net::io_context& ioc; status code = status::ok; fail_count* fc = nullptr; std::size_t nread = 0; @@ -132,15 +138,26 @@ class stream (std::numeric_limits::max)(); BOOST_BEAST_DECL - explicit - state(net::io_context& ioc_, fail_count* fc_); + state( + net::io_context& ioc_, + boost::weak_ptr wp_, + fail_count* fc_); + BOOST_BEAST_DECL ~state(); + BOOST_BEAST_DECL + void + remove() noexcept; + BOOST_BEAST_DECL void notify_read(); + + BOOST_BEAST_DECL + void + cancel_read(); }; template @@ -153,7 +170,7 @@ class stream static void initiate_read( - std::shared_ptr const& in, + boost::shared_ptr const& in, std::unique_ptr&& op, std::size_t buf_size); @@ -402,7 +419,7 @@ public: std::size_t bytes_transferred // Number of bytes read. ); @endcode - + Regardless of whether the asynchronous operation completes immediately or not, the handler will not be invoked from within this function. Invocation of the handler will be performed in a @@ -481,7 +498,7 @@ public: std::size_t bytes_transferred // Number of bytes written. ); @endcode - + Regardless of whether the asynchronous operation completes immediately or not, the handler will not be invoked from within this function. Invocation of the handler will be performed in a diff --git a/include/boost/beast/core/detail/service_base.hpp b/include/boost/beast/core/detail/service_base.hpp new file mode 100644 index 00000000..adec09bb --- /dev/null +++ b/include/boost/beast/core/detail/service_base.hpp @@ -0,0 +1,43 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_DETAIL_SERVICE_BASE_HPP +#define BOOST_BEAST_DETAIL_SERVICE_BASE_HPP + +#include + +namespace boost { +namespace beast { +namespace detail { + +template +struct service_id : net::execution_context::id +{ +}; + +template +struct service_base : net::execution_context::service +{ + static service_id id; + + explicit + service_base(net::execution_context& ctx) + : net::execution_context::service(ctx) + { + } +}; + +template +service_id service_base::id; + +} // detail +} // beast +} // boost + +#endif diff --git a/test/beast/_experimental/stream.cpp b/test/beast/_experimental/stream.cpp index a783764b..d4089b20 100644 --- a/test/beast/_experimental/stream.cpp +++ b/test/beast/_experimental/stream.cpp @@ -1,4 +1,4 @@ -// +// // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -11,23 +11,25 @@ #include #include +#include namespace boost { namespace beast { -class test_stream_test +class stream_test : public unit_test::suite { public: void testTestStream() { - net::io_context ioc; char buf[1] = {}; net::mutable_buffer m0; net::mutable_buffer m1(buf, sizeof(buf)); { + net::io_context ioc; + { test::stream ts(ioc); } @@ -44,91 +46,111 @@ public: auto t2 = connect(t1); t2.close(); } - { -#if 0 - test::stream ts(ioc); - error_code ec; - ts.read_some(net::mutable_buffer{}, ec); - log << ec.message(); -#endif - } - { - error_code ec; + } + { + // abandon + net::io_context ioc; + test::stream ts(ioc); + ts.async_read_some(m1, + [](error_code, std::size_t) { - test::stream ts(ioc); - ts.async_read_some(m1, - [&](error_code ec_, std::size_t) - { - ec = ec_; - }); - } - ioc.run(); - ioc.restart(); - BEAST_EXPECTS( - //ec == net::error::eof, - ec == net::error::operation_aborted, - ec.message()); - } + BEAST_FAIL(); + }); + } + //--- + { + net::io_context ioc; { - error_code ec; test::stream ts(ioc); ts.async_read_some(m1, - [&](error_code ec_, std::size_t) - { - ec = ec_; - }); - ts.close(); - ioc.run(); - ioc.restart(); - BEAST_EXPECTS( - //ec == net::error::eof, - ec == net::error::operation_aborted, - ec.message()); + test::fail_handler( + net::error::operation_aborted)); } + test::run(ioc); + } + { + net::io_context ioc; + test::stream ts(ioc); + ts.async_read_some(m1, + test::fail_handler( + net::error::operation_aborted)); + ts.close(); + test::run(ioc); + } + { + net::io_context ioc; + test::stream t1(ioc); + auto t2 = connect(t1); + t1.async_read_some(m1, + test::fail_handler( + net::error::eof)); + t2.close(); + test::run(ioc); + } + { + net::io_context ioc; + test::stream t1(ioc); + auto t2 = connect(t1); + t1.async_read_some(m1, + test::fail_handler( + net::error::operation_aborted)); + t1.close(); + test::run(ioc); + } + } + + void + testSharedAbandon() + { + struct handler + { + std::shared_ptr ts_; + + void + operator()(error_code, std::size_t) { - error_code ec; - test::stream t1(ioc); - auto t2 = connect(t1); - t1.async_read_some(m1, - [&](error_code ec_, std::size_t) - { - ec = ec_; - }); - t2.close(); - ioc.run(); - ioc.restart(); - BEAST_EXPECTS( - ec == net::error::eof, - ec.message()); } + }; + + char buf[1] = {}; + net::mutable_buffer m1(buf, sizeof(buf)); + + std::weak_ptr wp; + + { + net::io_context ioc; { - error_code ec; - test::stream t1(ioc); - auto t2 = connect(t1); - t1.async_read_some(m1, - [&](error_code ec_, std::size_t) - { - ec = ec_; - }); - t1.close(); - ioc.run(); - ioc.restart(); - BEAST_EXPECTS( - ec == net::error::operation_aborted, - ec.message()); + auto sp = std::make_shared(ioc); + + sp->async_read_some(m1, handler{sp}); + wp = sp; } } + BEAST_EXPECT(! wp.lock()); + } + + void + testLifetimeViolation() + { + // This should assert + std::shared_ptr sp; + { + net::io_context ioc; + sp = std::make_shared(ioc); + } + sp.reset(); } void run() override { testTestStream(); - pass(); + testSharedAbandon(); + //testLifetimeViolation(); } }; -BEAST_DEFINE_TESTSUITE(beast,test,test_stream); +BEAST_DEFINE_TESTSUITE(beast,test,stream); } // beast } // boost