Send idle pings in advanced servers:

fix #899

The timer logic for the advanced-server and advanced-server-flex
examples is refactored to use idle pings when the connection has
not seen activity for some period of time. This demonstrates the
use of the stream's control_callback interface.
This commit is contained in:
Vinnie Falco
2017-11-27 16:44:59 -08:00
parent 8181851719
commit 7ac24d77be
3 changed files with 226 additions and 56 deletions

View File

@@ -6,6 +6,7 @@ Version 147:
WebSocket:
* control callback is copied or moved
* Send idle pings in advanced servers
--------------------------------------------------------------------------------

View File

@@ -234,6 +234,7 @@ class websocket_session
}
boost::beast::multi_buffer buffer_;
char ping_state_ = 0;
protected:
boost::asio::strand<
@@ -255,6 +256,15 @@ public:
void
do_accept(http::request<Body, http::basic_fields<Allocator>> req)
{
// Set the control callback. This will be called
// on every incoming ping, pong, and close frame.
derived().ws().control_callback(
std::bind(
&websocket_session::on_control_callback,
this,
std::placeholders::_1,
std::placeholders::_2));
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
@@ -269,27 +279,6 @@ public:
std::placeholders::_1)));
}
// Called when the timer expires.
void
on_timer(boost::system::error_code ec)
{
if(ec && ec != boost::asio::error::operation_aborted)
return fail(ec, "timer");
// Verify that the timer really expired since the deadline may have moved.
if(timer_.expiry() <= std::chrono::steady_clock::now())
derived().do_timeout();
// Wait on the timer
timer_.async_wait(
boost::asio::bind_executor(
strand_,
std::bind(
&websocket_session::on_timer,
derived().shared_from_this(),
std::placeholders::_1)));
}
void
on_accept(boost::system::error_code ec)
{
@@ -304,12 +293,106 @@ public:
do_read();
}
// Called when the timer expires.
void
on_timer(boost::system::error_code ec)
{
if(ec && ec != boost::asio::error::operation_aborted)
return fail(ec, "timer");
// See if the timer really expired since the deadline may have moved.
if(timer_.expiry() <= std::chrono::steady_clock::now())
{
// If this is the first time the timer expired,
// send a ping to see if the other end is there.
if(derived().ws().is_open() && ping_state_ == 0)
{
// Note that we are sending a ping
ping_state_ = 1;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Now send the ping
derived().ws().async_ping({},
boost::asio::bind_executor(
strand_,
std::bind(
&websocket_session::on_ping,
derived().shared_from_this(),
std::placeholders::_1)));
}
else
{
// The timer expired while trying to handshake,
// or we sent a ping and it never completed or
// we never got back a control frame, so close.
derived().do_timeout();
return;
}
}
// Wait on the timer
timer_.async_wait(
boost::asio::bind_executor(
strand_,
std::bind(
&websocket_session::on_timer,
derived().shared_from_this(),
std::placeholders::_1)));
}
// Called to indicate activity from the remote peer
void
activity()
{
// Note that the connection is alive
ping_state_ = 0;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
}
// Called after a ping is sent.
void
on_ping(boost::system::error_code ec)
{
// Happens when the timer closes the socket
if(ec == boost::asio::error::operation_aborted)
return;
if(ec)
return fail(ec, "ping");
// Note that the ping was sent.
if(ping_state_ == 1)
{
ping_state_ = 2;
}
else
{
// ping_state_ could have been set to 0
// if an incoming control frame was received
// at exactly the same time we sent a ping.
BOOST_ASSERT(ping_state_ == 0);
}
}
void
on_control_callback(
websocket::frame_type kind,
boost::beast::string_view payload)
{
boost::ignore_unused(kind, payload);
// Note that there is activity
activity();
}
void
do_read()
{
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Read a message into our buffer
derived().ws().async_read(
buffer_,
@@ -340,6 +423,9 @@ public:
if(ec)
fail(ec, "read");
// Note that there is activity
activity();
// Echo the message
derived().ws().text(derived().ws().got_text());
derived().ws().async_write(

View File

@@ -219,6 +219,7 @@ class websocket_session : public std::enable_shared_from_this<websocket_session>
boost::asio::io_context::executor_type> strand_;
boost::asio::steady_timer timer_;
boost::beast::multi_buffer buffer_;
char ping_state_ = 0;
public:
// Take ownership of the socket
@@ -234,8 +235,17 @@ public:
// Start the asynchronous operation
template<class Body, class Allocator>
void
run(http::request<Body, http::basic_fields<Allocator>> req)
do_accept(http::request<Body, http::basic_fields<Allocator>> req)
{
// Set the control callback. This will be called
// on every incoming ping, pong, and close frame.
ws_.control_callback(
std::bind(
&websocket_session::on_control_callback,
this,
std::placeholders::_1,
std::placeholders::_2));
// Run the timer. The timer is operated
// continuously, this simplifies the code.
on_timer({});
@@ -254,33 +264,6 @@ public:
std::placeholders::_1)));
}
// Called when the timer expires.
void
on_timer(boost::system::error_code ec)
{
if(ec && ec != boost::asio::error::operation_aborted)
return fail(ec, "timer");
// Verify that the timer really expired since the deadline may have moved.
if(timer_.expiry() <= std::chrono::steady_clock::now())
{
// Closing the socket cancels all outstanding operations. They
// will complete with boost::asio::error::operation_aborted
ws_.next_layer().shutdown(tcp::socket::shutdown_both, ec);
ws_.next_layer().close(ec);
return;
}
// Wait on the timer
timer_.async_wait(
boost::asio::bind_executor(
strand_,
std::bind(
&websocket_session::on_timer,
shared_from_this(),
std::placeholders::_1)));
}
void
on_accept(boost::system::error_code ec)
{
@@ -295,12 +278,109 @@ public:
do_read();
}
// Called when the timer expires.
void
on_timer(boost::system::error_code ec)
{
if(ec && ec != boost::asio::error::operation_aborted)
return fail(ec, "timer");
// See if the timer really expired since the deadline may have moved.
if(timer_.expiry() <= std::chrono::steady_clock::now())
{
// If this is the first time the timer expired,
// send a ping to see if the other end is there.
if(ws_.is_open() && ping_state_ == 0)
{
// Note that we are sending a ping
ping_state_ = 1;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Now send the ping
ws_.async_ping({},
boost::asio::bind_executor(
strand_,
std::bind(
&websocket_session::on_ping,
shared_from_this(),
std::placeholders::_1)));
}
else
{
// The timer expired while trying to handshake,
// or we sent a ping and it never completed or
// we never got back a control frame, so close.
// Closing the socket cancels all outstanding operations. They
// will complete with boost::asio::error::operation_aborted
ws_.next_layer().shutdown(tcp::socket::shutdown_both, ec);
ws_.next_layer().close(ec);
return;
}
}
// Wait on the timer
timer_.async_wait(
boost::asio::bind_executor(
strand_,
std::bind(
&websocket_session::on_timer,
shared_from_this(),
std::placeholders::_1)));
}
// Called to indicate activity from the remote peer
void
activity()
{
// Note that the connection is alive
ping_state_ = 0;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
}
// Called after a ping is sent.
void
on_ping(boost::system::error_code ec)
{
// Happens when the timer closes the socket
if(ec == boost::asio::error::operation_aborted)
return;
if(ec)
return fail(ec, "ping");
// Note that the ping was sent.
if(ping_state_ == 1)
{
ping_state_ = 2;
}
else
{
// ping_state_ could have been set to 0
// if an incoming control frame was received
// at exactly the same time we sent a ping.
BOOST_ASSERT(ping_state_ == 0);
}
}
void
on_control_callback(
websocket::frame_type kind,
boost::beast::string_view payload)
{
boost::ignore_unused(kind, payload);
// Note that there is activity
activity();
}
void
do_read()
{
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Read a message into our buffer
ws_.async_read(
buffer_,
@@ -331,6 +411,9 @@ public:
if(ec)
fail(ec, "read");
// Note that there is activity
activity();
// Echo the message
ws_.text(ws_.got_text());
ws_.async_write(
@@ -558,7 +641,7 @@ public:
{
// Create a WebSocket websocket_session by transferring the socket
std::make_shared<websocket_session>(
std::move(socket_))->run(std::move(req_));
std::move(socket_))->do_accept(std::move(req_));
return;
}