Add flat_stream:

flat_stream, previously in _experimental, is now a public API.
This commit is contained in:
Vinnie Falco
2019-02-07 11:26:35 -08:00
parent 3896f9aa9c
commit 27a6f57200
13 changed files with 136 additions and 95 deletions

View File

@@ -3,6 +3,7 @@ Version 211:
* close_socket is in stream_traits.hpp
* Improvements to test::stream
* Add stranded_stream
* Add flat_stream
--------------------------------------------------------------------------------

View File

@@ -7,6 +7,8 @@
Official repository: https://github.com/boostorg/beast
]
[section Stream Types]
A __Stream__ is a communication channel where data is reliably transferred as
@@ -111,4 +113,30 @@ synchronous stream may check its argument:
[snippet_core_3]
[heading Stream Interfaces]
To facilitiate stream algorithms, these types provide enhanced functionality
above what is offered by networking:
[table Stream Interfaces
[[Name][Description]]
[[
[link beast.ref.boost__beast__basic_timeout_stream `basic_timeout_stream`]
[link beast.ref.boost__beast__timeout_stream `timeout_stream`]
][
Objects of this type are designed to act as a replacement for the
traditional networking socket. They
]]
[[
[link beast.ref.boost__beast__flat_stream `flat_stream`]
][
This stream wrapper improves performance by working around a limitation
of networking's `ssl::stream` to improve performance.
]]
]
[endsect]

View File

@@ -33,6 +33,7 @@
<member><link linkend="beast.ref.boost__beast__flat_buffer">flat_buffer</link></member>
<member><link linkend="beast.ref.boost__beast__flat_static_buffer">flat_static_buffer</link></member>
<member><link linkend="beast.ref.boost__beast__flat_static_buffer_base">flat_static_buffer_base</link></member>
<member><link linkend="beast.ref.boost__beast__flat_stream">flat_stream</link>&nbsp;<emphasis role="green">&#128946;</emphasis></member>
<member><link linkend="beast.ref.boost__beast__handler_ptr">handler_ptr</link></member>
<member><link linkend="beast.ref.boost__beast__iequal">iequal</link></member>
<member><link linkend="beast.ref.boost__beast__iless">iless</link></member>
@@ -354,7 +355,6 @@
<entry valign="top">
<bridgehead renderas="sect3">Classes</bridgehead>
<simplelist type="vert" columns="1">
<member><link linkend="beast.ref.boost__beast__flat_stream">flat_stream</link></member>
<member><link linkend="beast.ref.boost__beast__ssl_stream">ssl_stream</link></member>
<member><link linkend="beast.ref.boost__beast__http__icy_stream">http::icy_stream</link></member>
<member><link linkend="beast.ref.boost__beast__test__fail_count">test::fail_count</link></member>

View File

@@ -15,7 +15,7 @@
// This include is necessary to work with `ssl::stream` and `boost::beast::websocket::stream`
#include <boost/beast/websocket/ssl.hpp>
#include <boost/beast/_experimental/core/flat_stream.hpp>
#include <boost/beast/core/flat_stream.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <cstddef>
#include <memory>

View File

@@ -32,6 +32,7 @@
#include <boost/beast/core/file_win32.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/flat_stream.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/make_printable.hpp>
#include <boost/beast/core/multi_buffer.hpp>

View File

@@ -12,7 +12,9 @@
#include <boost/beast/core/detail/config.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/_experimental/core/detail/flat_stream.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/core/detail/flat_stream.hpp>
#include <boost/asio/async_result.hpp>
#include <cstdlib>
#include <utility>
@@ -73,12 +75,12 @@ namespace beast {
`net::ssl::stream`.
@par Concepts
@b AsyncStream
@b SyncStream
@li SyncStream
@li AsyncStream
@see
@li https://github.com/boostorg/beast/issues/1108
@li https://github.com/boostorg/asio/issues/100
@li https://github.com/boostorg/beast/issues/1108
@li https://stackoverflow.com/questions/38198638/openssl-ssl-write-from-multiple-buffers-ssl-writev
@li https://stackoverflow.com/questions/50026167/performance-drop-on-port-from-beast-1-0-0-b66-to-boost-1-67-0-beast
*/
@@ -92,9 +94,22 @@ class flat_stream
// 16KB is the upper limit on reasonably sized HTTP messages.
static std::size_t constexpr max_size = 16 * 1024;
template<class, class> class write_op;
// Largest stack we will use to flatten
static std::size_t constexpr max_stack = 16 * 1024;
template<class> class write_op;
NextLayer stream_;
flat_buffer buffer_;
BOOST_STATIC_ASSERT(has_get_executor<NextLayer>::value);
template<class ConstBufferSequence>
std::size_t
stack_write_some(
std::size_t size,
ConstBufferSequence const& buffers,
error_code& ec);
public:
/// The type of the next layer.
@@ -102,7 +117,7 @@ public:
typename std::remove_reference<NextLayer>::type;
/// The type of the executor associated with the object.
using executor_type = typename next_layer_type::executor_type;
using executor_type = beast::executor_type<next_layer_type>;
flat_stream(flat_stream&&) = default;
flat_stream(flat_stream const&) = default;
@@ -322,6 +337,6 @@ public:
} // beast
} // boost
#include <boost/beast/_experimental/core/impl/flat_stream.hpp>
#include <boost/beast/core/impl/flat_stream.hpp>
#endif

