websocket idle pings

This commit is contained in:
Vinnie Falco
2019-02-19 17:14:34 -08:00
parent 62aa1b9717
commit 28d3b41a43
7 changed files with 260 additions and 184 deletions

View File

@ -1,3 +1,9 @@
Version 217:
* websocket idle pings
--------------------------------------------------------------------------------
Version 216:
* Refactor websocket::stream operations

View File

@ -109,100 +109,96 @@ public:
//------------------------------------------------------------------------------
#if 0
template<
class NextLayer,
bool deflateSupported,
class Handler>
void
async_auto_ping(
stream<NextLayer, deflateSupported>& ws,
Handler&& handler)
// sends the idle ping
template<class NextLayer, bool deflateSupported>
template<class Executor>
class stream<NextLayer, deflateSupported>::idle_ping_op
: public net::coroutine
, public boost::empty_value<Executor>
{
using handler_type =
typename std::decay<Handler>::type;
boost::weak_ptr<impl_type> wp_;
std::unique_ptr<detail::frame_buffer> fb_;
using base_type =
beast::stable_async_op_base<
handler_type, beast::executor_type<
stream<NextLayer, deflateSupported>>>;
public:
static constexpr int id = 4; // for soft_mutex
struct async_op : base_type, net::coroutine
using executor_type = Executor;
executor_type
get_executor() const noexcept
{
boost::weak_ptr<impl_type> impl_;
detail::frame_buffer& fb_;
return this->get();
}
public:
static constexpr int id = 4; // for soft_mutex
async_op(
Handler&& h,
stream<NextLayer, deflateSupported>& ws)
: base_type(std::move(h), ws.get_executor())
, impl_(ws.impl_)
, fb_(beast::allocate_stable<
detail::frame_buffer>(*this))
idle_ping_op(
boost::shared_ptr<impl_type> const& sp,
Executor const& ex)
: boost::empty_value<Executor>(
boost::empty_init_t{}, ex)
, wp_(sp)
, fb_(new detail::frame_buffer)
{
if(! sp->idle_pinging)
{
// Serialize the ping or pong frame
ping_data payload;
ws.template write_ping<
flat_static_buffer_base>(fb_, op, payload);
(*this)({}, 0, false);
// Create the ping frame
ping_data payload; // empty for now
sp->template write_ping<
flat_static_buffer_base>(*fb_,
detail::opcode::ping, payload);
sp->idle_pinging = true;
(*this)({}, 0);
}
void operator()(
error_code ec = {},
std::size_t bytes_transferred = 0,
bool cont = true)
else
{
boost::ignore_unused(bytes_transferred);
auto sp = impl_.lock();
if(! sp)
return;
// if we are already in the middle of sending
// an idle ping, don't bother sending another.
}
}
auto& impl = *ws_.impl_;
BOOST_ASIO_CORO_REENTER(*this)
void operator()(
error_code ec = {},
std::size_t bytes_transferred = 0)
{
boost::ignore_unused(bytes_transferred);
auto sp = wp_.lock();
if(! sp)
return;
auto& impl = *sp;
BOOST_ASIO_CORO_REENTER(*this)
{
// Acquire the write lock
if(! impl.wr_block.try_lock(this))
{
// Acquire the write lock
if(! impl.wr_block.try_lock(this))
{
BOOST_ASIO_CORO_YIELD
impl.op_idle_ping.emplace(std::move(*this));
impl.wr_block.lock(this);
BOOST_ASIO_CORO_YIELD
net::post(std::move(*this));
BOOST_ASSERT(impl.wr_block.is_locked(this));
}
if(impl.check_stop_now(ec))
goto upcall;
// Send ping frame
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, fb_.data(),
beast::detail::bind_continuation(std::move(*this)));
if(impl.check_stop_now(ec))
goto upcall;
upcall:
impl.wr_block.unlock(this);
impl.op_close.maybe_invoke()
|| impl.op_ping.maybe_invoke()
|| impl.op_rd.maybe_invoke()
|| impl.op_wr.maybe_invoke();
if(! cont)
{
BOOST_ASIO_CORO_YIELD
net::post(bind_front_handler(
std::move(*this), ec));
}
this->invoke(ec);
impl.op_idle_ping.emplace(std::move(*this));
impl.wr_block.lock(this);
BOOST_ASIO_CORO_YIELD
net::post(this->get(), std::move(*this));
BOOST_ASSERT(impl.wr_block.is_locked(this));
}
}
};
if(impl.check_stop_now(ec))
goto upcall;
async_op op(ws, std::forward<Handler>(handler));
}
#endif
// Send ping frame
BOOST_ASIO_CORO_YIELD
net::async_write(impl.stream, fb_->data(),
//beast::detail::bind_continuation(std::move(*this)));
std::move(*this));
if(impl.check_stop_now(ec))
goto upcall;
upcall:
BOOST_ASSERT(sp->idle_pinging);
sp->idle_pinging = false;
impl.wr_block.unlock(this);
impl.op_close.maybe_invoke()
|| impl.op_ping.maybe_invoke()
|| impl.op_rd.maybe_invoke()
|| impl.op_wr.maybe_invoke();
}
}
};
//------------------------------------------------------------------------------

View File

