diff --git a/.drone/boost-script.sh b/.drone/boost-script.sh index 82bc82a5..066d9282 100755 --- a/.drone/boost-script.sh +++ b/.drone/boost-script.sh @@ -35,3 +35,4 @@ echo '==================================> COMPILE' cd ../boost-root libs/beast/tools/retry.sh libs/beast/tools/build-and-test.sh + diff --git a/CHANGELOG.md b/CHANGELOG.md index 220923e2..cf864d1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Add executor rebind to test::stream. * Remove floating point arithmetic requirement. * Add `cxxstd` to json field. diff --git a/include/boost/beast/_experimental/test/detail/stream_state.hpp b/include/boost/beast/_experimental/test/detail/stream_state.hpp new file mode 100644 index 00000000..5df55cf3 --- /dev/null +++ b/include/boost/beast/_experimental/test/detail/stream_state.hpp @@ -0,0 +1,135 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// Copyright (c) 2020 Richard Hodges (hodges.r@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_TEST_DETAIL_STREAM_STATE_HPP +#define BOOST_BEAST_TEST_DETAIL_STREAM_STATE_HPP + +#include +#include +#include + +#include +#include +#include +#include + +namespace boost { +namespace beast { +namespace test { +namespace detail { + +struct stream_state; + +struct stream_service_impl +{ + std::mutex m_; + std::vector v_; + + BOOST_BEAST_DECL + void + remove(stream_state& impl); +}; + +//------------------------------------------------------------------------------ + +class stream_service + : public beast::detail::service_base +{ + boost::shared_ptr sp_; + + BOOST_BEAST_DECL + void + shutdown() override; + +public: + BOOST_BEAST_DECL + explicit + stream_service(net::execution_context& ctx); + + BOOST_BEAST_DECL + static + auto + make_impl( + net::any_io_executor exec, + test::fail_count* fc) -> + boost::shared_ptr; +}; + +//------------------------------------------------------------------------------ + +struct stream_read_op_base +{ + virtual ~stream_read_op_base() = default; + virtual void operator()(error_code ec) = 0; +}; + +//------------------------------------------------------------------------------ + +enum class stream_status +{ + ok, + eof, +}; + +//------------------------------------------------------------------------------ + +struct stream_state +{ + net::any_io_executor exec; + boost::weak_ptr wp; + std::mutex m; + flat_buffer b; + std::condition_variable cv; + std::unique_ptr op; + stream_status code = stream_status::ok; + fail_count* fc = nullptr; + std::size_t nread = 0; + std::size_t nread_bytes = 0; + std::size_t nwrite = 0; + std::size_t nwrite_bytes = 0; + std::size_t read_max = + (std::numeric_limits::max)(); + std::size_t write_max = + (std::numeric_limits::max)(); + + BOOST_BEAST_DECL + stream_state( + net::any_io_executor exec_, + boost::weak_ptr wp_, + fail_count* fc_); + + BOOST_BEAST_DECL + ~stream_state(); + + BOOST_BEAST_DECL + void + remove() noexcept; + + BOOST_BEAST_DECL + void + notify_read(); + + BOOST_BEAST_DECL + void + cancel_read(); +}; + + + +} // detail +} // test +} // beast +} // boost + +#ifdef BOOST_BEAST_HEADER_ONLY +#include +#endif + +#endif // BOOST_BEAST_TEST_DETAIL_STREAM_STATE_HPP diff --git a/include/boost/beast/_experimental/test/detail/stream_state.ipp b/include/boost/beast/_experimental/test/detail/stream_state.ipp new file mode 100644 index 00000000..c86c7e97 --- /dev/null +++ b/include/boost/beast/_experimental/test/detail/stream_state.ipp @@ -0,0 +1,149 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// Copyright (c) 2020 Richard Hodges (hodges.r@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_TEST_DETAIL_STREAM_STATE_IPP +#define BOOST_BEAST_TEST_DETAIL_STREAM_STATE_IPP + +#include + +namespace boost { +namespace beast { +namespace test { + +namespace detail { + +//------------------------------------------------------------------------------ + +stream_service:: +stream_service(net::execution_context& ctx) + : beast::detail::service_base(ctx) + , sp_(boost::make_shared()) +{ +} + +void +stream_service:: +shutdown() +{ + std::vector> v; + std::lock_guard g1(sp_->m_); + v.reserve(sp_->v_.size()); + for(auto p : sp_->v_) + { + std::lock_guard g2(p->m); + v.emplace_back(std::move(p->op)); + p->code = detail::stream_status::eof; + } +} + +auto +stream_service:: +make_impl( + net::any_io_executor exec, + test::fail_count* fc) -> + boost::shared_ptr +{ +#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT) + auto& ctx = exec.context(); +#else + auto& ctx = net::query( + exec, + net::execution::context); +#endif + auto& svc = net::use_service(ctx); + auto sp = boost::make_shared(exec, svc.sp_, fc); + std::lock_guard g(svc.sp_->m_); + svc.sp_->v_.push_back(sp.get()); + return sp; +} + +//------------------------------------------------------------------------------ + +void +stream_service_impl:: +remove(stream_state& impl) +{ + std::lock_guard g(m_); + *std::find( + v_.begin(), v_.end(), + &impl) = std::move(v_.back()); + v_.pop_back(); +} + +//------------------------------------------------------------------------------ + +stream_state:: +stream_state( + net::any_io_executor exec_, + boost::weak_ptr wp_, + fail_count* fc_) + : exec(std::move(exec_)) + , wp(std::move(wp_)) + , fc(fc_) +{ +} + +stream_state:: +~stream_state() +{ + // cancel outstanding read + if(op != nullptr) + (*op)(net::error::operation_aborted); +} + +void +stream_state:: +remove() noexcept +{ + auto sp = wp.lock(); + + // If this goes off, it means the lifetime of a test::stream object + // extended beyond the lifetime of the associated execution context. + BOOST_ASSERT(sp); + + sp->remove(*this); +} + +void +stream_state:: +notify_read() +{ + if(op) + { + auto op_ = std::move(op); + op_->operator()(error_code{}); + } + else + { + cv.notify_all(); + } +} + +void +stream_state:: +cancel_read() +{ + std::unique_ptr p; + { + std::lock_guard lock(m); + code = stream_status::eof; + p = std::move(op); + } + if(p != nullptr) + (*p)(net::error::operation_aborted); +} + +} // detail + +} // test +} // beast +} // boost + +#endif // BOOST_BEAST_TEST_DETAIL_STREAM_STATE_IPP diff --git a/include/boost/beast/_experimental/test/impl/stream.hpp b/include/boost/beast/_experimental/test/impl/stream.hpp index f7eac6af..57025833 100644 --- a/include/boost/beast/_experimental/test/impl/stream.hpp +++ b/include/boost/beast/_experimental/test/impl/stream.hpp @@ -27,53 +27,19 @@ namespace test { //------------------------------------------------------------------------------ -struct stream::service_impl -{ - std::mutex m_; - std::vector v_; - - BOOST_BEAST_DECL - void - remove(state& impl); -}; - -class stream::service - : public beast::detail::service_base -{ - boost::shared_ptr sp_; - - BOOST_BEAST_DECL - void - shutdown() override; - -public: - BOOST_BEAST_DECL - explicit - service(net::execution_context& ctx); - - BOOST_BEAST_DECL - static - auto - make_impl( - net::io_context& ctx, - test::fail_count* fc) -> - boost::shared_ptr; -}; - -//------------------------------------------------------------------------------ - +template template -class stream::read_op : public stream::read_op_base +class basic_stream::read_op : public detail::stream_read_op_base { using ex1_type = - net::io_context::executor_type; + executor_type; using ex2_type = net::associated_executor_t; struct lambda { Handler h_; - boost::weak_ptr wp_; + boost::weak_ptr wp_; Buffers b_; #if defined(BOOST_ASIO_NO_TS_EXECUTORS) net::any_io_executor wg2_; @@ -87,7 +53,7 @@ class stream::read_op : public stream::read_op_base template lambda( Handler_&& h, - boost::shared_ptr const& s, + boost::shared_ptr const& s, Buffers const& b) : h_(std::forward(h)) , wp_(s) @@ -95,11 +61,11 @@ class stream::read_op : public stream::read_op_base #if defined(BOOST_ASIO_NO_TS_EXECUTORS) , wg2_(net::prefer( net::get_associated_executor( - h_, s->ioc.get_executor()), + h_, s->exec), net::execution::outstanding_work.tracked)) #else // defined(BOOST_ASIO_NO_TS_EXECUTORS) , wg2_(net::get_associated_executor( - h_, s->ioc.get_executor())) + h_, s->exec)) #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS) { } @@ -151,43 +117,44 @@ class stream::read_op : public stream::read_op_base }; lambda fn_; -#if defined(BOOST_ASIO_NO_TS_EXECUTORS) +#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT) + net::executor_work_guard wg1_; +#else net::any_io_executor wg1_; -#else // defined(BOOST_ASIO_NO_TS_EXECUTORS) - net::executor_work_guard wg1_; -#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS) +#endif public: template read_op( Handler_&& h, - boost::shared_ptr const& s, + boost::shared_ptr const& s, Buffers const& b) : fn_(std::forward(h), s, b) -#if defined(BOOST_ASIO_NO_TS_EXECUTORS) - , wg1_(net::prefer(s->ioc.get_executor(), +#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT) + , wg1_(s->exec) +#else + , wg1_(net::prefer(s->exec, net::execution::outstanding_work.tracked)) -#else // defined(BOOST_ASIO_NO_TS_EXECUTORS) - , wg1_(s->ioc.get_executor()) -#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS) +#endif { } void operator()(error_code ec) override { -#if defined(BOOST_ASIO_NO_TS_EXECUTORS) - net::post(wg1_, beast::bind_front_handler(std::move(fn_), ec)); - wg1_ = net::any_io_executor(); // probably unnecessary -#else // defined(BOOST_ASIO_NO_TS_EXECUTORS) +#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT) net::post(wg1_.get_executor(), beast::bind_front_handler(std::move(fn_), ec)); wg1_.reset(); -#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS) +#else + net::post(wg1_, beast::bind_front_handler(std::move(fn_), ec)); + wg1_ = net::any_io_executor(); // probably unnecessary +#endif } }; -struct stream::run_read_op +template +struct basic_stream::run_read_op { template< class ReadHandler, @@ -195,7 +162,7 @@ struct stream::run_read_op void operator()( ReadHandler&& h, - boost::shared_ptr const& in, + boost::shared_ptr const& in, MutableBufferSequence const& buffers) { // If you get an error on the following line it means @@ -209,7 +176,7 @@ struct stream::run_read_op initiate_read( in, - std::unique_ptr{ + std::unique_ptr{ new read_op< typename std::decay::type, MutableBufferSequence>( @@ -220,7 +187,8 @@ struct stream::run_read_op } }; -struct stream::run_write_op +template +struct basic_stream::run_write_op { template< class WriteHandler, @@ -228,8 +196,8 @@ struct stream::run_write_op void operator()( WriteHandler&& h, - boost::shared_ptr in_, - boost::weak_ptr out_, + boost::shared_ptr in_, + boost::weak_ptr out_, ConstBufferSequence const& buffers) { // If you get an error on the following line it means @@ -245,7 +213,7 @@ struct stream::run_write_op auto const upcall = [&](error_code ec, std::size_t n) { net::post( - in_->ioc.get_executor(), + in_->exec, beast::bind_front_handler(std::move(h), ec, n)); }; @@ -281,9 +249,10 @@ struct stream::run_write_op //------------------------------------------------------------------------------ +template template std::size_t -stream:: +basic_stream:: read_some(MutableBufferSequence const& buffers) { static_assert(net::is_mutable_buffer_sequence< @@ -296,9 +265,10 @@ read_some(MutableBufferSequence const& buffers) return n; } +template template std::size_t -stream:: +basic_stream:: read_some(MutableBufferSequence const& buffers, error_code& ec) { @@ -326,7 +296,7 @@ read_some(MutableBufferSequence const& buffers, { return in_->b.size() > 0 || - in_->code != status::ok; + in_->code != detail::stream_status::ok; }); // deliver bytes before eof @@ -340,14 +310,16 @@ read_some(MutableBufferSequence const& buffers, } // deliver error - BOOST_ASSERT(in_->code != status::ok); + BOOST_ASSERT(in_->code != detail::stream_status::ok); ec = net::error::eof; return 0; } -template -BOOST_BEAST_ASYNC_RESULT2(ReadHandler) -stream:: +template +template +BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, void(error_code, std::size_t)) +basic_stream:: async_read_some( MutableBufferSequence const& buffers, ReadHandler&& handler) @@ -365,9 +337,10 @@ async_read_some( buffers); } +template template std::size_t -stream:: +basic_stream:: write_some(ConstBufferSequence const& buffers) { static_assert(net::is_const_buffer_sequence< @@ -381,9 +354,10 @@ write_some(ConstBufferSequence const& buffers) return bytes_transferred; } +template template std::size_t -stream:: +basic_stream:: write_some( ConstBufferSequence const& buffers, error_code& ec) { @@ -425,9 +399,11 @@ write_some( return n; } -template -BOOST_BEAST_ASYNC_RESULT2(WriteHandler) -stream:: +template +template +BOOST_ASIO_INITFN_RESULT_TYPE(WriteHandler, void(error_code, std::size_t)) +basic_stream:: async_write_some( ConstBufferSequence const& buffers, WriteHandler&& handler) @@ -448,11 +424,11 @@ async_write_some( //------------------------------------------------------------------------------ -template +template void async_teardown( role_type, - stream& s, + basic_stream& s, TeardownHandler&& handler) { error_code ec; @@ -477,8 +453,8 @@ async_teardown( //------------------------------------------------------------------------------ -template -stream +template +basic_stream connect(stream& to, Arg1&& arg1, ArgN&&... argn) { stream from{ @@ -488,6 +464,34 @@ connect(stream& to, Arg1&& arg1, ArgN&&... argn) return from; } +namespace detail +{ +template +struct extract_executor_op +{ + To operator()(net::any_io_executor& ex) const + { + assert(ex.template target()); + return *ex.template target(); + } +}; + +template<> +struct extract_executor_op +{ + net::any_io_executor operator()(net::any_io_executor& ex) const + { + return ex; + } +}; +} + +template +auto basic_stream::get_executor() noexcept -> executor_type +{ + return detail::extract_executor_op()(in_->exec); +} + } // test } // beast } // boost diff --git a/include/boost/beast/_experimental/test/impl/stream.ipp b/include/boost/beast/_experimental/test/impl/stream.ipp index ec544ce1..98ddea90 100644 --- a/include/boost/beast/_experimental/test/impl/stream.ipp +++ b/include/boost/beast/_experimental/test/impl/stream.ipp @@ -23,62 +23,10 @@ namespace test { //------------------------------------------------------------------------------ -stream:: -service:: -service(net::execution_context& ctx) - : beast::detail::service_base(ctx) - , sp_(boost::make_shared()) -{ -} - -void -stream:: -service:: -shutdown() -{ - std::vector> v; - std::lock_guard g1(sp_->m_); - v.reserve(sp_->v_.size()); - for(auto p : sp_->v_) - { - std::lock_guard g2(p->m); - v.emplace_back(std::move(p->op)); - p->code = status::eof; - } -} - -auto -stream:: -service:: -make_impl( - net::io_context& ctx, - test::fail_count* fc) -> - boost::shared_ptr -{ - auto& svc = net::use_service(ctx); - auto sp = boost::make_shared(ctx, svc.sp_, fc); - std::lock_guard g(svc.sp_->m_); - svc.sp_->v_.push_back(sp.get()); - return sp; -} - -void -stream:: -service_impl:: -remove(state& impl) -{ - std::lock_guard g(m_); - *std::find( - v_.begin(), v_.end(), - &impl) = std::move(v_.back()); - v_.pop_back(); -} - -//------------------------------------------------------------------------------ - -void stream::initiate_read( - boost::shared_ptr const& in_, - std::unique_ptr&& op, +template +void basic_stream::initiate_read( + boost::shared_ptr const& in_, + std::unique_ptr&& op, std::size_t buf_size) { std::unique_lock lock(in_->m); @@ -106,7 +54,7 @@ void stream::initiate_read( } // deliver error - if(in_->code != status::ok) + if(in_->code != detail::stream_status::ok) { lock.unlock(); (*op)(net::error::eof); @@ -117,98 +65,36 @@ void stream::initiate_read( in_->op = std::move(op); } -stream:: -state:: -state( - net::io_context& ioc_, - boost::weak_ptr wp_, - fail_count* fc_) - : ioc(ioc_) - , wp(std::move(wp_)) - , fc(fc_) -{ -} - -stream:: -state:: -~state() -{ - // cancel outstanding read - if(op != nullptr) - (*op)(net::error::operation_aborted); -} - -void -stream:: -state:: -remove() noexcept -{ - auto sp = wp.lock(); - - // If this goes off, it means the lifetime of a test::stream object - // extended beyond the lifetime of the associated execution context. - BOOST_ASSERT(sp); - - sp->remove(*this); -} - -void -stream:: -state:: -notify_read() -{ - if(op) - { - auto op_ = std::move(op); - op_->operator()(error_code{}); - } - else - { - cv.notify_all(); - } -} - -void -stream:: -state:: -cancel_read() -{ - std::unique_ptr p; - { - std::lock_guard lock(m); - code = status::eof; - p = std::move(op); - } - if(p != nullptr) - (*p)(net::error::operation_aborted); -} - //------------------------------------------------------------------------------ -stream:: -~stream() +template +basic_stream:: +~basic_stream() { close(); in_->remove(); } -stream:: -stream(stream&& other) +template +basic_stream:: +basic_stream(basic_stream&& other) { - auto in = service::make_impl( - other.in_->ioc, other.in_->fc); + auto in = detail::stream_service::make_impl( + other.in_->exec, other.in_->fc); in_ = std::move(other.in_); out_ = std::move(other.out_); other.in_ = in; } -stream& -stream:: -operator=(stream&& other) + +template +basic_stream& +basic_stream:: +operator=(basic_stream&& other) { close(); - auto in = service::make_impl( - other.in_->ioc, other.in_->fc); + auto in = detail::stream_service::make_impl( + other.in_->exec, other.in_->fc); in_->remove(); in_ = std::move(other.in_); out_ = std::move(other.out_); @@ -218,46 +104,51 @@ operator=(stream&& other) //------------------------------------------------------------------------------ -stream:: -stream(net::io_context& ioc) - : in_(service::make_impl(ioc, nullptr)) +template +basic_stream:: +basic_stream(executor_type exec) + : in_(detail::stream_service::make_impl(std::move(exec), nullptr)) { } -stream:: -stream( +template +basic_stream:: +basic_stream( net::io_context& ioc, fail_count& fc) - : in_(service::make_impl(ioc, &fc)) + : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc)) { } -stream:: -stream( +template +basic_stream:: +basic_stream( net::io_context& ioc, string_view s) - : in_(service::make_impl(ioc, nullptr)) + : in_(detail::stream_service::make_impl(ioc.get_executor(), nullptr)) { in_->b.commit(net::buffer_copy( in_->b.prepare(s.size()), net::buffer(s.data(), s.size()))); } -stream:: -stream( +template +basic_stream:: +basic_stream( net::io_context& ioc, fail_count& fc, string_view s) - : in_(service::make_impl(ioc, &fc)) + : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc)) { in_->b.commit(net::buffer_copy( in_->b.prepare(s.size()), net::buffer(s.data(), s.size()))); } +template void -stream:: -connect(stream& remote) +basic_stream:: +connect(basic_stream& remote) { BOOST_ASSERT(! out_.lock()); BOOST_ASSERT(! remote.out_.lock()); @@ -266,12 +157,13 @@ connect(stream& remote) std::lock_guard guard2{remote.in_->m, std::adopt_lock}; out_ = remote.in_; remote.out_ = in_; - in_->code = status::ok; - remote.in_->code = status::ok; + in_->code = detail::stream_status::ok; + remote.in_->code = detail::stream_status::ok; } +template string_view -stream:: +basic_stream:: str() const { auto const bs = in_->b.data(); @@ -281,8 +173,9 @@ str() const return {static_cast(b.data()), b.size()}; } +template void -stream:: +basic_stream:: append(string_view s) { std::lock_guard lock{in_->m}; @@ -291,16 +184,18 @@ append(string_view s) net::buffer(s.data(), s.size()))); } +template void -stream:: +basic_stream:: clear() { std::lock_guard lock{in_->m}; in_->b.consume(in_->b.size()); } +template void -stream:: +basic_stream:: close() { in_->cancel_read(); @@ -314,31 +209,33 @@ close() if(out) { std::lock_guard lock(out->m); - if(out->code == status::ok) + if(out->code == detail::stream_status::ok) { - out->code = status::eof; + out->code = detail::stream_status::eof; out->notify_read(); } } } } +template void -stream:: +basic_stream:: close_remote() { std::lock_guard lock{in_->m}; - if(in_->code == status::ok) + if(in_->code == detail::stream_status::ok) { - in_->code = status::eof; + in_->code = detail::stream_status::eof; in_->notify_read(); } } +template void teardown( role_type, - stream& s, + basic_stream& s, boost::system::error_code& ec) { if( s.in_->fc && @@ -356,20 +253,18 @@ teardown( //------------------------------------------------------------------------------ -stream -connect(stream& to) +template +basic_stream +connect(basic_stream& to) { -#if defined(BOOST_ASIO_NO_TS_EXECUTORS) - stream from{net::query(to.get_executor(), net::execution::context)}; -#else // defined(BOOST_ASIO_NO_TS_EXECUTORS) - stream from{to.get_executor().context()}; -#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS) + basic_stream from(to.get_executor()); from.connect(to); return from; } +template void -connect(stream& s1, stream& s2) +connect(basic_stream& s1, basic_stream& s2) { s1.connect(s2); } diff --git a/include/boost/beast/_experimental/test/stream.hpp b/include/boost/beast/_experimental/test/stream.hpp index 6b725ba1..f9626923 100644 --- a/include/boost/beast/_experimental/test/stream.hpp +++ b/include/boost/beast/_experimental/test/stream.hpp @@ -16,10 +16,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -106,71 +108,45 @@ namespace test { @li AsyncReadStream @li AsyncWriteStream */ -class stream +template +class basic_stream; + +template +void +teardown( + role_type, + basic_stream& s, + boost::system::error_code& ec); + +template +void +async_teardown( + role_type role, + basic_stream& s, + TeardownHandler&& handler); + +template +class basic_stream { - struct state; +public: + /// The type of the executor associated with the object. + using executor_type = + Executor; - boost::shared_ptr in_; - boost::weak_ptr out_; - - enum class status + /// Rebinds the socket type to another executor. + template + struct rebind_executor { - ok, - eof, + /// The socket type when rebound to the specified executor. + typedef basic_stream other; }; - class service; - struct service_impl; +private: + template + friend class basic_stream; - struct read_op_base - { - virtual ~read_op_base() = default; - virtual void operator()(error_code ec) = 0; - }; - - struct state - { - friend class stream; - - net::io_context& ioc; - boost::weak_ptr wp; - std::mutex m; - flat_buffer b; - std::condition_variable cv; - std::unique_ptr op; - status code = status::ok; - fail_count* fc = nullptr; - std::size_t nread = 0; - std::size_t nread_bytes = 0; - std::size_t nwrite = 0; - std::size_t nwrite_bytes = 0; - std::size_t read_max = - (std::numeric_limits::max)(); - std::size_t write_max = - (std::numeric_limits::max)(); - - BOOST_BEAST_DECL - state( - net::io_context& ioc_, - boost::weak_ptr wp_, - fail_count* fc_); - - - BOOST_BEAST_DECL - ~state(); - - BOOST_BEAST_DECL - void - remove() noexcept; - - BOOST_BEAST_DECL - void - notify_read(); - - BOOST_BEAST_DECL - void - cancel_read(); - }; + boost::shared_ptr in_; + boost::weak_ptr out_; template class read_op; @@ -178,12 +154,11 @@ class stream struct run_read_op; struct run_write_op; - BOOST_BEAST_DECL static void initiate_read( - boost::shared_ptr const& in, - std::unique_ptr&& op, + boost::shared_ptr const& in, + std::unique_ptr&& op, std::size_t buf_size); #if ! BOOST_BEAST_DOXYGEN @@ -192,7 +167,7 @@ class stream template friend class boost::asio::ssl::stream; // DEPRECATED - using lowest_layer_type = stream; + using lowest_layer_type = basic_stream; // DEPRECATED lowest_layer_type& lowest_layer() noexcept @@ -220,25 +195,40 @@ public: the peer will see the error `net::error::connection_reset` when performing any reads or writes. */ - BOOST_BEAST_DECL - ~stream(); + ~basic_stream(); /** Move Constructor Moving the stream while asynchronous operations are pending results in undefined behavior. */ - BOOST_BEAST_DECL - stream(stream&& other); + basic_stream(basic_stream&& other); + + /** Move Constructor + + Moving the stream while asynchronous operations are pending + results in undefined behavior. + */ + template + basic_stream(basic_stream&& other) + : in_(std::move(other.in_)) + , out_(std::move(other.out_)) + { + assert(in_->exec.target_type() == typeid(Executor2)); + in_->exec = executor_type(*in_->exec.template target()); + } /** Move Assignment Moving the stream while asynchronous operations are pending results in undefined behavior. */ - BOOST_BEAST_DECL - stream& - operator=(stream&& other); + basic_stream& + operator=(basic_stream&& other); + + template + basic_stream& + operator==(basic_stream&& other); /** Construct a stream @@ -247,9 +237,24 @@ public: @param ioc The `io_context` object that the stream will use to dispatch handlers for any asynchronous operations. */ - BOOST_BEAST_DECL + template + explicit basic_stream(ExecutionContext& context, + typename std::enable_if< + std::is_convertible::value + >::type* = 0) + : basic_stream(context.get_executor()) + { + } + + /** Construct a stream + + The stream will be created in a disconnected state. + + @param exec The `executor` object that the stream will use to + dispatch handlers for any asynchronous operations. + */ explicit - stream(net::io_context& ioc); + basic_stream(executor_type exec); /** Construct a stream @@ -263,8 +268,7 @@ public: fail count. When the fail count reaches its internal limit, a simulated failure error will be raised. */ - BOOST_BEAST_DECL - stream( + basic_stream( net::io_context& ioc, fail_count& fc); @@ -278,8 +282,7 @@ public: @param s A string which will be appended to the input area, not including the null terminator. */ - BOOST_BEAST_DECL - stream( + basic_stream( net::io_context& ioc, string_view s); @@ -298,27 +301,18 @@ public: @param s A string which will be appended to the input area, not including the null terminator. */ - BOOST_BEAST_DECL - stream( + basic_stream( net::io_context& ioc, fail_count& fc, string_view s); /// Establish a connection - BOOST_BEAST_DECL void - connect(stream& remote); - - /// The type of the executor associated with the object. - using executor_type = - net::io_context::executor_type; + connect(basic_stream& remote); /// Return the executor associated with the object. executor_type - get_executor() noexcept - { - return in_->ioc.get_executor(); - }; + get_executor() noexcept; /// Set the maximum number of bytes returned by read_some void @@ -342,17 +336,14 @@ public: } /// Returns a string view representing the pending input data - BOOST_BEAST_DECL string_view str() const; /// Appends a string to the pending input data - BOOST_BEAST_DECL void append(string_view s); /// Clear the pending input area - BOOST_BEAST_DECL void clear(); @@ -389,7 +380,6 @@ public: The other end of the connection will see `error::eof` after reading all the remaining data. */ - BOOST_BEAST_DECL void close(); @@ -398,7 +388,6 @@ public: This end of the connection will see `error::eof` after reading all the remaining data. */ - BOOST_BEAST_DECL void close_remote(); @@ -477,11 +466,12 @@ public: */ template< class MutableBufferSequence, - BOOST_BEAST_ASYNC_TPARAM2 ReadHandler> - BOOST_BEAST_ASYNC_RESULT2(ReadHandler) + BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) ReadHandler + BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)> + BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, void(error_code, std::size_t)) async_read_some( MutableBufferSequence const& buffers, - ReadHandler&& handler); + ReadHandler&& handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)); /** Write some data to the stream. @@ -555,36 +545,36 @@ public: */ template< class ConstBufferSequence, - BOOST_BEAST_ASYNC_TPARAM2 WriteHandler> - BOOST_BEAST_ASYNC_RESULT2(WriteHandler) + BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) WriteHandler + BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)> + BOOST_ASIO_INITFN_RESULT_TYPE(WriteHandler, void(error_code, std::size_t)) async_write_some( ConstBufferSequence const& buffers, - WriteHandler&& handler); + WriteHandler&& handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type) + ); #if ! BOOST_BEAST_DOXYGEN friend - BOOST_BEAST_DECL void - teardown( + teardown<>( role_type, - stream& s, + basic_stream& s, boost::system::error_code& ec); - template + template friend - BOOST_BEAST_DECL void async_teardown( role_type role, - stream& s, + basic_stream& s, TeardownHandler&& handler); #endif }; #if ! BOOST_BEAST_DOXYGEN -inline +template void -beast_close_socket(stream& s) +beast_close_socket(basic_stream& s) { s.close(); } @@ -599,31 +589,34 @@ beast_close_socket(stream& s) @return The new, connected stream. */ +template template -stream -connect(stream& to, Args&&... args); +bascic_stream +connect(basic_stream& to, Args&&... args); #else -BOOST_BEAST_DECL -stream -connect(stream& to); +template +basic_stream +connect(basic_stream& to); -BOOST_BEAST_DECL +template void -connect(stream& s1, stream& s2); +connect(basic_stream& s1, basic_stream& s2); -template -stream -connect(stream& to, Arg1&& arg1, ArgN&&... argn); +template +basic_stream +connect(basic_stream& to, Arg1&& arg1, ArgN&&... argn); #endif +using stream = basic_stream<>; + } // test } // beast } // boost #include -#ifdef BOOST_BEAST_HEADER_ONLY +//#ifdef BOOST_BEAST_HEADER_ONLY #include -#endif +//#endif #endif diff --git a/include/boost/beast/src.hpp b/include/boost/beast/src.hpp index 2376c3fc..62229d24 100644 --- a/include/boost/beast/src.hpp +++ b/include/boost/beast/src.hpp @@ -28,6 +28,7 @@ the program, with the macro BOOST_BEAST_SEPARATE_COMPILATION defined. #include #include #include +#include #include #include diff --git a/test/beast/_experimental/stream.cpp b/test/beast/_experimental/stream.cpp index 65f326f2..58b57bfa 100644 --- a/test/beast/_experimental/stream.cpp +++ b/test/beast/_experimental/stream.cpp @@ -14,7 +14,11 @@ #include #include +#if defined(BOOST_ASIO_HAS_CO_AWAIT) +#include +#endif +#define DEF boost::asio::use_future_t namespace boost { namespace beast { @@ -153,6 +157,18 @@ public: test::stream>::value); } +#if defined(BOOST_ASIO_HAS_CO_AWAIT) + net::awaitable + testRebind(net::mutable_buffer& b) + { + auto ex = co_await net::this_coro::executor; + auto s1 = test::stream(ex); + auto s2 = net::use_awaitable.as_default_on(std::move(s1)); + auto bt = co_await s2.async_read_some(b); + bt = co_await s2.async_write_some(b); + } +#endif + void run() override { @@ -160,6 +176,9 @@ public: testSharedAbandon(); //testLifetimeViolation(); boost::ignore_unused(&stream_test::testAsioSSLCompat); +#if defined(BOOST_ASIO_HAS_CO_AWAIT) + boost::ignore_unused(&stream_test::testRebind); +#endif } }; diff --git a/test/beast/core/async_base.cpp b/test/beast/core/async_base.cpp index 2125d4e7..6c9a3f90 100644 --- a/test/beast/core/async_base.cpp +++ b/test/beast/core/async_base.cpp @@ -34,12 +34,18 @@ namespace beast { namespace { #if defined(BOOST_ASIO_NO_TS_EXECUTORS) + +static struct ex1_context : net::execution_context +{ + +} ex1ctx; + struct ex1_type { net::execution_context & query(net::execution::context_t c) const noexcept - { return *reinterpret_cast(0); } + { return *reinterpret_cast(&ex1ctx); } net::execution::blocking_t query(net::execution::blocking_t) const noexcept @@ -716,9 +722,9 @@ public: net::steady_timer timer; - temporary_data(std::string message_, net::io_context& ctx) + temporary_data(std::string message_, net::any_io_executor ex) : message(std::move(message_)) - , timer(ctx) + , timer(std::move(ex)) { } }; @@ -733,7 +739,7 @@ public: , repeats_(repeats) , data_(allocate_stable(*this, std::move(message), - net::query(stream.get_executor(), net::execution::context))) + stream.get_executor())) { (*this)(); // start the operation } diff --git a/test/beast/core/buffered_read_stream.cpp b/test/beast/core/buffered_read_stream.cpp index 808b0213..cf3996ed 100644 --- a/test/beast/core/buffered_read_stream.cpp +++ b/test/beast/core/buffered_read_stream.cpp @@ -16,7 +16,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -41,10 +43,17 @@ public: buffered_read_stream srs(ioc); buffered_read_stream srs2(std::move(srs)); srs = std::move(srs2); +#if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT) + BEAST_EXPECT(&srs.get_executor().context() == &ioc); + BEAST_EXPECT( + &srs.get_executor().context() == + &srs2.get_executor().context()); +#else BEAST_EXPECT(&net::query(srs.get_executor(), net::execution::context) == &ioc); BEAST_EXPECT( &net::query(srs.get_executor(), net::execution::context) == &net::query(srs2.get_executor(), net::execution::context)); +#endif } { test::stream ts{ioc}; diff --git a/test/beast/websocket/accept.cpp b/test/beast/websocket/accept.cpp index 4b4ae751..ab5a0f70 100644 --- a/test/beast/websocket/accept.cpp +++ b/test/beast/websocket/accept.cpp @@ -55,7 +55,9 @@ public: static void fail_loop( - std::function&)> f, + std::function>&)> + f, std::chrono::steady_clock::duration amount = std::chrono::seconds(5)) { @@ -68,7 +70,8 @@ public: test::fail_count fc(n); try { - stream ws(ioc, fc); + stream> + ws(ioc, fc); auto tr = connect(ws.next_layer()); f(ws); break; @@ -101,7 +104,7 @@ public: net::io_context ioc; // request in stream - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { ws.next_layer().append( "GET / HTTP/1.1\r\n" @@ -116,7 +119,7 @@ public: }); // request in stream, decorator - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { ws.next_layer().append( "GET / HTTP/1.1\r\n" @@ -135,7 +138,7 @@ public: }); // request in buffers - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { api.accept(ws, sbuf( "GET / HTTP/1.1\r\n" @@ -149,7 +152,7 @@ public: }); // request in buffers, decorator - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { bool called = false; ws.set_option(stream_base::decorator( @@ -166,7 +169,7 @@ public: }); // request in buffers and stream - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { ws.next_layer().append( "Connection: upgrade\r\n" @@ -183,7 +186,7 @@ public: }); // request in buffers and stream, decorator - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { ws.next_layer().append( "Connection: upgrade\r\n" @@ -213,7 +216,7 @@ public: req.insert(http::field::sec_websocket_key, "dGhlIHNhbXBsZSBub25jZQ=="); req.insert(http::field::sec_websocket_version, "13"); - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { api.accept(ws, req); }); @@ -231,7 +234,7 @@ public: req.insert(http::field::sec_websocket_key, "dGhlIHNhbXBsZSBub25jZQ=="); req.insert(http::field::sec_websocket_version, "13"); - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { bool called = false; ws.set_option(stream_base::decorator( @@ -253,7 +256,7 @@ public: req.insert(http::field::sec_websocket_key, "dGhlIHNhbXBsZSBub25jZQ=="); req.insert(http::field::sec_websocket_version, "13"); - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { ws.next_layer().append("\x88\x82\xff\xff\xff\xff\xfc\x17"); api.accept(ws, req); @@ -272,7 +275,7 @@ public: } // failed handshake (missing Sec-WebSocket-Key) - fail_loop([&](stream& ws) + fail_loop([&](stream>& ws) { ws.next_layer().append( "GET / HTTP/1.1\r\n" @@ -311,7 +314,7 @@ public: // request in stream { - stream ws{ioc, + stream> ws{ioc, "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -337,7 +340,7 @@ public: // request in stream, decorator { - stream ws{ioc, + stream> ws{ioc, "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -366,7 +369,7 @@ public: // request in buffers { - stream ws{ioc}; + stream> ws{ioc}; auto tr = connect(ws.next_layer()); try { @@ -392,7 +395,7 @@ public: // request in buffers, decorator { - stream ws{ioc}; + stream> ws{ioc}; auto tr = connect(ws.next_layer()); try { @@ -420,7 +423,7 @@ public: // request in buffers and stream { - stream ws{ioc, + stream> ws{ioc, "Connection: upgrade\r\n" "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" "Sec-WebSocket-Version: 13\r\n" @@ -446,7 +449,7 @@ public: // request in buffers and stream, decorator { - stream ws{ioc, + stream> ws{ioc, "Connection: upgrade\r\n" "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" "Sec-WebSocket-Version: 13\r\n" @@ -497,7 +500,7 @@ public: n = s.size() - 1; break; } - stream ws(ioc); + stream> ws(ioc); auto tr = connect(ws.next_layer()); ws.next_layer().append( s.substr(n, s.size() - n)); @@ -658,7 +661,7 @@ public: { net::io_context ioc; { - stream ws(ioc); + stream> ws(ioc); auto tr = connect(ws.next_layer()); tr.close(); try @@ -675,7 +678,8 @@ public: } } { - stream ws(ioc); + stream> + ws(ioc.get_executor()); auto tr = connect(ws.next_layer()); tr.close(); try @@ -713,8 +717,8 @@ public: } { - stream ws1(ioc); - stream ws2(ioc); + stream> ws1(ioc); + stream> ws2(ioc); test::connect(ws1.next_layer(), ws2.next_layer()); ws1.async_handshake("test", "/", test::success_handler()); @@ -739,8 +743,8 @@ public: } { - stream ws1(ioc); - stream ws2(ioc); + stream> ws1(ioc); + stream> ws2(ioc); test::connect(ws1.next_layer(), ws2.next_layer()); ws1.set_option(stream_base::timeout{ @@ -768,8 +772,8 @@ public: } { - stream ws1(ioc); - stream ws2(ioc); + stream> ws1(ioc); + stream> ws2(ioc); test::connect(ws1.next_layer(), ws2.next_layer()); ws1.set_option(stream_base::timeout{ diff --git a/test/extras/include/boost/beast/test/yield_to.hpp b/test/extras/include/boost/beast/test/yield_to.hpp index 4a809745..b6571ebd 100644 --- a/test/extras/include/boost/beast/test/yield_to.hpp +++ b/test/extras/include/boost/beast/test/yield_to.hpp @@ -35,8 +35,9 @@ protected: net::io_context ioc_; private: - detail::select_work_guard_t - work_; + beast::detail::select_work_guard_t< + net::io_context::executor_type> + work_; std::vector threads_; std::mutex m_; std::condition_variable cv_; @@ -49,7 +50,8 @@ public: explicit enable_yield_to(std::size_t concurrency = 1) - : work_(detail::make_work_guard(ioc_.get_executor())) + : work_(beast::detail::make_work_guard( + ioc_.get_executor())) { threads_.reserve(concurrency); while(concurrency--)