View File

@@ -12,6 +12,7 @@
#include <boost/beast/core/async_op_base.hpp>
#include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/static_buffer.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/websocket/teardown.hpp>
#include <boost/asio/buffer.hpp>
@@ -22,39 +23,16 @@ namespace boost {
namespace beast {
template<class NextLayer>
template<class ConstBufferSequence, class Handler>
template<class Handler>
class flat_stream<NextLayer>::write_op
: public async_op_base<Handler,
beast::executor_type<flat_stream>>
, public net::coroutine
{
using alloc_type = std::allocator<char>;
struct deleter
{
alloc_type alloc;
std::size_t size = 0;
explicit
deleter(alloc_type const& alloc_)
: alloc(alloc_)
{
}
void
operator()(char* p)
{
alloc.deallocate(p, size);
}
};
flat_stream<NextLayer>& s_;
ConstBufferSequence b_;
std::unique_ptr<char, deleter> p_;
public:
template<class Handler_>
template<
class ConstBufferSequence,
class Handler_>
write_op(
flat_stream<NextLayer>& s,
ConstBufferSequence const& b,
@@ -63,11 +41,26 @@ public:
beast::executor_type<flat_stream>>(
std::forward<Handler_>(h),
s.get_executor())
, s_(s)
, b_(b)
, p_(nullptr, deleter{alloc_type{}})
{
(*this)({}, 0);
auto const result =
coalesce(b, coalesce_limit);
if(result.needs_coalescing)
{
s.buffer_.clear();
s.buffer_.commit(net::buffer_copy(
s.buffer_.prepare(result.size),
b, result.size));
s.stream_.async_write_some(
s.buffer_.data(), std::move(*this));
}
else
{
s.buffer_.clear();
s.buffer_.shrink_to_fit();
s.stream_.async_write_some(
beast::buffers_prefix(
result.size, b), std::move(*this));
}
}
void
@@ -75,36 +68,7 @@ public:
boost::system::error_code ec,
std::size_t bytes_transferred)
{
BOOST_ASIO_CORO_REENTER(*this)
{
BOOST_ASIO_CORO_YIELD
{
auto const result =
coalesce(b_, coalesce_limit);
if(result.needs_coalescing)
{
p_.get_deleter().size = result.size;
p_.reset(p_.get_deleter().alloc.allocate(
p_.get_deleter().size));
net::buffer_copy(
net::buffer(
p_.get(), p_.get_deleter().size),
b_, result.size);
s_.stream_.async_write_some(
net::buffer(
p_.get(), p_.get_deleter().size),
std::move(*this));
}
else
{
s_.stream_.async_write_some(
boost::beast::buffers_prefix(
result.size, b_), std::move(*this));
}
}
p_.reset();
this->invoke(ec, bytes_transferred);
}
this->invoke(ec, bytes_transferred);
}
};
@@ -128,7 +92,7 @@ read_some(MutableBufferSequence const& buffers)
"SyncReadStream requirements not met");
static_assert(net::is_mutable_buffer_sequence<
MutableBufferSequence>::value,
"MutableBufferSequence requirements not met");
"MutableBufferSequence requirements not met");
error_code ec;
auto n = read_some(buffers, ec);
if(ec)
@@ -142,6 +106,11 @@ std::size_t
flat_stream<NextLayer>::
read_some(MutableBufferSequence const& buffers, error_code& ec)
{
static_assert(boost::beast::is_sync_read_stream<next_layer_type>::value,
"SyncReadStream requirements not met");
static_assert(net::is_mutable_buffer_sequence<
MutableBufferSequence>::value,
"MutableBufferSequence requirements not met");
return stream_.read_some(buffers, ec);
}
@@ -159,7 +128,7 @@ async_read_some(
static_assert(boost::beast::is_async_read_stream<next_layer_type>::value,
"AsyncReadStream requirements not met");
static_assert(net::is_mutable_buffer_sequence<
MutableBufferSequence >::value,
MutableBufferSequence >::value,
"MutableBufferSequence requirements not met");
return stream_.async_read_some(
buffers, std::forward<ReadHandler>(handler));
@@ -176,16 +145,26 @@ write_some(ConstBufferSequence const& buffers)
static_assert(net::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
auto const result = coalesce(buffers, coalesce_limit);
if(result.needs_coalescing)
{
std::unique_ptr<char[]> p{new char[result.size]};
auto const b = net::buffer(p.get(), result.size);
net::buffer_copy(b, buffers);
return stream_.write_some(b);
}
return stream_.write_some(
boost::beast::buffers_prefix(result.size, buffers));
error_code ec;
auto n = write_some(buffers, ec);
if(ec)
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
return n;
}
template<class NextLayer>
template<class ConstBufferSequence>
std::size_t
flat_stream<NextLayer>::
stack_write_some(
std::size_t size,
ConstBufferSequence const& buffers,
error_code& ec)
{
static_buffer<max_stack> b;
b.commit(net::buffer_copy(
b.prepare(size), buffers));
return stream_.write_some(b.data(), ec);
}
template<class NextLayer>
@@ -198,15 +177,21 @@ write_some(ConstBufferSequence const& buffers, error_code& ec)
"SyncWriteStream requirements not met");
static_assert(net::is_const_buffer_sequence<
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
"ConstBufferSequence requirements not met");
auto const result = coalesce(buffers, coalesce_limit);
if(result.needs_coalescing)
{
std::unique_ptr<char[]> p{new char[result.size]};
auto const b = net::buffer(p.get(), result.size);
net::buffer_copy(b, buffers);
return stream_.write_some(b, ec);
if(result.size > max_stack)
return stack_write_some(result.size, buffers, ec);
buffer_.clear();
buffer_.commit(net::buffer_copy(
buffer_.prepare(result.size),
buffers));
return stream_.write_some(buffer_.data(), ec);
}
buffer_.clear();
buffer_.shrink_to_fit();
return stream_.write_some(
boost::beast::buffers_prefix(result.size, buffers), ec);
}
@@ -225,13 +210,13 @@ async_write_some(
static_assert(boost::beast::is_async_write_stream<next_layer_type>::value,
"AsyncWriteStream requirements not met");
static_assert(net::is_const_buffer_sequence<
ConstBufferSequence>::value,
ConstBufferSequence>::value,
"ConstBufferSequence requirements not met");
BOOST_BEAST_HANDLER_INIT(
WriteHandler, void(error_code, std::size_t));
write_op<ConstBufferSequence, BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(error_code, std::size_t))>{
*this, buffers, std::move(init.completion_handler)};
write_op<BOOST_ASIO_HANDLER_TYPE(
WriteHandler, void(error_code, std::size_t))>(
*this, buffers, std::move(init.completion_handler));
return init.result.get();
}

