diff --git a/CHANGELOG.md b/CHANGELOG.md index 332cb80d..54b0ee67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Version 211: * close_socket is in stream_traits.hpp * Improvements to test::stream +* Add stranded_stream -------------------------------------------------------------------------------- diff --git a/doc/qbk/03_core/3_layers.qbk b/doc/qbk/03_core/3_layers.qbk index 744f7782..d90669b6 100644 --- a/doc/qbk/03_core/3_layers.qbk +++ b/doc/qbk/03_core/3_layers.qbk @@ -109,6 +109,16 @@ facilities for authoring and working with layered streams: allows for move-construction and move-assignment, and also implements a work-around for a performance limitation in the original SSL stream. ]] +[[ + [link beast.ref.boost__beast__stranded_stream `stranded_stream`] +][ + A timeout stream meets the requirements for synchronous and asynchronous + read and write streams by passing I/O through to an underlying + `net::basic_stream_socket`, and additionally supports + [@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1322r0.html [P1322R0] "Networking TS enhancement to enable custom I/O executors"], + allowing a custom executor (such a strand) to be used for all + asynchronous operations. +]] ] [heading Example] diff --git a/doc/qbk/quickref.xml b/doc/qbk/quickref.xml index 7e21f714..178bf619 100644 --- a/doc/qbk/quickref.xml +++ b/doc/qbk/quickref.xml @@ -45,6 +45,7 @@ span static_string stable_async_op_base 🞲 + stranded_stream 🞲 string_param string_view timeout_stream 🞲 diff --git a/include/boost/beast/core.hpp b/include/boost/beast/core.hpp index d8f520cd..27e18b2a 100644 --- a/include/boost/beast/core.hpp +++ b/include/boost/beast/core.hpp @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include diff --git a/include/boost/beast/core/detail/bind_default_executor.hpp b/include/boost/beast/core/detail/bind_default_executor.hpp new file mode 100644 index 00000000..e83d3b1f --- /dev/null +++ b/include/boost/beast/core/detail/bind_default_executor.hpp @@ -0,0 +1,123 @@ +// +// Copyright (c) 2018 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_CORE_DETAIL_BIND_DEFAULT_EXECUTOR_HPP +#define BOOST_BEAST_CORE_DETAIL_BIND_DEFAULT_EXECUTOR_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace beast { +namespace detail { + +template +class bind_default_executor_wrapper + : private boost::empty_value +{ + Handler h_; + +public: + template + bind_default_executor_wrapper( + Handler_&& h, + Executor const& ex) + : boost::empty_value( + boost::empty_init_t{}, ex) + , h_(std::forward(h)) + { + } + + template + void + operator()(Args&&... args) + { + h_(std::forward(args)...); + } + + using allocator_type = + net::associated_allocator_t; + + allocator_type + get_allocator() const noexcept + { + return net::get_associated_allocator(h_); + } + + using executor_type = + net::associated_executor_t; + + executor_type + get_executor() const noexcept + { + return net::get_associated_executor( + h_, this->get()); + } + + template + void + asio_handler_invoke(Function&& f, + bind_default_executor_wrapper* p) + { + net::dispatch(p->get_executor(), std::move(f)); + } + + friend + void* asio_handler_allocate( + std::size_t size, bind_default_executor_wrapper* p) + { + using net::asio_handler_allocate; + return asio_handler_allocate( + size, std::addressof(p->h_)); + } + + friend + void asio_handler_deallocate( + void* mem, std::size_t size, + bind_default_executor_wrapper* p) + { + using net::asio_handler_deallocate; + asio_handler_deallocate(mem, size, + std::addressof(p->h_)); + } + + friend + bool asio_handler_is_continuation( + bind_default_executor_wrapper* p) + { + using net::asio_handler_is_continuation; + return asio_handler_is_continuation( + std::addressof(p->h_)); + } +}; + +template +auto +bind_default_executor(Executor const& ex, Handler&& h) -> + bind_default_executor_wrapper< + typename std::decay::type, + Executor> +{ + return bind_default_executor_wrapper< + typename std::decay::type, + Executor>(std::forward(h), ex); +} + +} // detail +} // beast +} // boost + +#endif diff --git a/include/boost/beast/core/detail/stranded_stream.hpp b/include/boost/beast/core/detail/stranded_stream.hpp new file mode 100644 index 00000000..f8448b00 --- /dev/null +++ b/include/boost/beast/core/detail/stranded_stream.hpp @@ -0,0 +1,42 @@ +// +// Copyright (c) 2018 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_CORE_DETAIL_STRANDED_STREAM_HPP +#define BOOST_BEAST_CORE_DETAIL_STRANDED_STREAM_HPP + +#include +#include +#include + +namespace boost { +namespace beast { +namespace detail { + +template +class stranded_stream_base +{ +protected: + net::basic_stream_socket socket_; + + template + explicit + stranded_stream_base(Args&&... args) + : socket_(std::forward(args)...) + { + } + + stranded_stream_base(stranded_stream_base&&) = default; + stranded_stream_base& operator=(stranded_stream_base&&) = default; +}; + +} // detail +} // beast +} // boost + +#endif diff --git a/include/boost/beast/core/stranded_stream.hpp b/include/boost/beast/core/stranded_stream.hpp new file mode 100644 index 00000000..583fba2e --- /dev/null +++ b/include/boost/beast/core/stranded_stream.hpp @@ -0,0 +1,433 @@ +// +// Copyright (c) 2018 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_CORE_STRANDED_STREAM_HPP +#define BOOST_BEAST_CORE_STRANDED_STREAM_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace beast { + +//------------------------------------------------------------------------------ + +/** A stream socket using a custom executor, defaulting to a strand + + This class template is parameterized on the executor type to be + used for all asynchronous operations. This achieves partial support for + [P1322]. The default template parameter uses a strand for the next + layer's executor. + + @see [P1322R0] + Networking TS enhancement to enable custom I/O executors +*/ +template< + class Protocol, + class Executor = beast::executor_type> +> +class stranded_stream +#ifndef BOOST_BEAST_DOXYGEN + : private detail::stranded_stream_base + , private boost::empty_value +#endif +{ + // Restricted until P1322R0 is incorporated into Boost.Asio. + static_assert( + std::is_convertible().context()), + net::io_context&>::value, + "Only net::io_context is currently supported for executor_type::context()"); + +public: + /// The type of the executor associated with the object. + using executor_type = Executor; + + /// The type of the next layer. + using next_layer_type = net::basic_stream_socket; + + /// The protocol type. + using protocol_type = Protocol; + + /// The endpoint type. + using endpoint_type = typename Protocol::endpoint; + + /** Construct the stream without opening it. + + This constructor creates a stream. The underlying socket needs + to be opened and then connected or accepted before data can be + sent or received on it. + + @param ctx An object whose type meets the requirements of + ExecutionContext, which the stream will use to dispatch + handlers for any asynchronous operations performed on the socket. + Currently, the only supported type for `ctx` is `net::io_context`. + + @param args A list of parameters forwarded to the constructor of + the underlying socket. + + @note This function does not participate in overload resolution unless: + @li `std::is_convertible::value` is `true`, and + @li `std::is_constructible::value` is `true`. + + @see http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1322r0.html + */ + template< + class ExecutionContext, + class... Args + #if ! BOOST_BEAST_DOXYGEN + , class = typename std::enable_if< + std::is_convertible< + ExecutionContext&, + net::execution_context&>::value && + std::is_constructible< + executor_type, + typename ExecutionContext::executor_type>::value + >::type + #endif + > + explicit + stranded_stream(ExecutionContext& ctx, Args&&... args) + : detail::stranded_stream_base( + ctx, std::forward(args)...) + , boost::empty_value( + boost::empty_init_t{}, ctx.get_executor()) + { + // Restriction is necessary until Asio fully supports P1322R0 + static_assert( + std::is_same::value, + "Only net::io_context is currently supported for ExecutionContext"); + } + + /** Construct the stream without opening it. + + This constructor creates a stream. The underlying socket needs + to be opened and then connected or accepted before data can be + sent or received on it. + + @param ex The executor which stream will use to dispatch handlers for + any asynchronous operations performed on the underlying socket. + Currently, only executors that return a `net::io_context&` from + `ex.context()` are supported. + + @param args A list of parameters forwarded to the constructor of + the underlying socket. + + @see http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1322r0.html + */ + template + explicit + stranded_stream(executor_type const& ex, Args&&... args) + : detail::stranded_stream_base( + ex.context(), std::forward(args)...) + , boost::empty_value( + boost::empty_init_t{}, ex) + { + // Restriction is necessary until Asio fully supports P1322R0 + if(ex.context().get_executor() != this->socket_.get_executor()) + throw std::invalid_argument( + "ctx.get_executor() != socket.get_executor()"); + } + + /** Move-construct a stream from another stream + + This constructor moves a stream from one object to another. + + The behavior of moving a stream while asynchronous operations + are outstanding is undefined. + + @param other The other object from which the move will occur. + + @note Following the move, the moved-from object is in a newly + constructed state. + */ + stranded_stream(stranded_stream&& other) = default; + + /** Move-assign a stream from another stream + + This assignment operator moves a stream socket from one object + to another. + + The behavior of move assignment while asynchronous operations + are pending is undefined. + + @param other The other basic_timeout_stream object from which the + move will occur. + + @note Following the move, the moved-from object is a newly + constructed state. + */ + stranded_stream& + operator=(stranded_stream&& other) = default; + + //-------------------------------------------------------------------------- + + /** Return the executor associated with the object. + + @return A copy of the executor that stream will use to dispatch handlers. + */ + executor_type + get_executor() const noexcept + { + return this->get(); + } + + /** Get a reference to the underlying socket. + + This function returns a reference to the next layer + in a stack of stream layers. + + @return A reference to the next layer in the stack of + stream layers. + */ + next_layer_type& + next_layer() noexcept + { + return this->socket_; + } + + /** Get a reference to the underlying socket. + + This function returns a reference to the next layer in a + stack of stream layers. + + @return A reference to the next layer in the stack of + stream layers. + */ + next_layer_type const& + next_layer() const noexcept + { + return this->socket_; + } + + /** Start an asynchronous connect. + + This function is used to asynchronously connect a socket to the + specified remote endpoint. The function call always returns immediately. + + The underlying socket is automatically opened if it is not already open. + If the connect fails, and the socket was automatically opened, the socket + is not returned to the closed state. + + @param ep The remote endpoint to which the underlying socket will be + connected. Copies will be made of the endpoint object as required. + + @param handler The handler to be called when the operation completes. + The implementation will take ownership of the handler by move construction. + The handler must be invocable with this signature: + @code + void handler( + error_code ec // Result of operation + ); + @endcode + Regardless of whether the asynchronous operation completes immediately or + not, the handler will not be invoked from within this function. Invocation + of the handler will be performed in a manner equivalent to using + `net::post()`. + */ + template + BOOST_ASIO_INITFN_RESULT_TYPE(ConnectHandler, + void(error_code)) + async_connect( + endpoint_type ep, + ConnectHandler&& handler); + + /** Read some data from the stream. + + This function is used to read data from the stream. The function call will + block until one or more bytes of data has been read successfully, or until + an error occurs. + + @param buffers The buffers into which the data will be read. + + @returns The number of bytes read. + + @throws boost::system::system_error Thrown on failure. + + @note The `read_some` operation may not read all of the requested number of + bytes. Consider using the function `net::read` if you need to ensure + that the requested amount of data is read before the blocking operation + completes. + */ + template + std::size_t + read_some(MutableBufferSequence const& buffers) + { + return this->socket_.read_some(buffers); + } + + /** Read some data from the stream. + + This function is used to read data from the stream. The function call will + block until one or more bytes of data has been read successfully, or until + an error occurs. + + @param buffers The buffers into which the data will be read. + + @param ec Set to indicate what error occurred, if any. + + @returns The number of bytes read. + + @note The `read_some` operation may not read all of the requested number of + bytes. Consider using the function `net::read` if you need to ensure + that the requested amount of data is read before the blocking operation + completes. + */ + template + std::size_t + read_some( + MutableBufferSequence const& buffers, + error_code& ec) + { + return this->socket_.read_some(buffers, ec); + } + + /** Start an asynchronous read. + + This function is used to asynchronously read data from the stream. + The function call always returns immediately. + + @param buffers A range of zero or more buffers to read stream data into. + Although the buffers object may be copied as necessary, ownership of the + underlying memory blocks is retained by the caller, which must guarantee + that they remain valid until the handler is called. + + @param handler The handler to be called when the operation completes. + The implementation will take ownership of the handler by move construction. + The handler must be invocable with this signature: + @code + void handler( + error_code error, // Result of operation. + std::size_t bytes_transferred // Number of bytes read. + ); + @endcode + Regardless of whether the asynchronous operation completes immediately or + not, the handler will not be invoked from within this function. Invocation + of the handler will be performed in a manner equivalent to using + `net::post()`. + */ + template + BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, + void(error_code, std::size_t)) + async_read_some( + MutableBufferSequence const& buffers, + ReadHandler&& handler) + { + BOOST_BEAST_HANDLER_INIT(ReadHandler, + void(error_code, std::size_t)); + this->socket_.async_read_some( + buffers, + detail::bind_default_executor( + this->get(), + std::move(init.completion_handler))); + return init.result.get(); + } + + /** Write some data to the stream. + + This function is used to write data on the stream. The function call will + block until one or more bytes of data has been written successfully, or + until an error occurs. + + @param buffers The data to be written. + + @returns The number of bytes written. + + @throws boost::system::system_error Thrown on failure. + + @note The `write_some` operation may not transmit all of the data to the + peer. Consider using the function `net::write` if you need to + ensure that all data is written before the blocking operation completes. + */ + template + std::size_t + write_some(ConstBufferSequence const& buffers) + { + return this->socket_.write_some(buffers); + } + + /** Write some data to the stream. + + This function is used to write data on the stream. The function call will + block until one or more bytes of data has been written successfully, or + until an error occurs. + + @param buffers The data to be written. + + @param ec Set to indicate what error occurred, if any. + + @returns The number of bytes written. + + @note The `write_some` operation may not transmit all of the data to the + peer. Consider using the function `net::write` if you need to + ensure that all data is written before the blocking operation completes. + */ + template + std::size_t + write_some( + ConstBufferSequence const& buffers, + error_code& ec) + { + return this->socket_.write_some(buffers, ec); + } + + /** Start an asynchronous write. + + This function is used to asynchronously write data to the stream. + The function call always returns immediately. + + @param buffers A range of zero or more buffers to be written to the stream. + Although the buffers object may be copied as necessary, ownership of the + underlying memory blocks is retained by the caller, which must guarantee + that they remain valid until the handler is called. + + @param handler The handler to be called when the operation completes. + The implementation will take ownership of the handler by move construction. + The handler must be invocable with this signature: + @code + void handler( + error_code error, // Result of operation. + std::size_t bytes_transferred // Number of bytes written. + ); + @endcode + Regardless of whether the asynchronous operation completes immediately or + not, the handler will not be invoked from within this function. Invocation + of the handler will be performed in a manner equivalent to using + `net::post()`. + */ + template + BOOST_ASIO_INITFN_RESULT_TYPE(WriteHandler, + void(error_code, std::size_t)) + async_write_some( + ConstBufferSequence const& buffers, + WriteHandler&& handler) + { + BOOST_BEAST_HANDLER_INIT(WriteHandler, + void(error_code, std::size_t)); + this->socket_.async_write_some( + buffers, + detail::bind_default_executor( + this->get(), + std::move(init.completion_handler))); + return init.result.get(); + } +}; + +} // beast +} // boost + +#endif diff --git a/test/beast/core/CMakeLists.txt b/test/beast/core/CMakeLists.txt index a4da6a87..d9890450 100644 --- a/test/beast/core/CMakeLists.txt +++ b/test/beast/core/CMakeLists.txt @@ -60,6 +60,7 @@ add_executable (tests-beast-core span.cpp static_buffer.cpp static_string.cpp + stranded_stream.cpp stream_traits.cpp string.cpp string_param.cpp diff --git a/test/beast/core/Jamfile b/test/beast/core/Jamfile index c65f2739..c53ce101 100644 --- a/test/beast/core/Jamfile +++ b/test/beast/core/Jamfile @@ -48,6 +48,7 @@ local SOURCES = span.cpp static_buffer.cpp static_string.cpp + stranded_stream.cpp stream_traits.cpp string.cpp string_param.cpp diff --git a/test/beast/core/stranded_stream.cpp b/test/beast/core/stranded_stream.cpp new file mode 100644 index 00000000..b282bcec --- /dev/null +++ b/test/beast/core/stranded_stream.cpp @@ -0,0 +1,81 @@ +// +// Copyright (c) 2018 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +// Test that header file is self-contained. +#include + +#include "stream_tests.hpp" + +#include +#include + +namespace boost { +namespace beast { + +class stranded_stream_test + : public beast::unit_test::suite +{ +public: + using tcp = net::ip::tcp; + + void + testStream() + { + net::io_context ioc; + { + using ex_t = net::io_context::executor_type; + auto ex = ioc.get_executor(); + stranded_stream s1(ioc); + stranded_stream s2(ex); + stranded_stream s3(ioc, tcp::v4()); + stranded_stream s4(std::move(s1)); + s2.next_layer() = tcp::socket(ioc); + BEAST_EXPECT(s1.get_executor() == ex); + BEAST_EXPECT(s2.get_executor() == ex); + BEAST_EXPECT(s3.get_executor() == ex); + BEAST_EXPECT(s4.get_executor() == ex); + } + { + using ex_t = net::io_context::strand; + auto ex = ex_t{ioc}; + stranded_stream s1(ex); + stranded_stream s2(ex, tcp::v4()); + stranded_stream s3(std::move(s1)); + s2.next_layer() = tcp::socket(ioc); + BEAST_EXPECT(s1.get_executor() == ex); + BEAST_EXPECT(s2.get_executor() == ex); + BEAST_EXPECT(s3.get_executor() == ex); + } + { + using ex_t = net::io_context::executor_type; + test_sync_stream>(); + test_async_stream>(); + } + } + + void + testJavadocs() + { + } + + //-------------------------------------------------------------------------- + + void + run() + { + testStream(); + testJavadocs(); + pass(); + } +}; + +BEAST_DEFINE_TESTSUITE(beast,core,stranded_stream); + +} // beast +} // boost diff --git a/test/beast/core/stream_tests.hpp b/test/beast/core/stream_tests.hpp index b17c3dc9..b0cde3f1 100644 --- a/test/beast/core/stream_tests.hpp +++ b/test/beast/core/stream_tests.hpp @@ -20,6 +20,40 @@ namespace boost { namespace beast { +template +void +test_sync_read_stream() +{ + BOOST_ASSERT(is_sync_read_stream::value); + BEAST_EXPECT(static_cast< + std::size_t(SyncReadStream::*)(net::mutable_buffer const&)>( + &SyncReadStream::template read_some)); + BEAST_EXPECT(static_cast< + std::size_t(SyncReadStream::*)(net::mutable_buffer const&, error_code&)>( + &SyncReadStream::template read_some)); +} + +template +void +test_sync_write_stream() +{ + BOOST_ASSERT(is_sync_write_stream::value); + BEAST_EXPECT(static_cast< + std::size_t(SyncWriteStream::*)(net::const_buffer const&)>( + &SyncWriteStream::template write_some)); + BEAST_EXPECT(static_cast< + std::size_t(SyncWriteStream::*)(net::const_buffer const&, error_code&)>( + &SyncWriteStream::template write_some)); +} + +template +void +test_sync_stream() +{ + test_sync_read_stream(); + test_sync_write_stream(); +} + template void test_async_read_stream()