From c9a445a937e3d7c07be72f23fef498ce53a1a3fc Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 27 Apr 2018 16:20:59 -0700 Subject: [PATCH] New flat_stream example class: fix #1108 The `flat_stream` is a stream wrapper designed to overcome a performance limitation of the `boost::asio::ssl::stream` implementation. Specifically, when writing buffer sequences having length greater than one, the `ssl::stream` implementation does not use scatter-gather I/O and instead performs a kernel transition for every buffer in the sequence. The wrapper addresses this problem by allocating memory and flattening the buffer sequence into a single buffer, using some logic to determine when this allocation is advantageous versus simply passing the buffers through as-is. See Also: https://github.com/boostorg/asio/issues/100 https://stackoverflow.com/questions/50026167/performance-drop-on-port-from-beast-1-0-0-b66-to-boost-1-67-0-beast --- CHANGELOG.md | 1 + doc/qbk/02_examples.qbk | 10 + example/common/flat_stream.hpp | 506 ++++++++++++++++++ test/example/common/CMakeLists.txt | 4 +- test/example/common/Jamfile | 1 + test/example/common/flat_stream.cpp | 109 ++++ test/example/common/main.cpp | 12 - .../include/boost/beast/test/websocket.hpp | 249 +++++++++ 8 files changed, 879 insertions(+), 13 deletions(-) create mode 100644 example/common/flat_stream.hpp create mode 100644 test/example/common/flat_stream.cpp delete mode 100644 test/example/common/main.cpp create mode 100644 test/extras/include/boost/beast/test/websocket.hpp diff --git a/CHANGELOG.md b/CHANGELOG.md index 33ddd5d2..06f1237a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Version 169: * Use boost::void_t * Refactor HTTP write_op implementation * Use fully qualified namespace in BOOST_BEAST_HANDLER_INIT +* New flat_stream example class -------------------------------------------------------------------------------- diff --git a/doc/qbk/02_examples.qbk b/doc/qbk/02_examples.qbk index 22d0fb14..5b757206 
100644 --- a/doc/qbk/02_examples.qbk +++ b/doc/qbk/02_examples.qbk @@ -216,6 +216,16 @@ listed here along with a description of their use: in the documentation. It is used by the "flex" servers which support both plain and SSL sessions on the same port. ] +][ + [[source_file example/common/flat_stream.hpp]] + [ + This wrapper flattens buffer sequences having length greater than 1 + and total size below a predefined amount, using a dynamic memory + allocation. It is primarily designed to overcome a performance + limitation of the current Boost.Asio version of `ssl::stream`, which + does not use OpenSSL's scatter/gather interface for its primitive + read and write operations. + ] ][ [[source_file example/common/root_certificates.hpp]] [ diff --git a/example/common/flat_stream.hpp b/example/common/flat_stream.hpp new file mode 100644 index 00000000..d7f08540 --- /dev/null +++ b/example/common/flat_stream.hpp @@ -0,0 +1,506 @@ +// +// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_EXAMPLE_COMMON_FLAT_STREAM_HPP +#define BOOST_BEAST_EXAMPLE_COMMON_FLAT_STREAM_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace detail { + +class flat_stream_base +{ +public: + static std::size_t constexpr coalesce_limit = 64 * 1024; + + // calculates the coalesce settings for a buffer sequence + template + static + std::pair + coalesce(BufferSequence const& buffers, std::size_t limit) + { + std::pair result{0, false}; + auto first = boost::asio::buffer_sequence_begin(buffers); + auto last = boost::asio::buffer_sequence_end(buffers); + if(first != last) + { + result.first = boost::asio::buffer_size(*first); + if(result.first < limit) + { + auto it = first; + auto prev = first; + while(++it != last) + { + auto const n = + boost::asio::buffer_size(*it); + if(result.first + n > limit) + break; + result.first += n; + prev = it; + } + result.second = prev != first; + } + } + return result; + } +}; + +} // detail + +/** Flat stream wrapper. + + This wrapper flattens buffer sequences having length greater than 1 + and total size below a predefined amount, using a dynamic memory + allocation. It is primarily designed to overcome a performance + limitation of the current Boost.Asio version of ssl::stream, which + does not use OpenSSL's scatter/gather interface for its primitive + read and write operations. 
+ + See: + https://github.com/boostorg/beast/issues/1108 + https://github.com/boostorg/asio/issues/100 + https://stackoverflow.com/questions/38198638/openssl-ssl-write-from-multiple-buffers-ssl-writev + https://stackoverflow.com/questions/50026167/performance-drop-on-port-from-beast-1-0-0-b66-to-boost-1-67-0-beast +*/ +template +class flat_stream : private detail::flat_stream_base +{ + // Largest buffer size we will flatten + static std::size_t constexpr max_size = 1024 * 1024; + + template class write_op; + + NextLayer stream_; + +public: + /// The type of the next layer. + using next_layer_type = + typename std::remove_reference::type; + + /// The type of the lowest layer. + using lowest_layer_type = boost::beast::get_lowest_layer; + + /// The type of the executor associated with the object. + using executor_type = typename next_layer_type::executor_type; + + ~flat_stream() = default; + flat_stream(flat_stream&&) = default; + flat_stream(flat_stream const&) = delete; + flat_stream& operator=(flat_stream&&) = default; + flat_stream& operator=(flat_stream const&) = delete; + + /** Constructor + + Arguments, if any, are forwarded to the next layer's constructor. + */ + template + explicit + flat_stream(Args&&... args); + + //-------------------------------------------------------------------------- + + /** Get the executor associated with the object. + + This function may be used to obtain the executor object that the + stream uses to dispatch handlers for asynchronous operations. + + @return A copy of the executor that stream will use to dispatch handlers. + */ + executor_type + get_executor() noexcept + { + return stream_.get_executor(); + } + + /** Get a reference to the next layer + + This function returns a reference to the next layer + in a stack of stream layers. + + @return A reference to the next layer in the stack of + stream layers. 
+ */ + next_layer_type& + next_layer() + { + return stream_; + } + + /** Get a reference to the next layer + + This function returns a reference to the next layer in a + stack of stream layers. + + @return A reference to the next layer in the stack of + stream layers. + */ + next_layer_type const& + next_layer() const + { + return stream_; + } + + /** Get a reference to the lowest layer + + This function returns a reference to the lowest layer + in a stack of stream layers. + + @return A reference to the lowest layer in the stack of + stream layers. + */ + lowest_layer_type& + lowest_layer() + { + return stream_.lowest_layer(); + } + + /** Get a reference to the lowest layer + + This function returns a reference to the lowest layer + in a stack of stream layers. + + @return A reference to the lowest layer in the stack of + stream layers. Ownership is not transferred to the caller. + */ + lowest_layer_type const& + lowest_layer() const + { + return stream_.lowest_layer(); + } + + //-------------------------------------------------------------------------- + + template + std::size_t + read_some(MutableBufferSequence const& buffers); + + template + std::size_t + read_some( + MutableBufferSequence const& buffers, + boost::system::error_code& ec); + + template + std::size_t + write_some(ConstBufferSequence const& buffers); + + template + std::size_t + write_some( + ConstBufferSequence const& buffers, + boost::system::error_code& ec); + + template< + class MutableBufferSequence, + class ReadHandler> + BOOST_ASIO_INITFN_RESULT_TYPE( + ReadHandler, void(boost::system::error_code, std::size_t)) + async_read_some( + MutableBufferSequence const& buffers, + ReadHandler&& handler); + + template< + class ConstBufferSequence, + class WriteHandler> + BOOST_ASIO_INITFN_RESULT_TYPE( + WriteHandler, void(boost::system::error_code, std::size_t)) + async_write_some( + ConstBufferSequence const& buffers, + WriteHandler&& handler); + + template + friend + void + 
teardown(boost::beast::websocket::role_type, + flat_stream& s, boost::system::error_code& ec); + + template + friend + void + async_teardown(boost::beast::websocket::role_type role, + flat_stream& s, TeardownHandler&& handler); +}; + +//------------------------------------------------------------------------------ + +template +template +class flat_stream::write_op + : public boost::asio::coroutine +{ + using alloc_type = typename + boost::asio::associated_allocator_t::template + rebind::other; + + struct deleter + { + alloc_type alloc; + std::size_t size = 0; + + explicit + deleter(alloc_type const& alloc_) + : alloc(alloc_) + { + } + + void + operator()(char* p) + { + alloc.deallocate(p, size); + } + }; + + flat_stream& s_; + ConstBufferSequence b_; + std::unique_ptr p_; + Handler h_; + +public: + template + write_op( + flat_stream& s, + ConstBufferSequence const& b, + DeducedHandler&& h) + : s_(s) + , b_(b) + , p_(nullptr, deleter{ + (boost::asio::get_associated_allocator)(h)}) + , h_(std::forward(h)) + { + } + + using allocator_type = + boost::asio::associated_allocator_t; + + allocator_type + get_allocator() const noexcept + { + return (boost::asio::get_associated_allocator)(h_); + } + + using executor_type = boost::asio::associated_executor_t< + Handler, decltype(std::declval().get_executor())>; + + executor_type + get_executor() const noexcept + { + return (boost::asio::get_associated_executor)( + h_, s_.get_executor()); + } + + void + operator()( + boost::system::error_code ec, + std::size_t bytes_transferred) + { + BOOST_ASIO_CORO_REENTER(*this) + { + BOOST_ASIO_CORO_YIELD + { + auto const result = coalesce(b_, coalesce_limit); + if(result.second) + { + p_.get_deleter().size = result.first; + p_.reset(p_.get_deleter().alloc.allocate( + p_.get_deleter().size)); + boost::asio::buffer_copy( + boost::asio::buffer( + p_.get(), p_.get_deleter().size), + b_, result.first); + s_.stream_.async_write_some( + boost::asio::buffer( + p_.get(), p_.get_deleter().size), + 
std::move(*this)); + } + else + { + s_.stream_.async_write_some( + boost::beast::buffers_prefix(result.first, b_), + std::move(*this)); + } + } + p_.reset(); + h_(ec, bytes_transferred); + } + } + + friend + bool asio_handler_is_continuation(write_op* op) + { + using boost::asio::asio_handler_is_continuation; + return asio_handler_is_continuation( + std::addressof(op->h_)); + } + + template + friend + void asio_handler_invoke(Function&& f, write_op* op) + { + using boost::asio::asio_handler_invoke; + asio_handler_invoke(f, std::addressof(op->h_)); + } +}; + +//------------------------------------------------------------------------------ + +template +template +flat_stream:: +flat_stream(Args&&... args) + : stream_(std::forward(args)...) +{ +} + +template +template +std::size_t +flat_stream:: +read_some(MutableBufferSequence const& buffers) +{ + static_assert(boost::beast::is_sync_read_stream::value, + "SyncReadStream requirements not met"); + static_assert(boost::asio::is_mutable_buffer_sequence< + MutableBufferSequence>::value, + "MutableBufferSequence requirements not met"); + boost::system::error_code ec; + auto n = read_some(buffers, ec); + if(ec) + BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); + return n; +} + +template +template +std::size_t +flat_stream:: +read_some(MutableBufferSequence const& buffers, boost::system::error_code& ec) +{ + return stream_.read_some(buffers, ec); +} + +template +template +std::size_t +flat_stream:: +write_some(ConstBufferSequence const& buffers) +{ + static_assert(boost::beast::is_sync_write_stream::value, + "SyncWriteStream requirements not met"); + static_assert(boost::asio::is_const_buffer_sequence< + ConstBufferSequence>::value, + "ConstBufferSequence requirements not met"); + boost::system::error_code ec; + auto n = write_some(buffers, ec); + if(ec) + BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); + return n; +} + +template +template +std::size_t +flat_stream:: +write_some( + ConstBufferSequence const& 
buffers, + boost::system::error_code& ec) +{ + auto const result = coalesce(buffers, coalesce_limit); + if(result.second) + { + std::unique_ptr p{new char[result.first]}; + auto const b = boost::asio::buffer(p.get(), result.first); + boost::asio::buffer_copy(b, buffers); + return stream_.write_some(b, ec); + } + return stream_.write_some( + boost::beast::buffers_prefix(result.first, buffers), ec); +} + +template +template< + class MutableBufferSequence, + class ReadHandler> +BOOST_ASIO_INITFN_RESULT_TYPE( + ReadHandler, void(boost::system::error_code, std::size_t)) +flat_stream:: +async_read_some( + MutableBufferSequence const& buffers, + ReadHandler&& handler) +{ + static_assert(boost::beast::is_async_read_stream::value, + "AsyncReadStream requirements not met"); + static_assert(boost::asio::is_mutable_buffer_sequence< + MutableBufferSequence >::value, + "MutableBufferSequence requirements not met"); + return stream_.async_read_some( + buffers, std::forward(handler)); +} + +template +template< + class ConstBufferSequence, + class WriteHandler> +BOOST_ASIO_INITFN_RESULT_TYPE( + WriteHandler, void(boost::system::error_code, std::size_t)) +flat_stream:: +async_write_some( + ConstBufferSequence const& buffers, + WriteHandler&& handler) +{ + static_assert(boost::beast::is_async_write_stream::value, + "AsyncWriteStream requirements not met"); + static_assert(boost::asio::is_const_buffer_sequence< + ConstBufferSequence>::value, + "ConstBufferSequence requirements not met"); + BOOST_BEAST_HANDLER_INIT( + WriteHandler, void(boost::system::error_code, std::size_t)); + write_op{ + *this, buffers, std::move(init.completion_handler)}({}, 0); + return init.result.get(); +} + +template +void +teardown( + boost::beast::websocket::role_type role, + flat_stream& s, + boost::system::error_code& ec) +{ + using boost::beast::websocket::teardown; + teardown(role, s.stream_, ec); +} + +template +void +async_teardown( + boost::beast::websocket::role_type role, + flat_stream& s, + 
TeardownHandler&& handler) +{ + using boost::beast::websocket::async_teardown; + async_teardown(role, s.stream_, std::move(handler)); +} + +#endif diff --git a/test/example/common/CMakeLists.txt b/test/example/common/CMakeLists.txt index 2e3ee447..44e16418 100644 --- a/test/example/common/CMakeLists.txt +++ b/test/example/common/CMakeLists.txt @@ -16,13 +16,15 @@ GroupSources(test/example/common "/") add_executable (tests-example-common ${BOOST_BEAST_FILES} ${COMMON_FILES} + ${EXTRAS_FILES} + ${TEST_MAIN} Jamfile detect_ssl.cpp + flat_stream.cpp root_certificates.cpp server_certificate.cpp session_alloc.cpp ssl_stream.cpp - main.cpp ) set_property(TARGET tests-example-common PROPERTY FOLDER "tests") \ No newline at end of file diff --git a/test/example/common/Jamfile b/test/example/common/Jamfile index 719deba0..79ec00c0 100644 --- a/test/example/common/Jamfile +++ b/test/example/common/Jamfile @@ -9,6 +9,7 @@ local SOURCES = detect_ssl.cpp + flat_stream.cpp root_certificates.cpp server_certificate.cpp session_alloc.cpp diff --git a/test/example/common/flat_stream.cpp b/test/example/common/flat_stream.cpp new file mode 100644 index 00000000..7ef48488 --- /dev/null +++ b/test/example/common/flat_stream.cpp @@ -0,0 +1,109 @@ +// +// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +// Test that header file is self-contained. 
+#include + +#include +#include +#include +#include +#include + +namespace boost { +namespace beast { + +class flat_stream_test + : public unit_test::suite + , public test::enable_yield_to +{ +public: + void + testSplit() + { + auto const check = + [&]( + std::initializer_list v0, + std::size_t limit, + unsigned long count, + bool copy) + { + std::vector v; + v.reserve(v0.size()); + for(auto const n : v0) + v.emplace_back("", n); + auto const result = + ::detail::flat_stream_base::coalesce(v, limit); + BEAST_EXPECT(result.first == count); + BEAST_EXPECT(result.second == copy); + return result; + }; + check({}, 1, 0, false); + check({1,2}, 1, 1, false); + check({1,2}, 2, 1, false); + check({1,2}, 3, 3, true); + check({1,2}, 4, 3, true); + check({1,2,3}, 1, 1, false); + check({1,2,3}, 2, 1, false); + check({1,2,3}, 3, 3, true); + check({1,2,3}, 4, 3, true); + check({1,2,3}, 7, 6, true); + check({1,2,3,4}, 3, 3, true); + } + + void + testHttp() + { + pass(); + } + + void + testWebsocket() + { + { + error_code ec; + test::ws_echo_server es{log}; + boost::asio::io_context ioc; + websocket::stream> ws{ioc}; + ws.next_layer().next_layer().connect(es.stream()); + ws.handshake("localhost", "/", ec); + BEAST_EXPECTS(! ec, ec.message()); + ws.close({}, ec); + BEAST_EXPECTS(! 
ec, ec.message()); + } + { + test::ws_echo_server es{log}; + boost::asio::io_context ioc; + websocket::stream> ws{ioc}; + ws.next_layer().next_layer().connect(es.stream()); + ws.async_handshake("localhost", "/", + [&](error_code) + { + ws.async_close({}, + [&](error_code) + { + }); + }); + ioc.run(); + } + } + + void + run() override + { + testSplit(); + testHttp(); + testWebsocket(); + } +}; + +BEAST_DEFINE_TESTSUITE(beast,example,flat_stream); + +} // beast +} // boost diff --git a/test/example/common/main.cpp b/test/example/common/main.cpp deleted file mode 100644 index fc0903f5..00000000 --- a/test/example/common/main.cpp +++ /dev/null @@ -1,12 +0,0 @@ -// -// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/boostorg/beast -// - -int main() -{ -} diff --git a/test/extras/include/boost/beast/test/websocket.hpp b/test/extras/include/boost/beast/test/websocket.hpp new file mode 100644 index 00000000..09685624 --- /dev/null +++ b/test/extras/include/boost/beast/test/websocket.hpp @@ -0,0 +1,249 @@ +// +// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_TEST_WEBSOCKET_HPP +#define BOOST_BEAST_TEST_WEBSOCKET_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace beast { +namespace test { + +class ws_echo_server +{ + std::ostream& log_; + boost::asio::io_context ioc_; + boost::asio::executor_work_guard< + boost::asio::io_context::executor_type> work_; + multi_buffer buffer_; + test::stream ts_; + std::thread t_; + websocket::stream ws_; + bool close_ = false; + +public: + enum class kind + { + sync, + async, + async_client + }; + + explicit + ws_echo_server( + std::ostream& log, + kind k = kind::sync) + : log_(log) + , work_(ioc_.get_executor()) + , ts_(ioc_) + , ws_(ts_) + { + beast::websocket::permessage_deflate pmd; + pmd.server_enable = true; + pmd.server_max_window_bits = 9; + pmd.compLevel = 1; + ws_.set_option(pmd); + + switch(k) + { + case kind::sync: + t_ = std::thread{[&]{ do_sync(); }}; + break; + + case kind::async: + t_ = std::thread{[&]{ ioc_.run(); }}; + do_accept(); + break; + + case kind::async_client: + t_ = std::thread{[&]{ ioc_.run(); }}; + break; + } + } + + ~ws_echo_server() + { + work_.reset(); + t_.join(); + } + + test::stream& + stream() + { + return ts_; + } + + void + async_handshake() + { + ws_.async_handshake("localhost", "/", + std::bind( + &ws_echo_server::on_handshake, + this, + std::placeholders::_1)); + } + + void + async_close() + { + boost::asio::post(ioc_, + [&] + { + if(ws_.is_open()) + { + ws_.async_close({}, + std::bind( + &ws_echo_server::on_close, + this, + std::placeholders::_1)); + } + else + { + close_ = true; + } + }); + } + +private: + void + do_sync() + { + try + { + ws_.accept(); + for(;;) + { + ws_.read(buffer_); + ws_.text(ws_.got_text()); + ws_.write(buffer_.data()); + buffer_.consume(buffer_.size()); + } 
+ } + catch(system_error const& se) + { + boost::ignore_unused(se); +#if 0 + if( se.code() != error::closed && + se.code() != error::failed && + se.code() != boost::asio::error::eof) + log_ << "ws_echo_server: " << se.code().message() << std::endl; +#endif + } + catch(std::exception const& e) + { + log_ << "ws_echo_server: " << e.what() << std::endl; + } + } + + void + do_accept() + { + ws_.async_accept(std::bind( + &ws_echo_server::on_accept, + this, + std::placeholders::_1)); + } + + void + on_handshake(error_code ec) + { + if(ec) + return fail(ec); + + do_read(); + } + + void + on_accept(error_code ec) + { + if(ec) + return fail(ec); + + if(close_) + { + return ws_.async_close({}, + std::bind( + &ws_echo_server::on_close, + this, + std::placeholders::_1)); + } + + do_read(); + } + + void + do_read() + { + ws_.async_read(buffer_, + std::bind( + &ws_echo_server::on_read, + this, + std::placeholders::_1)); + } + + void + on_read(error_code ec) + { + if(ec) + return fail(ec); + ws_.text(ws_.got_text()); + ws_.async_write(buffer_.data(), + std::bind( + &ws_echo_server::on_write, + this, + std::placeholders::_1)); + } + + void + on_write(error_code ec) + { + if(ec) + return fail(ec); + buffer_.consume(buffer_.size()); + do_read(); + } + + void + on_close(error_code ec) + { + if(ec) + return fail(ec); + } + + void + fail(error_code ec) + { + boost::ignore_unused(ec); +#if 0 + if( ec != error::closed && + ec != error::failed && + ec != boost::asio::error::eof) + log_ << + "echo_server_async: " << + ec.message() << + std::endl; +#endif + } +}; + +} // test +} // beast +} // boost + +#endif