View File

@@ -51,6 +51,7 @@ add_executable (tests-beast-core
file_win32.cpp
flat_buffer.cpp
flat_static_buffer.cpp
flat_stream.cpp
handler_ptr.cpp
make_printable.cpp
multi_buffer.cpp

View File

@@ -39,6 +39,7 @@ local SOURCES =
file_win32.cpp
flat_buffer.cpp
flat_static_buffer.cpp
flat_stream.cpp
handler_ptr.cpp
make_printable.cpp
multi_buffer.cpp

View File

@@ -8,11 +8,14 @@
//
// Test that header file is self-contained.
#include <boost/beast/_experimental/core/flat_stream.hpp>
#include <boost/beast/core/flat_stream.hpp>
#include "stream_tests.hpp"
#include <boost/beast/test/websocket.hpp>
#include <boost/beast/test/yield_to.hpp>
#include <boost/beast/_experimental/unit_test/suite.hpp>
#include <boost/beast/_experimental/test/stream.hpp>
#include <initializer_list>
#include <vector>
@@ -24,6 +27,13 @@ class flat_stream_test
, public test::enable_yield_to
{
public:
void
testStream()
{
test_sync_stream<flat_stream<test::stream>>();
test_async_stream<flat_stream<test::stream>>();
}
void
testSplit()
{
@@ -81,7 +91,7 @@ public:
test::ws_echo_server es{log};
net::io_context ioc;
websocket::stream<flat_stream<test::stream>> ws{ioc};
ws.next_layer().next_layer().connect(es.stream());
get_lowest_layer(ws).connect(es.stream());
ws.async_handshake("localhost", "/",
[&](error_code)
{
@@ -97,6 +107,7 @@ public:
void
run() override
{
testStream();
testSplit();
testHttp();
testWebsocket();

View File

@@ -17,7 +17,6 @@ add_executable (tests-beast-experimental
${TEST_MAIN}
Jamfile
error.cpp
flat_stream.cpp
icy_stream.cpp
ssl_stream.cpp
stream.cpp

View File

@@ -9,7 +9,6 @@
local SOURCES =
error.cpp
flat_stream.cpp
icy_stream.cpp
ssl_stream.cpp
stream.cpp