@ -91,6 +91,7 @@ struct stream<NextLayer, deflateSupported>::impl_type
saved_handler op_r_rd; // paused read op (async read)
saved_handler op_r_close; // paused close op (async read)
bool idle_pinging = false;
bool secure_prng_ = true;
bool ec_delivered = false;
bool timed_out = false;
@ -335,6 +336,9 @@ struct stream<NextLayer, deflateSupported>::impl_type
case status::handshake:
break;
case status::open:
break;
case status::closing:
//BOOST_ASSERT(status_ == status::open);
break;
@ -445,7 +449,7 @@ private:
using executor_type = Executor;
executor_type
get_executor() const
get_executor() const noexcept
{
return this->get();
}
@ -479,7 +483,7 @@ private:
if( impl.timeout_opt.keep_alive_pings &&
impl.idle_counter < 1)
{
// <- send ping
idle_ping_op<Executor>(sp, get_executor());
++impl.idle_counter;
impl.timer.expires_after(

View File

@ -2603,7 +2603,7 @@ private:
template<class> class close_op;
template<class> class handshake_op;
template<class> class ping_op;
template<class> class auto_ping_op;
template<class> class idle_ping_op;
template<class, class> class read_some_op;
template<class, class> class read_op;
template<class> class response_op;

View File

@ -15,9 +15,6 @@
#include <boost/beast/_experimental/unit_test/suite.hpp>
#include "test.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
namespace boost {
namespace beast {
namespace websocket {

View File

@ -238,96 +238,6 @@ public:
);
}
void
testTimeout()
{
using tcp = net::ip::tcp;
net::io_context ioc;
// success
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
{
stream<test::stream> ws1(ioc);
stream<test::stream> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
// success, timeout enabled
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
{
stream<test::stream> ws1(ioc);
stream<test::stream> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
// timeout
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/",
test::fail_handler(beast::error::timeout));
test::run_for(ioc, std::chrono::seconds(1));
}
{
stream<test::stream> ws1(ioc);
stream<test::stream> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/",
test::fail_handler(beast::error::timeout));
test::run_for(ioc, std::chrono::seconds(1));
}
}
// Compression Extensions for WebSocket
//
// https://tools.ietf.org/html/rfc7692
@ -597,15 +507,117 @@ public:
}
};
void
testAsync()
{
using tcp = net::ip::tcp;
net::io_context ioc;
// success, no timeout
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
{
stream<test::stream> ws1(ioc);
stream<test::stream> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
// success, timeout enabled
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
{
stream<test::stream> ws1(ioc);
stream<test::stream> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/", test::success_handler());
ws2.async_accept(test::success_handler());
test::run_for(ioc, std::chrono::seconds(1));
}
// timeout
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/",
test::fail_handler(beast::error::timeout));
test::run_for(ioc, std::chrono::seconds(1));
}
{
stream<test::stream> ws1(ioc);
stream<test::stream> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.set_option(stream_base::timeout{
std::chrono::milliseconds(50),
stream_base::none(),
false});
ws1.async_handshake("test", "/",
test::fail_handler(beast::error::timeout));
test::run_for(ioc, std::chrono::seconds(1));
}
// abandoned operation
{
{
stream<tcp::socket> ws1(ioc);
ws1.async_handshake("test", "/",
test::fail_handler(
net::error::operation_aborted));
}
test::run(ioc);
}
}
void
run() override
{
testHandshake();
testTimeout();
testExtRead();
testExtWrite();
testExtNegotiate();
testMoveOnly();
testAsync();
}
};

View File

@ -10,24 +10,85 @@
// Test that header file is self-contained.
#include <boost/beast/websocket/stream.hpp>
#include <boost/beast/_experimental/test/stream.hpp>
#include <boost/beast/_experimental/test/tcp.hpp>
#include <boost/beast/_experimental/unit_test/suite.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include "test.hpp"
namespace boost {
namespace beast {
namespace websocket {
class timer_test
: public websocket_test_suite
struct timer_test : unit_test::suite
{
public:
using tcp = boost::asio::ip::tcp;
void
testIdlePing()
{
net::io_context ioc;
// idle ping, no timeout
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.async_accept(test::success_handler());
ws2.async_handshake("test", "/", test::success_handler());
test::run(ioc);
ws2.set_option(stream_base::timeout{
stream_base::none(),
std::chrono::milliseconds(50),
true});
flat_buffer b1;
flat_buffer b2;
bool received = false;
ws1.control_callback(
[&received](frame_type ft, string_view)
{
received = true;
BEAST_EXPECT(ft == frame_type::ping);
});
ws1.async_read(b1, test::fail_handler(
net::error::operation_aborted));
ws2.async_read(b2, test::fail_handler(
net::error::operation_aborted));
test::run_for(ioc, std::chrono::milliseconds(100));
BEAST_EXPECT(received);
}
test::run(ioc);
// idle ping, timeout
{
stream<tcp::socket> ws1(ioc);
stream<tcp::socket> ws2(ioc);
test::connect(ws1.next_layer(), ws2.next_layer());
ws1.async_accept(test::success_handler());
ws2.async_handshake("test", "/", test::success_handler());
test::run(ioc);
ws2.set_option(stream_base::timeout{
stream_base::none(),
std::chrono::milliseconds(50),
true});
flat_buffer b;
ws2.async_read(b,
test::fail_handler(beast::error::timeout));
test::run(ioc);
}
test::run(ioc);
}
void
run() override
{
pass();
testIdlePing();
}
};