Use suggested timeouts in Websocket examples

This commit is contained in:
Vinnie Falco
2019-02-16 14:01:37 -08:00
parent f21358186e
commit fc7b47fc5d
16 changed files with 105 additions and 329 deletions

View File

@ -2,6 +2,7 @@ Version 216:
* Refactor websocket::stream operations
* Add websocket::stream timeouts
* Use suggested timeouts in Websocket examples
--------------------------------------------------------------------------------

View File

@ -53,7 +53,7 @@ in the [source_file example] directory.
These HTTP clients submit a GET request to a server specified on the command
line, and prints the resulting response. The crawl client asynchronously
fetches the document root of the 10,000 top ranked domains, this may be
used to evaluate robustness.
used to evaluate robustness. All asynchronous support provide timeouts.
[table
[[Description] [Source File] [Source File (using SSL)]]
@ -77,7 +77,7 @@ used to evaluate robustness.
These WebSocket clients connect to a
server and send a message, then receive a message and print the response
before disconnecting.
before disconnecting. All asynchronous clients support timeouts.
[table
[[Description] [Source File] [Source File (using SSL)]]
@ -102,7 +102,7 @@ before disconnecting.
[section Servers]
These HTTP servers deliver files from a root directory specified on the
command line.
command line. All asynchronous servers support timeouts.
[table
[[Description] [Source File] [Source File (using SSL)]]
@ -137,7 +137,8 @@ command line.
]]
These WebSocket servers echo back any message received, keeping the
session open until the client disconnects.
session open until the client disconnects. All asynchronous servers
support timeouts.
[table
[[Description] [Source File] [Source File (using SSL)]]
@ -177,20 +178,18 @@ and illustrate the implementation of advanced features.
[
[Advanced]
[[itemized_list
[Timeouts]
[HTTP pipelining]
[Asynchronous timeouts]
[Dual protocols: HTTP and WebSocket]
[WebSocket use idle ping for timeout]
[Clean exit via SIGINT (CTRL+C) or SIGTERM (kill)]
]]
[[example_src example/advanced/server/advanced_server.cpp advanced_server.cpp]]
][
[Advanced, flex (plain + SSL)]
[[itemized_list
[Timeouts]
[HTTP pipelining]
[Asynchronous timeouts]
[Dual protocols: HTTP and WebSocket]
[WebSocket use idle ping for timeout]
[Flexible ports: plain and SSL on the same port]
[Clean exit via SIGINT (CTRL+C) or SIGTERM (kill)]
]]
@ -198,8 +197,9 @@ and illustrate the implementation of advanced features.
][
[Chat Server, multi-threaded]
[[itemized_list
[Multi-user Chat]
[Broadcasting Messages]
[Multi-user Chat Server]
[JavaScript Browser Client]
[Dual protocols: HTTP and WebSocket]
[Clean exit via SIGINT (CTRL+C) or SIGTERM (kill)]
]]

View File

@ -239,35 +239,17 @@ class websocket_session
}
beast::flat_buffer buffer_;
char ping_state_ = 0;
protected:
net::steady_timer timer_;
public:
// Construct the session
explicit
websocket_session(net::io_context& ioc)
: timer_(ioc,
(std::chrono::steady_clock::time_point::max)())
{
}
// Start the asynchronous operation
template<class Body, class Allocator>
void
do_accept(http::request<Body, http::basic_fields<Allocator>> req)
{
// Set the control callback. This will be called
// on every incoming ping, pong, and close frame.
derived().ws().control_callback(
std::bind(
&websocket_session::on_control_callback,
this,
std::placeholders::_1,
std::placeholders::_2));
// VFALCO What about the timer?
// Set suggested timeout settings for the websocket
derived().ws().set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
// Accept the websocket handshake
derived().ws().async_accept(
@ -280,10 +262,6 @@ public:
void
on_accept(beast::error_code ec)
{
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "accept");
@ -291,99 +269,6 @@ public:
do_read();
}
// Called when the timer expires.
void
on_timer(beast::error_code ec)
{
if(ec && ec != net::error::operation_aborted)
return fail(ec, "timer");
// See if the timer really expired since the deadline may have moved.
if(timer_.expiry() <= std::chrono::steady_clock::now())
{
// If this is the first time the timer expired,
// send a ping to see if the other end is there.
if(derived().ws().is_open() && ping_state_ == 0)
{
// Note that we are sending a ping
ping_state_ = 1;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Now send the ping
derived().ws().async_ping({},
beast::bind_front_handler(
&websocket_session::on_ping,
derived().shared_from_this()));
}
else
{
// The timer expired while trying to handshake,
// or we sent a ping and it never completed or
// we never got back a control frame, so close.
derived().do_timeout();
return;
}
}
// Wait on the timer
timer_.async_wait(
net::bind_executor(
derived().ws().get_executor(), // use the strand
beast::bind_front_handler(
&websocket_session::on_timer,
derived().shared_from_this())));
}
// Called to indicate activity from the remote peer
void
activity()
{
// Note that the connection is alive
ping_state_ = 0;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
}
// Called after a ping is sent.
void
on_ping(beast::error_code ec)
{
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "ping");
// Note that the ping was sent.
if(ping_state_ == 1)
{
ping_state_ = 2;
}
else
{
// ping_state_ could have been set to 0
// if an incoming control frame was received
// at exactly the same time we sent a ping.
BOOST_ASSERT(ping_state_ == 0);
}
}
void
on_control_callback(
websocket::frame_type kind,
beast::string_view payload)
{
boost::ignore_unused(kind, payload);
// Note that there is activity
activity();
}
void
do_read()
{
@ -402,10 +287,6 @@ public:
{
boost::ignore_unused(bytes_transferred);
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
// This indicates that the websocket_session was closed
if(ec == websocket::error::closed)
return;
@ -413,9 +294,6 @@ public:
if(ec)
fail(ec, "read");
// Note that there is activity
activity();
// Echo the message
derived().ws().text(derived().ws().got_text());
derived().ws().async_write(
@ -432,10 +310,6 @@ public:
{
boost::ignore_unused(bytes_transferred);
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "write");
@ -461,9 +335,7 @@ public:
explicit
plain_websocket_session(
beast::tcp_stream<net::io_context::strand>&& stream)
: websocket_session<plain_websocket_session>(
stream.get_executor().context())
, ws_(std::move(stream))
: ws_(std::move(stream))
{
}
@ -480,34 +352,10 @@ public:
void
run(http::request<Body, http::basic_fields<Allocator>> req)
{
// Run the timer. The timer is operated
// continuously, this simplifies the code.
on_timer({});
// Accept the WebSocket upgrade request
do_accept(std::move(req));
}
void
do_timeout()
{
// This is so the close can have a timeout
if(close_)
return;
close_ = true;
// VFALCO This doesn't look right...
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Close the WebSocket Connection
ws_.async_close(
websocket::close_code::normal,
beast::bind_front_handler(
&plain_websocket_session::on_close,
shared_from_this()));
}
void
on_close(beast::error_code ec)
{
@ -536,9 +384,7 @@ public:
explicit
ssl_websocket_session(beast::ssl_stream<
beast::tcp_stream<net::io_context::strand>>&& stream)
: websocket_session<ssl_websocket_session>(
stream.get_executor().context())
, ws_(std::move(stream))
: ws_(std::move(stream))
{
}
@ -555,10 +401,6 @@ public:
void
run(http::request<Body, http::basic_fields<Allocator>> req)
{
// Run the timer. The timer is operated
// continuously, this simplifies the code.
on_timer({});
// Accept the WebSocket upgrade request
do_accept(std::move(req));
}
@ -568,9 +410,6 @@ public:
{
eof_ = true;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Perform the SSL shutdown
ws_.next_layer().async_shutdown(
beast::bind_front_handler(
@ -581,29 +420,11 @@ public:
void
on_shutdown(beast::error_code ec)
{
// Happens when the shutdown times out
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "shutdown");
// At this point the connection is closed gracefully
}
void
do_timeout()
{
// If this is true it means we timed out performing the shutdown
if(eof_)
return;
// Start the timer again
timer_.expires_at(
(std::chrono::steady_clock::time_point::max)());
on_timer({});
do_eof();
}
};
template<class Body, class Allocator>

View File

@ -19,7 +19,6 @@
#include <boost/beast/version.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/make_unique.hpp>
#include <boost/config.hpp>
@ -221,7 +220,6 @@ fail(beast::error_code ec, char const* what)
class websocket_session : public std::enable_shared_from_this<websocket_session>
{
websocket::stream<beast::tcp_stream<net::io_context::strand>> ws_;
net::steady_timer timer_;
beast::flat_buffer buffer_;
char ping_state_ = 0;
@ -230,9 +228,11 @@ public:
explicit
websocket_session(tcp::socket socket)
: ws_(std::move(socket))
, timer_(ws_.get_executor().context(),
(std::chrono::steady_clock::time_point::max)())
{
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
}
// Start the asynchronous accept operation
@ -240,22 +240,6 @@ public:
void
do_accept(http::request<Body, http::basic_fields<Allocator>> req)
{
// Set the control callback. This will be called
// on every incoming ping, pong, and close frame.
ws_.control_callback(
std::bind(
&websocket_session::on_control_callback,
this,
std::placeholders::_1,
std::placeholders::_2));
// Run the timer. The timer is operated
// continuously, this simplifies the code.
on_timer({});
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Accept the websocket handshake
ws_.async_accept(
req,
@ -267,10 +251,6 @@ public:
void
on_accept(beast::error_code ec)
{
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "accept");
@ -278,102 +258,6 @@ public:
do_read();
}
// Called when the timer expires.
void
on_timer(beast::error_code ec)
{
if(ec && ec != net::error::operation_aborted)
return fail(ec, "timer");
// See if the timer really expired since the deadline may have moved.
if(timer_.expiry() <= std::chrono::steady_clock::now())
{
// If this is the first time the timer expired,
// send a ping to see if the other end is there.
if(ws_.is_open() && ping_state_ == 0)
{
// Note that we are sending a ping
ping_state_ = 1;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
// Now send the ping
ws_.async_ping({},
beast::bind_front_handler(
&websocket_session::on_ping,
shared_from_this()));
}
else
{
// The timer expired while trying to handshake,
// or we sent a ping and it never completed or
// we never got back a control frame, so close.
// Closing the socket cancels all outstanding operations. They
// will complete with net::error::operation_aborted
beast::get_lowest_layer(ws_).socket().shutdown(tcp::socket::shutdown_both, ec);
beast::get_lowest_layer(ws_).socket().close(ec);
return;
}
}
// Wait on the timer
timer_.async_wait(
net::bind_executor(
ws_.get_executor(), // use the strand
beast::bind_front_handler(
&websocket_session::on_timer,
shared_from_this())));
}
// Called to indicate activity from the remote peer
void
activity()
{
// Note that the connection is alive
ping_state_ = 0;
// Set the timer
timer_.expires_after(std::chrono::seconds(15));
}
// Called after a ping is sent.
void
on_ping(beast::error_code ec)
{
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "ping");
// Note that the ping was sent.
if(ping_state_ == 1)
{
ping_state_ = 2;
}
else
{
// ping_state_ could have been set to 0
// if an incoming control frame was received
// at exactly the same time we sent a ping.
BOOST_ASSERT(ping_state_ == 0);
}
}
void
on_control_callback(
websocket::frame_type kind,
beast::string_view payload)
{
boost::ignore_unused(kind, payload);
// Note that there is activity
activity();
}
void
do_read()
{
@ -392,10 +276,6 @@ public:
{
boost::ignore_unused(bytes_transferred);
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
// This indicates that the websocket_session was closed
if(ec == websocket::error::closed)
return;
@ -403,9 +283,6 @@ public:
if(ec)
fail(ec, "read");
// Note that there is activity
activity();
// Echo the message
ws_.text(ws_.got_text());
ws_.async_write(
@ -422,10 +299,6 @@ public:
{
boost::ignore_unused(bytes_transferred);
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "write");
@ -615,10 +488,6 @@ public:
{
boost::ignore_unused(bytes_transferred);
// Happens when the timer closes the socket
if(ec == net::error::operation_aborted)
return;
if(ec)
return fail(ec, "write");

View File

@ -123,6 +123,15 @@ public:
if(ec)
return fail(ec, "ssl_handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::client));
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
beast::bind_front_handler(

View File

@ -101,6 +101,15 @@ public:
if(ec)
return fail(ec, "connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::client));
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
beast::bind_front_handler(

View File

@ -79,6 +79,15 @@ do_session(
if(ec)
return fail(ec, "ssl_handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws).expires_never();
// Set suggested timeout settings for the websocket
ws.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::client));
// Perform the websocket handshake
ws.async_handshake(host, "/", yield[ec]);
if(ec)

View File

@ -65,6 +65,15 @@ do_session(
if(ec)
return fail(ec, "connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws).expires_never();
// Set suggested timeout settings for the websocket
ws.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::client));
// Perform the websocket handshake
ws.async_handshake(host, "/", yield[ec]);
if(ec)

View File

@ -80,6 +80,15 @@ public:
if(ec)
return fail(ec, "handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(

View File

@ -52,6 +52,10 @@ public:
session(tcp::socket socket)
: ws_(std::move(socket))
{
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
}
// Start the asynchronous operation

View File

@ -17,6 +17,10 @@ websocket_session(
: ws_(std::move(socket))
, state_(state)
{
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
}
websocket_session::

View File

@ -66,6 +66,15 @@ do_session(
if(ec)
return fail(ec, "handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws).expires_never();
// Set suggested timeout settings for the websocket
ws.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
// Accept the websocket handshake
ws.async_accept(yield[ec]);
if(ec)

View File

@ -51,6 +51,11 @@ do_session(ws_type& ws, net::yield_context yield)
{
beast::error_code ec;
// Set suggested timeout settings for the websocket
ws.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
// Accept the websocket handshake
ws.async_accept(yield[ec]);
if(ec)

View File

@ -79,6 +79,9 @@ public:
reenter(*this)
{
// Set the timeout.
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Perform the SSL handshake
yield ws_.next_layer().async_handshake(
ssl::stream_base::server,
@ -90,6 +93,15 @@ public:
if(ec)
return fail(ec, "handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
// Accept the websocket handshake
yield ws_.async_accept(
std::bind(

View File

@ -74,6 +74,11 @@ public:
boost::ignore_unused(bytes_transferred);
reenter(*this)
{
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::suggested_settings(
websocket::role_type::server));
// Accept the websocket handshake
yield ws_.async_accept(
std::bind(

View File

@ -118,6 +118,7 @@ struct stream_base
opt.keep_alive_pings = true;
break;
}
return opt;
}
protected: