beast support default completion & rebind.

buffers_generator uses default_completion.
websocket::stream has a rebinding constructor.
ssl_stream has a rebind_executor member.
basic_stream has rebinding constructor.
This commit is contained in:
Klemens Morgenstern
2022-10-11 18:01:48 +08:00
committed by Klemens Morgenstern
parent d3b82a2fb0
commit 1fc340713c
9 changed files with 112 additions and 43 deletions

View File

@ -233,6 +233,7 @@ public:
using endpoint_type = typename Protocol::endpoint; using endpoint_type = typename Protocol::endpoint;
private: private:
using op_state = basic_op_state<Executor>;
static_assert( static_assert(
net::is_executor<Executor>::value || net::execution::is_executor<Executor>::value, net::is_executor<Executor>::value || net::execution::is_executor<Executor>::value,
"Executor type requirements not met"); "Executor type requirements not met");
@ -247,15 +248,12 @@ private:
op_state read; op_state read;
op_state write; op_state write;
#if 0
net::basic_waitable_timer< net::basic_waitable_timer<
std::chrono::steady_clock, std::chrono::steady_clock,
net::wait_traits< net::wait_traits<
std::chrono::steady_clock>, std::chrono::steady_clock>,
Executor> timer; // rate timer; Executor> timer; // rate timer;
#else
net::steady_timer timer;
#endif
int waiting = 0; int waiting = 0;
impl_type(impl_type&&) = default; impl_type(impl_type&&) = default;
@ -356,6 +354,22 @@ public:
! std::is_constructible<RatePolicy, Arg0>::value>::type> ! std::is_constructible<RatePolicy, Arg0>::value>::type>
explicit explicit
basic_stream(Arg0&& argo, Args&&... args); basic_stream(Arg0&& argo, Args&&... args);
/** Constructor
*
* A constructor that rebinds the executor.
*
* @tparam Executor_ The new executor
* @param other The original socket to be rebound.
*/
template<class Executor_>
explicit
basic_stream(basic_stream<Protocol, Executor_, RatePolicy> && other);
template<typename, typename, typename>
friend class basic_stream;
#endif #endif
/** Constructor /** Constructor

View File

@ -13,6 +13,7 @@
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
#include <boost/beast/core/detail/type_traits.hpp> #include <boost/beast/core/detail/type_traits.hpp>
#include <boost/beast/core/error.hpp> #include <boost/beast/core/error.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/asio/async_result.hpp> #include <boost/asio/async_result.hpp>
#include <type_traits> #include <type_traits>
@ -176,21 +177,19 @@ write(
template< template<
class AsyncWriteStream, class AsyncWriteStream,
class BuffersGenerator, class BuffersGenerator,
class CompletionToken BOOST_BEAST_ASYNC_TPARAM2 CompletionToken
= net::default_completion_token_t<executor_type<AsyncWriteStream>>
#if !BOOST_BEAST_DOXYGEN #if !BOOST_BEAST_DOXYGEN
, typename std::enable_if<is_buffers_generator< , typename std::enable_if<is_buffers_generator<
BuffersGenerator>::value>::type* = nullptr BuffersGenerator>::value>::type* = nullptr
#endif #endif
> >
auto BOOST_BEAST_ASYNC_RESULT2(CompletionToken)
async_write( async_write(
AsyncWriteStream& stream, AsyncWriteStream& stream,
BuffersGenerator generator, BuffersGenerator generator,
CompletionToken&& token) -> CompletionToken&& token
typename net::async_result< = net::default_completion_token_t<executor_type<AsyncWriteStream>>{});
typename std::decay<
CompletionToken>::type,
void(error_code, std::size_t)>::return_type;
} // beast } // beast
} // boost } // boost

View File

@ -39,16 +39,21 @@ struct stream_base
std::chrono::steady_clock::time_point; std::chrono::steady_clock::time_point;
using tick_type = std::uint64_t; using tick_type = std::uint64_t;
struct op_state template<typename Executor>
struct basic_op_state
{ {
net::steady_timer timer; // for timing out net::basic_waitable_timer<
std::chrono::steady_clock,
net::wait_traits<
std::chrono::steady_clock>,
Executor> timer; // for timing out
tick_type tick = 0; // counts waits tick_type tick = 0; // counts waits
bool pending = false; // if op is pending bool pending = false; // if op is pending
bool timeout = false; // if timed out bool timeout = false; // if timed out
template<class... Args> template<class... Args>
explicit explicit
op_state(Args&&... args) basic_op_state(Args&&... args)
: timer(std::forward<Args>(args)...) : timer(std::forward<Args>(args)...)
{ {
} }

View File

@ -768,6 +768,14 @@ basic_stream(basic_stream&& other)
// controlling its lifetime. // controlling its lifetime.
} }
template<class Protocol, class Executor, class RatePolicy>
template<class Executor_>
basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(basic_stream<Protocol, Executor_, RatePolicy> && other)
: impl_(boost::make_shared<impl_type>(std::false_type{}, std::move(other.impl_->socket)))
{
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template<class Protocol, class Executor, class RatePolicy> template<class Protocol, class Executor, class RatePolicy>

View File

@ -143,18 +143,15 @@ write(SyncWriteStream& stream, BuffersGenerator&& generator)
template< template<
class AsyncWriteStream, class AsyncWriteStream,
class BuffersGenerator, class BuffersGenerator,
class CompletionToken, BOOST_BEAST_ASYNC_TPARAM2 CompletionToken,
typename std::enable_if<is_buffers_generator< typename std::enable_if<is_buffers_generator<
BuffersGenerator>::value>::type* /*= nullptr*/ BuffersGenerator>::value>::type* /*= nullptr*/
> >
auto BOOST_BEAST_ASYNC_RESULT2(CompletionToken)
async_write( async_write(
AsyncWriteStream& stream, AsyncWriteStream& stream,
BuffersGenerator generator, BuffersGenerator generator,
CompletionToken&& token) -> CompletionToken&& token)
typename net::async_result<
typename std::decay<CompletionToken>::type,
void(error_code, std::size_t)>::return_type
{ {
static_assert( static_assert(
beast::is_async_write_stream<AsyncWriteStream>::value, beast::is_async_write_stream<AsyncWriteStream>::value,

View File

@ -92,6 +92,17 @@ public:
/// The type of the executor associated with the object. /// The type of the executor associated with the object.
using executor_type = typename stream_type::executor_type; using executor_type = typename stream_type::executor_type;
/// Rebinds the stream type to another executor.
template<class Executor1>
struct rebind_executor
{
/// The stream type when rebound to the specified executor.
using other = ssl_stream<
typename stream_type::template rebind_executor<Executor1>::other
>;
};
/** Construct a stream. /** Construct a stream.
This constructor creates a stream and initialises the underlying stream This constructor creates a stream and initialises the underlying stream
@ -400,10 +411,10 @@ public:
const boost::system::error_code& error // Result of operation. const boost::system::error_code& error // Result of operation.
); @endcode ); @endcode
*/ */
template<class HandshakeHandler> template<BOOST_BEAST_ASYNC_TPARAM1 HandshakeHandler = net::default_completion_token_t<executor_type>>
BOOST_ASIO_INITFN_RESULT_TYPE(HandshakeHandler, void(boost::system::error_code)) BOOST_ASIO_INITFN_RESULT_TYPE(HandshakeHandler, void(boost::system::error_code))
async_handshake(handshake_type type, async_handshake(handshake_type type,
BOOST_ASIO_MOVE_ARG(HandshakeHandler) handler) BOOST_ASIO_MOVE_ARG(HandshakeHandler) handler = net::default_completion_token_t<executor_type>{})
{ {
return p_->next_layer().async_handshake(type, return p_->next_layer().async_handshake(type,
BOOST_ASIO_MOVE_CAST(HandshakeHandler)(handler)); BOOST_ASIO_MOVE_CAST(HandshakeHandler)(handler));
@ -430,10 +441,12 @@ public:
std::size_t bytes_transferred // Amount of buffers used in handshake. std::size_t bytes_transferred // Amount of buffers used in handshake.
); @endcode ); @endcode
*/ */
template<class ConstBufferSequence, class BufferedHandshakeHandler> template<class ConstBufferSequence,
BOOST_BEAST_ASYNC_TPARAM1 BufferedHandshakeHandler = net::default_completion_token_t<executor_type>>
BOOST_ASIO_INITFN_RESULT_TYPE(BufferedHandshakeHandler, void(boost::system::error_code, std::size_t)) BOOST_ASIO_INITFN_RESULT_TYPE(BufferedHandshakeHandler, void(boost::system::error_code, std::size_t))
async_handshake(handshake_type type, ConstBufferSequence const& buffers, async_handshake(handshake_type type, ConstBufferSequence const& buffers,
BOOST_ASIO_MOVE_ARG(BufferedHandshakeHandler) handler) BOOST_ASIO_MOVE_ARG(BufferedHandshakeHandler) handler
= net::default_completion_token_t<executor_type>{})
{ {
return p_->next_layer().async_handshake(type, buffers, return p_->next_layer().async_handshake(type, buffers,
BOOST_ASIO_MOVE_CAST(BufferedHandshakeHandler)(handler)); BOOST_ASIO_MOVE_CAST(BufferedHandshakeHandler)(handler));
@ -477,9 +490,9 @@ public:
const boost::system::error_code& error // Result of operation. const boost::system::error_code& error // Result of operation.
); @endcode ); @endcode
*/ */
template<class ShutdownHandler> template<BOOST_BEAST_ASYNC_TPARAM1 ShutdownHandler = net::default_completion_token_t<executor_type>>
BOOST_ASIO_INITFN_RESULT_TYPE(ShutdownHandler, void(boost::system::error_code)) BOOST_ASIO_INITFN_RESULT_TYPE(ShutdownHandler, void(boost::system::error_code))
async_shutdown(BOOST_ASIO_MOVE_ARG(ShutdownHandler) handler) async_shutdown(BOOST_ASIO_MOVE_ARG(ShutdownHandler) handler = net::default_completion_token_t<executor_type>{})
{ {
return p_->next_layer().async_shutdown( return p_->next_layer().async_shutdown(
BOOST_ASIO_MOVE_CAST(ShutdownHandler)(handler)); BOOST_ASIO_MOVE_CAST(ShutdownHandler)(handler));
@ -555,10 +568,11 @@ public:
need to ensure that all data is written before the asynchronous operation need to ensure that all data is written before the asynchronous operation
completes. completes.
*/ */
template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler> template<class ConstBufferSequence,
BOOST_BEAST_ASYNC_TPARAM2 WriteHandler = net::default_completion_token_t<executor_type>>
BOOST_ASIO_INITFN_RESULT_TYPE(WriteHandler, void(boost::system::error_code, std::size_t)) BOOST_ASIO_INITFN_RESULT_TYPE(WriteHandler, void(boost::system::error_code, std::size_t))
async_write_some(ConstBufferSequence const& buffers, async_write_some(ConstBufferSequence const& buffers,
BOOST_ASIO_MOVE_ARG(WriteHandler) handler) BOOST_ASIO_MOVE_ARG(WriteHandler) handler= net::default_completion_token_t<executor_type>{})
{ {
return p_->async_write_some(buffers, return p_->async_write_some(buffers,
BOOST_ASIO_MOVE_CAST(WriteHandler)(handler)); BOOST_ASIO_MOVE_CAST(WriteHandler)(handler));
@ -636,10 +650,12 @@ public:
if you need to ensure that the requested amount of data is read before if you need to ensure that the requested amount of data is read before
the asynchronous operation completes. the asynchronous operation completes.
*/ */
template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler> template<class MutableBufferSequence,
BOOST_BEAST_ASYNC_TPARAM2 ReadHandler = net::default_completion_token_t<executor_type>>
BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, void(boost::system::error_code, std::size_t)) BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, void(boost::system::error_code, std::size_t))
async_read_some(MutableBufferSequence const& buffers, async_read_some(MutableBufferSequence const& buffers,
BOOST_ASIO_MOVE_ARG(ReadHandler) handler) BOOST_ASIO_MOVE_ARG(ReadHandler) handler
= net::default_completion_token_t<executor_type>{})
{ {
return p_->async_read_some(buffers, return p_->async_read_some(buffers,
BOOST_ASIO_MOVE_CAST(ReadHandler)(handler)); BOOST_ASIO_MOVE_CAST(ReadHandler)(handler));
@ -657,7 +673,7 @@ public:
ssl_stream<SyncStream>& stream, ssl_stream<SyncStream>& stream,
boost::system::error_code& ec); boost::system::error_code& ec);
template<class AsyncStream, class TeardownHandler> template<class AsyncStream, BOOST_BEAST_ASYNC_TPARAM1 TeardownHandler>
friend friend
void void
async_teardown( async_teardown(
@ -680,12 +696,13 @@ teardown(
teardown(role, *stream.p_, ec); teardown(role, *stream.p_, ec);
} }
template<class AsyncStream, class TeardownHandler> template<class AsyncStream,
BOOST_BEAST_ASYNC_TPARAM1 TeardownHandler = net::default_completion_token_t<beast::executor_type<AsyncStream>>>
void void
async_teardown( async_teardown(
boost::beast::role_type role, boost::beast::role_type role,
ssl_stream<AsyncStream>& stream, ssl_stream<AsyncStream>& stream,
TeardownHandler&& handler) TeardownHandler&& handler = net::default_completion_token_t<beast::executor_type<AsyncStream>>{})
{ {
// Just forward it to the underlying ssl::stream // Just forward it to the underlying ssl::stream
using boost::beast::websocket::async_teardown; using boost::beast::websocket::async_teardown;

View File

@ -58,6 +58,15 @@ stream(Args&&... args)
max_control_frame_size); max_control_frame_size);
} }
template<class NextLayer, bool deflateSupported>
template<class Other>
stream<NextLayer, deflateSupported>::
stream(stream<Other> && other)
: impl_(boost::make_shared<impl_type>(std::move(other.next_layer())))
{
}
template<class NextLayer, bool deflateSupported> template<class NextLayer, bool deflateSupported>
auto auto
stream<NextLayer, deflateSupported>:: stream<NextLayer, deflateSupported>::

View File

@ -162,6 +162,7 @@ class stream
} }
public: public:
/// Indicates if the permessage-deflate extension is supported /// Indicates if the permessage-deflate extension is supported
using is_deflate_supported = using is_deflate_supported =
std::integral_constant<bool, deflateSupported>; std::integral_constant<bool, deflateSupported>;
@ -174,6 +175,16 @@ public:
using executor_type = using executor_type =
beast::executor_type<next_layer_type>; beast::executor_type<next_layer_type>;
/// Rebinds the stream type to another executor.
template<class Executor1>
struct rebind_executor
{
/// The stream type when rebound to the specified executor.
using other = stream<
typename next_layer_type::template rebind_executor<Executor1>::other,
deflateSupported>;
};
/** Destructor /** Destructor
Destroys the stream and all associated resources. Destroys the stream and all associated resources.
@ -211,6 +222,20 @@ public:
explicit explicit
stream(Args&&... args); stream(Args&&... args);
/** Rebinding constructor
*
* This constructor creates a the websocket stream from a
* websocket stream with a different executor.
*
* @throw Any exception thrown by the NextLayer rebind constructor.
*
* @param other The other websocket stream to construct from.
*/
template<class Other>
explicit
stream(stream<Other> && other);
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
/** Get the executor associated with the object. /** Get the executor associated with the object.

View File

@ -23,6 +23,7 @@
#include <boost/beast/http/string_body.hpp> #include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/write.hpp> #include <boost/beast/http/write.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/write.hpp> #include <boost/asio/write.hpp>
@ -551,13 +552,10 @@ public:
// stream destroyed // stream destroyed
test_server srv("", ep, log); test_server srv("", ep, log);
{ {
stream_type s(ioc); auto s = net::detached.as_default_on(stream_type(ioc));
s.socket().connect(srv.local_endpoint()); s.socket().connect(srv.local_endpoint());
s.expires_after(std::chrono::seconds(0)); s.expires_after(std::chrono::seconds(0));
s.async_read_some(mb, s.async_read_some(mb);
[](error_code, std::size_t)
{
});
} }
ioc.run(); ioc.run();
ioc.restart(); ioc.restart();
@ -566,12 +564,9 @@ public:
{ {
// stale timer // stale timer
test_acceptor a; test_acceptor a;
stream_type s(ioc); auto s = net::detached.as_default_on(stream_type(ioc));
s.expires_after(std::chrono::milliseconds(50)); s.expires_after(std::chrono::milliseconds(50));
s.async_read_some(mb, s.async_read_some(mb);
[](error_code, std::size_t)
{
});
std::this_thread::sleep_for( std::this_thread::sleep_for(
std::chrono::milliseconds(100)); std::chrono::milliseconds(100));
ioc.run(); ioc.run();