Add flat_stream to experimental:

This adds a new stream wrapper class template designed to address
a performance shortcoming of boost::asio::ssl::stream.
This commit is contained in:
Vinnie Falco
2018-04-29 15:01:22 -07:00
parent 46e658a91b
commit 6108cf3eb7
17 changed files with 801 additions and 850 deletions
-506
View File
@@ -1,506 +0,0 @@
//
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_EXAMPLE_COMMON_FLAT_STREAM_HPP
#define BOOST_BEAST_EXAMPLE_COMMON_FLAT_STREAM_HPP
#include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/type_traits.hpp>
#include <boost/beast/websocket/teardown.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/throw_exception.hpp>
#include <iterator>
#include <memory>
#include <utility>
namespace detail {
class flat_stream_base
{
public:
static std::size_t constexpr coalesce_limit = 64 * 1024;
// calculates the coalesce settings for a buffer sequence
template<class BufferSequence>
static
std::pair<std::size_t, bool>
coalesce(BufferSequence const& buffers, std::size_t limit)
{
std::pair<std::size_t, bool> result{0, false};
auto first = boost::asio::buffer_sequence_begin(buffers);
auto last = boost::asio::buffer_sequence_end(buffers);
if(first != last)
{
result.first = boost::asio::buffer_size(*first);
if(result.first < limit)
{
auto it = first;
auto prev = first;
while(++it != last)
{
auto const n =
boost::asio::buffer_size(*it);
if(result.first + n > limit)
break;
result.first += n;
prev = it;
}
result.second = prev != first;
}
}
return result;
}
};
} // detail
/** Flat stream wrapper.
This wrapper flattens buffer sequences having length greater than 1
and total size below a predefined amount, using a dynamic memory
allocation. It is primarily designed to overcome a performance
limitation of the current Boost.Asio version of ssl::stream, which
does not use OpenSSL's scatter/gather interface for its primitive
read and write operations.
See:
https://github.com/boostorg/beast/issues/1108
https://github.com/boostorg/asio/issues/100
https://stackoverflow.com/questions/38198638/openssl-ssl-write-from-multiple-buffers-ssl-writev
https://stackoverflow.com/questions/50026167/performance-drop-on-port-from-beast-1-0-0-b66-to-boost-1-67-0-beast
*/
template<class NextLayer>
class flat_stream : private detail::flat_stream_base
{
// Largest buffer size we will flatten
static std::size_t constexpr max_size = 1024 * 1024;
template<class, class> class write_op;
NextLayer stream_;
public:
/// The type of the next layer.
using next_layer_type =
typename std::remove_reference<NextLayer>::type;
/// The type of the lowest layer.
using lowest_layer_type = boost::beast::get_lowest_layer<next_layer_type>;
/// The type of the executor associated with the object.
using executor_type = typename next_layer_type::executor_type;
~flat_stream() = default;
flat_stream(flat_stream&&) = default;
flat_stream(flat_stream const&) = delete;
flat_stream& operator=(flat_stream&&) = default;
flat_stream& operator=(flat_stream const&) = delete;
/** Constructor
Arguments, if any, are forwarded to the next layer's constructor.
*/
template<class... Args>
explicit
flat_stream(Args&&... args);
//--------------------------------------------------------------------------
/** Get the executor associated with the object.
This function may be used to obtain the executor object that the
stream uses to dispatch handlers for asynchronous operations.
@return A copy of the executor that stream will use to dispatch handlers.
*/
executor_type
get_executor() noexcept
{
return stream_.get_executor();
}
/** Get a reference to the next layer
This function returns a reference to the next layer
in a stack of stream layers.
@return A reference to the next layer in the stack of
stream layers.
*/
next_layer_type&
next_layer()
{
return stream_;
}
/** Get a reference to the next layer
This function returns a reference to the next layer in a
stack of stream layers.
@return A reference to the next layer in the stack of
stream layers.
*/
next_layer_type const&
next_layer() const
{
return stream_;
}
/** Get a reference to the lowest layer
This function returns a reference to the lowest layer
in a stack of stream layers.
@return A reference to the lowest layer in the stack of
stream layers.
*/
lowest_layer_type&
lowest_layer()
{
return stream_.lowest_layer();
}
/** Get a reference to the lowest layer
This function returns a reference to the lowest layer
in a stack of stream layers.
@return A reference to the lowest layer in the stack of
stream layers. Ownership is not transferred to the caller.
*/
lowest_layer_type const&
lowest_layer() const
{
return stream_.lowest_layer();
}
//--------------------------------------------------------------------------
template<class MutableBufferSequence>
std::size_t
read_some(MutableBufferSequence const& buffers);
template<class MutableBufferSequence>
std::size_t
read_some(
MutableBufferSequence const& buffers,
boost::system::error_code& ec);
template<class ConstBufferSequence>
std::size_t
write_some(ConstBufferSequence const& buffers);
template<class ConstBufferSequence>
std::size_t
write_some(
ConstBufferSequence const& buffers,
boost::system::error_code& ec);
template<
class MutableBufferSequence,
class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(boost::system::error_code, std::size_t))
async_read_some(
MutableBufferSequence const& buffers,
ReadHandler&& handler);
template<
class ConstBufferSequence,
class WriteHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
WriteHandler, void(boost::system::error_code, std::size_t))
async_write_some(
ConstBufferSequence const& buffers,
WriteHandler&& handler);
template<class T>
friend
void
teardown(boost::beast::websocket::role_type,
flat_stream<T>& s, boost::system::error_code& ec);
template<class T, class TeardownHandler>
friend
void
async_teardown(boost::beast::websocket::role_type role,
flat_stream<T>& s, TeardownHandler&& handler);
};
//------------------------------------------------------------------------------
template<class NextLayer>
template<class ConstBufferSequence, class Handler>
class flat_stream<NextLayer>::write_op
: public boost::asio::coroutine
{
using alloc_type = typename
boost::asio::associated_allocator_t<Handler>::template
rebind<char>::other;
struct deleter
{
alloc_type alloc;
std::size_t size = 0;
explicit
deleter(alloc_type const& alloc_)
: alloc(alloc_)
{
}
void
operator()(char* p)
{
alloc.deallocate(p, size);
}
};
flat_stream<NextLayer>& s_;
ConstBufferSequence b_;
std::unique_ptr<char, deleter> p_;
Handler h_;
public:
template<class DeducedHandler>
write_op(
flat_stream<NextLayer>& s,
ConstBufferSequence const& b,
DeducedHandler&& h)
: s_(s)
, b_(b)
, p_(nullptr, deleter{
(boost::asio::get_associated_allocator)(h)})
, h_(std::forward<DeducedHandler>(h))
{
}
using allocator_type =
boost::asio::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (boost::asio::get_associated_allocator)(h_);
}
using executor_type = boost::asio::associated_executor_t<
Handler, decltype(std::declval<NextLayer&>().get_executor())>;
executor_type
get_executor() const noexcept
{
return (boost::asio::get_associated_executor)(
h_, s_.get_executor());
}
void
operator()(
boost::system::error_code ec,
std::size_t bytes_transferred)
{
BOOST_ASIO_CORO_REENTER(*this)
{
BOOST_ASIO_CORO_YIELD
{
auto const result = coalesce(b_, coalesce_limit);
if(result.second)
{
p_.get_deleter().size = result.first;
p_.reset(p_.get_deleter().alloc.allocate(
p_.get_deleter().size));
boost::asio::buffer_copy(
boost::asio::buffer(
p_.get(), p_.get_deleter().size),
b_, result.first);
s_.stream_.async_write_some(
boost::asio::buffer(
p_.get(), p_.get_deleter().size),
std::move(*this));
}
else
{
s_.stream_.async_write_some(
boost::beast::buffers_prefix(result.first, b_),
std::move(*this));
}
}
p_.reset();
h_(ec, bytes_transferred);
}
}
friend
bool asio_handler_is_continuation(write_op* op)
{
using boost::asio::asio_handler_is_continuation;
return asio_handler_is_continuation(
std::addressof(op->h_));
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, write_op* op)
{
using boost::asio::asio_handler_invoke;
asio_handler_invoke(f, std::addressof(op->h_));
}
};
//------------------------------------------------------------------------------
template<class NextLayer>
template<class... Args>
flat_stream<NextLayer>::
flat_stream(Args&&... args)
: stream_(std::forward<Args>(args)...)
{
}
template<class NextLayer>
template<class MutableBufferSequence>
std::size_t
flat_stream<NextLayer>::
read_some(MutableBufferSequence const& buffers)
{
static_assert(boost::beast::is_sync_read_stream<next_layer_type>::value,
"SyncReadStream requirements not met");
static_assert(boost::asio::is_mutable_buffer_sequence<
MutableBufferSequence>::value,
"MutableBufferSequence requirements not met");
boost::system::error_code ec;
auto n = read_some(buffers, ec);
if(ec)
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
return n;
}
template<class NextLayer>
template<class MutableBufferSequence>
std::size_t
flat_stream<NextLayer>::
read_some(MutableBufferSequence const& buffers, boost::system::error_code& ec)
{
return stream_.read_some(buffers, ec);
}
template<class NextLayer>
template<class ConstBufferSequence>
std::size_t
flat_stream<NextLayer>::
write_some(ConstBufferSequence const& buffers)
{
static_assert(boost::beast::is_sync_write_stream<next_layer_type>::value,
"SyncWriteStream requirements not met");
static_assert(boost::asio::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
boost::system::error_code ec;
auto n = write_some(buffers, ec);
if(ec)
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
return n;
}
template<class NextLayer>
template<class ConstBufferSequence>
std::size_t
flat_stream<NextLayer>::
write_some(
ConstBufferSequence const& buffers,
boost::system::error_code& ec)
{
auto const result = coalesce(buffers, coalesce_limit);
if(result.second)
{
std::unique_ptr<char[]> p{new char[result.first]};
auto const b = boost::asio::buffer(p.get(), result.first);
boost::asio::buffer_copy(b, buffers);
return stream_.write_some(b, ec);
}
return stream_.write_some(
boost::beast::buffers_prefix(result.first, buffers), ec);
}
template<class NextLayer>
template<
class MutableBufferSequence,
class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(boost::system::error_code, std::size_t))
flat_stream<NextLayer>::
async_read_some(
MutableBufferSequence const& buffers,
ReadHandler&& handler)
{
static_assert(boost::beast::is_async_read_stream<next_layer_type>::value,
"AsyncReadStream requirements not met");
static_assert(boost::asio::is_mutable_buffer_sequence<
MutableBufferSequence >::value,
"MutableBufferSequence requirements not met");
return stream_.async_read_some(
buffers, std::forward<ReadHandler>(handler));
}
template<class NextLayer>
template<
class ConstBufferSequence,
class WriteHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
WriteHandler, void(boost::system::error_code, std::size_t))
flat_stream<NextLayer>::
async_write_some(
ConstBufferSequence const& buffers,
WriteHandler&& handler)
{
static_assert(boost::beast::is_async_write_stream<next_layer_type>::value,
"AsyncWriteStream requirements not met");
static_assert(boost::asio::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
BOOST_BEAST_HANDLER_INIT(
WriteHandler, void(boost::system::error_code, std::size_t));
write_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(boost::system::error_code, std::size_t))>{
*this, buffers, std::move(init.completion_handler)}({}, 0);
return init.result.get();
}
template<class NextLayer>
void
teardown(
boost::beast::websocket::role_type role,
flat_stream<NextLayer>& s,
boost::system::error_code& ec)
{
using boost::beast::websocket::teardown;
teardown(role, s.stream_, ec);
}
template<class NextLayer, class TeardownHandler>
void
async_teardown(
boost::beast::websocket::role_type role,
flat_stream<NextLayer>& s,
TeardownHandler&& handler)
{
using boost::beast::websocket::async_teardown;
async_teardown(role, s.stream_, std::move(handler));
}
#endif
-329
View File
@@ -1,329 +0,0 @@
//
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_EXAMPLE_COMMON_SSL_STREAM_HPP
#define BOOST_BEAST_EXAMPLE_COMMON_SSL_STREAM_HPP
// This include is necessary to work with `ssl::stream` and `boost::beast::websocket::stream`
#include <boost/beast/websocket/ssl.hpp>
#include "flat_stream.hpp"
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
/** C++11 enabled SSL socket wrapper
This wrapper provides an interface identical to `boost::asio::ssl::stream`,
with the following additional properties:
@li Satisfies @b MoveConstructible
@li Satisfies @b MoveAssignable
@li Constructible from a moved socket.
*/
template<class NextLayer>
class ssl_stream
: public boost::asio::ssl::stream_base
{
using ssl_stream_type = boost::asio::ssl::stream<NextLayer>;
using stream_type = flat_stream<ssl_stream_type>;
std::unique_ptr<stream_type> p_;
boost::asio::ssl::context* ctx_;
public:
/// The native handle type of the SSL stream.
using native_handle_type =
typename ssl_stream_type::native_handle_type;
/// Structure for use with deprecated impl_type.
using impl_struct = typename ssl_stream_type::impl_struct;
/// The type of the next layer.
using next_layer_type = typename ssl_stream_type::next_layer_type;
/// The type of the lowest layer.
using lowest_layer_type = typename ssl_stream_type::lowest_layer_type;
/// The type of the executor associated with the object.
using executor_type = typename stream_type::executor_type;
template<class Arg>
ssl_stream(
Arg&& arg,
boost::asio::ssl::context& ctx)
: p_(new stream_type{
std::forward<Arg>(arg), ctx})
, ctx_(&ctx)
{
}
ssl_stream(ssl_stream&& other)
: p_(std::move(other.p_))
, ctx_(other.ctx_)
{
}
ssl_stream& operator=(ssl_stream&& other)
{
p_ = std::move(other.p_);
ctx_ = other.ctx_;
return *this;
}
executor_type
get_executor() noexcept
{
return p_->get_executor();
}
native_handle_type
native_handle()
{
return p_->next_layer().native_handle();
}
next_layer_type const&
next_layer() const
{
return p_->next_layer().next_layer();
}
next_layer_type&
next_layer()
{
return p_->next_layer().next_layer();
}
lowest_layer_type&
lowest_layer()
{
return p_->lowest_layer();
}
lowest_layer_type const&
lowest_layer() const
{
return p_->lowest_layer();
}
void
set_verify_mode(boost::asio::ssl::verify_mode v)
{
p_->next_layer().set_verify_mode(v);
}
boost::system::error_code
set_verify_mode(boost::asio::ssl::verify_mode v,
boost::system::error_code& ec)
{
return p_->next_layer().set_verify_mode(v, ec);
}
void
set_verify_depth(int depth)
{
p_->next_layer().set_verify_depth(depth);
}
boost::system::error_code
set_verify_depth(
int depth, boost::system::error_code& ec)
{
return p_->next_layer().set_verify_depth(depth, ec);
}
template<class VerifyCallback>
void
set_verify_callback(VerifyCallback callback)
{
p_->next_layer().set_verify_callback(callback);
}
template<class VerifyCallback>
boost::system::error_code
set_verify_callback(VerifyCallback callback,
boost::system::error_code& ec)
{
return p_->next_layer().set_verify_callback(callback, ec);
}
void
handshake(handshake_type type)
{
p_->next_layer().handshake(type);
}
boost::system::error_code
handshake(handshake_type type,
boost::system::error_code& ec)
{
return p_->next_layer().handshake(type, ec);
}
template<class ConstBufferSequence>
void
handshake(
handshake_type type, ConstBufferSequence const& buffers)
{
p_->next_layer().handshake(type, buffers);
}
template<class ConstBufferSequence>
boost::system::error_code
handshake(handshake_type type,
ConstBufferSequence const& buffers,
boost::system::error_code& ec)
{
return p_->next_layer().handshake(type, buffers, ec);
}
template<class HandshakeHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(HandshakeHandler,
void(boost::system::error_code))
async_handshake(handshake_type type,
BOOST_ASIO_MOVE_ARG(HandshakeHandler) handler)
{
return p_->next_layer().async_handshake(type,
BOOST_ASIO_MOVE_CAST(HandshakeHandler)(handler));
}
template<class ConstBufferSequence, class BufferedHandshakeHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(BufferedHandshakeHandler,
void (boost::system::error_code, std::size_t))
async_handshake(handshake_type type, ConstBufferSequence const& buffers,
BOOST_ASIO_MOVE_ARG(BufferedHandshakeHandler) handler)
{
return p_->next_layer().async_handshake(type, buffers,
BOOST_ASIO_MOVE_CAST(BufferedHandshakeHandler)(handler));
}
void
shutdown()
{
p_->next_layer().shutdown();
}
boost::system::error_code
shutdown(boost::system::error_code& ec)
{
return p_->next_layer().shutdown(ec);
}
template<class ShutdownHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(ShutdownHandler,
void (boost::system::error_code))
async_shutdown(BOOST_ASIO_MOVE_ARG(ShutdownHandler) handler)
{
return p_->next_layer().async_shutdown(
BOOST_ASIO_MOVE_CAST(ShutdownHandler)(handler));
}
template<class ConstBufferSequence>
std::size_t
write_some(ConstBufferSequence const& buffers)
{
return p_->write_some(buffers);
}
template<class ConstBufferSequence>
std::size_t
write_some(ConstBufferSequence const& buffers,
boost::system::error_code& ec)
{
return p_->write_some(buffers, ec);
}
template<class ConstBufferSequence, class WriteHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(WriteHandler,
void (boost::system::error_code, std::size_t))
async_write_some(ConstBufferSequence const& buffers,
BOOST_ASIO_MOVE_ARG(WriteHandler) handler)
{
return p_->async_write_some(buffers,
BOOST_ASIO_MOVE_CAST(WriteHandler)(handler));
}
template<class MutableBufferSequence>
std::size_t
read_some(MutableBufferSequence const& buffers)
{
return p_->read_some(buffers);
}
template<class MutableBufferSequence>
std::size_t
read_some(MutableBufferSequence const& buffers,
boost::system::error_code& ec)
{
return p_->read_some(buffers, ec);
}
template<class MutableBufferSequence, class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler,
void(boost::system::error_code, std::size_t))
async_read_some(MutableBufferSequence const& buffers,
BOOST_ASIO_MOVE_ARG(ReadHandler) handler)
{
return p_->async_read_some(buffers,
BOOST_ASIO_MOVE_CAST(ReadHandler)(handler));
}
template<class SyncStream>
friend
void
teardown(boost::beast::websocket::role_type,
ssl_stream<SyncStream>& stream,
boost::system::error_code& ec);
template<class AsyncStream, class TeardownHandler>
friend
void
async_teardown(boost::beast::websocket::role_type,
ssl_stream<AsyncStream>& stream, TeardownHandler&& handler);
};
// These hooks are used to inform boost::beast::websocket::stream on
// how to tear down the connection as part of the WebSocket
// protocol specifications
template<class SyncStream>
inline
void
teardown(
boost::beast::websocket::role_type role,
ssl_stream<SyncStream>& stream,
boost::system::error_code& ec)
{
// Just forward it to the wrapped stream
using boost::beast::websocket::teardown;
teardown(role, *stream.p_, ec);
}
template<class AsyncStream, class TeardownHandler>
inline
void
async_teardown(
boost::beast::websocket::role_type role,
ssl_stream<AsyncStream>& stream,
TeardownHandler&& handler)
{
// Just forward it to the wrapped stream
using boost::beast::websocket::async_teardown;
async_teardown(role,
*stream.p_, std::forward<TeardownHandler>(handler));
}
#endif