Use tcp_stream in WebSocket server examples

This commit is contained in:
Vinnie Falco
2019-02-09 22:30:54 -08:00
parent 920909673a
commit 6ee1a88292
15 changed files with 215 additions and 283 deletions

View File

@ -5,6 +5,7 @@ Version 213:
* Use timeouts in HTTP server examples * Use timeouts in HTTP server examples
* Use timeouts in HTTP client examples * Use timeouts in HTTP client examples
* Use tcp_stream in WebSocket client examples * Use tcp_stream in WebSocket client examples
* Use tcp_stream in WebSocket server examples
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------

View File

@ -18,10 +18,8 @@
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp> #include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/bind_executor.hpp> #include <boost/beast/_experimental/core/ssl_stream.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
@ -50,18 +48,14 @@ fail(beast::error_code ec, char const* what)
// Echoes back all received WebSocket messages // Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session> class session : public std::enable_shared_from_this<session>
{ {
tcp::socket socket_; websocket::stream<beast::ssl_stream<
websocket::stream<ssl::stream<tcp::socket&>> ws_; beast::tcp_stream<net::io_context::strand>>> ws_;
net::strand<
net::io_context::executor_type> strand_;
beast::multi_buffer buffer_; beast::multi_buffer buffer_;
public: public:
// Take ownership of the socket // Take ownership of the socket
session(tcp::socket socket, ssl::context& ctx) session(tcp::socket socket, ssl::context& ctx)
: socket_(std::move(socket)) : ws_(std::move(socket), ctx)
, ws_(socket_, ctx)
, strand_(ws_.get_executor())
{ {
} }
@ -69,15 +63,16 @@ public:
void void
run() run()
{ {
// Set the timeout.
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Perform the SSL handshake // Perform the SSL handshake
ws_.next_layer().async_handshake( ws_.next_layer().async_handshake(
ssl::stream_base::server, ssl::stream_base::server,
net::bind_executor( std::bind(
strand_, &session::on_handshake,
std::bind( shared_from_this(),
&session::on_handshake, std::placeholders::_1));
shared_from_this(),
std::placeholders::_1)));
} }
void void
@ -88,12 +83,10 @@ public:
// Accept the websocket handshake // Accept the websocket handshake
ws_.async_accept( ws_.async_accept(
net::bind_executor( std::bind(
strand_, &session::on_accept,
std::bind( shared_from_this(),
&session::on_accept, std::placeholders::_1));
shared_from_this(),
std::placeholders::_1)));
} }
void void
@ -112,13 +105,11 @@ public:
// Read a message into our buffer // Read a message into our buffer
ws_.async_read( ws_.async_read(
buffer_, buffer_,
net::bind_executor( std::bind(
strand_, &session::on_read,
std::bind( shared_from_this(),
&session::on_read, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
} }
void void
@ -139,13 +130,11 @@ public:
ws_.text(ws_.got_text()); ws_.text(ws_.got_text());
ws_.async_write( ws_.async_write(
buffer_.data(), buffer_.data(),
net::bind_executor( std::bind(
strand_, &session::on_write,
std::bind( shared_from_this(),
&session::on_write, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
} }
void void
@ -173,7 +162,6 @@ class listener : public std::enable_shared_from_this<listener>
{ {
ssl::context& ctx_; ssl::context& ctx_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
tcp::socket socket_;
public: public:
listener( listener(
@ -182,7 +170,6 @@ public:
tcp::endpoint endpoint) tcp::endpoint endpoint)
: ctx_(ctx) : ctx_(ctx)
, acceptor_(ioc) , acceptor_(ioc)
, socket_(ioc)
{ {
beast::error_code ec; beast::error_code ec;
@ -233,15 +220,15 @@ public:
do_accept() do_accept()
{ {
acceptor_.async_accept( acceptor_.async_accept(
socket_,
std::bind( std::bind(
&listener::on_accept, &listener::on_accept,
shared_from_this(), shared_from_this(),
std::placeholders::_1)); std::placeholders::_1,
std::placeholders::_2));
} }
void void
on_accept(beast::error_code ec) on_accept(beast::error_code ec, tcp::socket socket)
{ {
if(ec) if(ec)
{ {
@ -250,7 +237,7 @@ public:
else else
{ {
// Create the session and run it // Create the session and run it
std::make_shared<session>(std::move(socket_), ctx_)->run(); std::make_shared<session>(std::move(socket), ctx_)->run();
} }
// Accept another connection // Accept another connection

View File

@ -15,9 +15,7 @@
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
@ -45,9 +43,7 @@ fail(beast::error_code ec, char const* what)
// Echoes back all received WebSocket messages // Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session> class session : public std::enable_shared_from_this<session>
{ {
websocket::stream<tcp::socket> ws_; websocket::stream<beast::tcp_stream<net::io_context::strand>> ws_;
net::strand<
net::io_context::executor_type> strand_;
beast::multi_buffer buffer_; beast::multi_buffer buffer_;
public: public:
@ -55,7 +51,6 @@ public:
explicit explicit
session(tcp::socket socket) session(tcp::socket socket)
: ws_(std::move(socket)) : ws_(std::move(socket))
, strand_(ws_.get_executor())
{ {
} }
@ -65,12 +60,10 @@ public:
{ {
// Accept the websocket handshake // Accept the websocket handshake
ws_.async_accept( ws_.async_accept(
net::bind_executor( std::bind(
strand_, &session::on_accept,
std::bind( shared_from_this(),
&session::on_accept, std::placeholders::_1));
shared_from_this(),
std::placeholders::_1)));
} }
void void
@ -89,13 +82,11 @@ public:
// Read a message into our buffer // Read a message into our buffer
ws_.async_read( ws_.async_read(
buffer_, buffer_,
net::bind_executor( std::bind(
strand_, &session::on_read,
std::bind( shared_from_this(),
&session::on_read, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
} }
void void
@ -116,13 +107,11 @@ public:
ws_.text(ws_.got_text()); ws_.text(ws_.got_text());
ws_.async_write( ws_.async_write(
buffer_.data(), buffer_.data(),
net::bind_executor( std::bind(
strand_, &session::on_write,
std::bind( shared_from_this(),
&session::on_write, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
} }
void void
@ -149,14 +138,12 @@ public:
class listener : public std::enable_shared_from_this<listener> class listener : public std::enable_shared_from_this<listener>
{ {
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
tcp::socket socket_;
public: public:
listener( listener(
net::io_context& ioc, net::io_context& ioc,
tcp::endpoint endpoint) tcp::endpoint endpoint)
: acceptor_(ioc) : acceptor_(ioc)
, socket_(ioc)
{ {
beast::error_code ec; beast::error_code ec;
@ -207,15 +194,15 @@ public:
do_accept() do_accept()
{ {
acceptor_.async_accept( acceptor_.async_accept(
socket_,
std::bind( std::bind(
&listener::on_accept, &listener::on_accept,
shared_from_this(), shared_from_this(),
std::placeholders::_1)); std::placeholders::_1,
std::placeholders::_2));
} }
void void
on_accept(beast::error_code ec) on_accept(beast::error_code ec, tcp::socket socket)
{ {
if(ec) if(ec)
{ {
@ -224,7 +211,7 @@ public:
else else
{ {
// Create the session and run it // Create the session and run it
std::make_shared<session>(std::move(socket_))->run(); std::make_shared<session>(std::move(socket))->run();
} }
// Accept another connection // Accept another connection

View File

@ -190,11 +190,10 @@ handle_request(
http_session:: http_session::
http_session( http_session(
tcp::socket socket, tcp::socket&& socket,
boost::shared_ptr<shared_state> const& state) boost::shared_ptr<shared_state> const& state)
: socket_(std::move(socket)) : stream_(std::move(socket))
, state_(state) , state_(state)
, strand_(socket_.get_executor())
{ {
} }
@ -202,14 +201,16 @@ void
http_session:: http_session::
run() run()
{ {
// Set the timeout.
stream_.expires_after(std::chrono::seconds(30));
// Read a request // Read a request
http::async_read(socket_, buffer_, req_, http::async_read(stream_, buffer_, req_,
net::bind_executor(strand_, std::bind(
std::bind( &http_session::on_read,
&http_session::on_read, shared_from_this(),
shared_from_this(), std::placeholders::_1,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_2)));
} }
// Report a failure // Report a failure
@ -239,13 +240,12 @@ operator()(http::message<isRequest, Body, Fields>&& msg) const
// Write the response // Write the response
auto self = self_.shared_from_this(); auto self = self_.shared_from_this();
http::async_write( http::async_write(
self_.socket_, self_.stream_,
*sp, *sp,
net::bind_executor(self_.strand_, [self, sp](beast::error_code ec, std::size_t bytes)
[self, sp](beast::error_code ec, std::size_t bytes) {
{ self->on_write(ec, bytes, sp->need_eof());
self->on_write(ec, bytes, sp->need_eof()); });
}));
} }
void void
@ -255,7 +255,7 @@ on_read(beast::error_code ec, std::size_t)
// This means they closed the connection // This means they closed the connection
if(ec == http::error::end_of_stream) if(ec == http::error::end_of_stream)
{ {
socket_.shutdown(tcp::socket::shutdown_send, ec); stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
return; return;
} }
@ -268,7 +268,8 @@ on_read(beast::error_code ec, std::size_t)
{ {
// Create a WebSocket session by transferring the socket // Create a WebSocket session by transferring the socket
boost::make_shared<websocket_session>( boost::make_shared<websocket_session>(
std::move(socket_), state_)->run(std::move(req_)); std::move(stream_.release_socket()),
state_)->run(std::move(req_));
return; return;
} }
@ -292,23 +293,21 @@ on_read(beast::error_code ec, std::size_t)
#if 0 #if 0
// NOTE This causes an ICE in gcc 7.3 // NOTE This causes an ICE in gcc 7.3
// Write the response // Write the response
http::async_write(this->socket_, *sp, http::async_write(this->stream_, *sp,
net::bind_executor(strand_, [self = shared_from_this(), sp](
[self = shared_from_this(), sp]( beast::error_code ec, std::size_t bytes)
beast::error_code ec, std::size_t bytes) {
{ self->on_write(ec, bytes, sp->need_eof());
self->on_write(ec, bytes, sp->need_eof()); });
}));
#else #else
// Write the response // Write the response
auto self = shared_from_this(); auto self = shared_from_this();
http::async_write(this->socket_, *sp, http::async_write(stream_, *sp,
net::bind_executor(strand_, [self, sp](
[self, sp]( beast::error_code ec, std::size_t bytes)
beast::error_code ec, std::size_t bytes) {
{ self->on_write(ec, bytes, sp->need_eof());
self->on_write(ec, bytes, sp->need_eof()); });
}));
#endif #endif
}); });
#else #else
@ -336,7 +335,7 @@ on_write(beast::error_code ec, std::size_t, bool close)
{ {
// This means we should close the connection, usually because // This means we should close the connection, usually because
// the response indicated the "Connection: close" semantic. // the response indicated the "Connection: close" semantic.
socket_.shutdown(tcp::socket::shutdown_send, ec); stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
return; return;
} }
@ -344,8 +343,11 @@ on_write(beast::error_code ec, std::size_t, bool close)
// otherwise the read behavior is undefined. // otherwise the read behavior is undefined.
req_ = {}; req_ = {};
// Set the timeout.
stream_.expires_after(std::chrono::seconds(30));
// Read another request // Read another request
http::async_read(socket_, buffer_, req_, http::async_read(stream_, buffer_, req_,
std::bind( std::bind(
&http_session::on_read, &http_session::on_read,
shared_from_this(), shared_from_this(),

View File

@ -21,11 +21,10 @@
*/ */
class http_session : public boost::enable_shared_from_this<http_session> class http_session : public boost::enable_shared_from_this<http_session>
{ {
tcp::socket socket_; beast::tcp_stream<net::io_context::strand> stream_;
beast::flat_buffer buffer_; beast::flat_buffer buffer_;
boost::shared_ptr<shared_state> state_; boost::shared_ptr<shared_state> state_;
http::request<http::string_body> req_; http::request<http::string_body> req_;
net::strand<net::io_context::executor_type> strand_;
struct send_lambda struct send_lambda
{ {
@ -48,7 +47,7 @@ class http_session : public boost::enable_shared_from_this<http_session>
public: public:
http_session( http_session(
tcp::socket socket, tcp::socket&& socket,
boost::shared_ptr<shared_state> const& state); boost::shared_ptr<shared_state> const& state);
void run(); void run();

View File

@ -17,7 +17,6 @@ listener(
tcp::endpoint endpoint, tcp::endpoint endpoint,
boost::shared_ptr<shared_state> const& state) boost::shared_ptr<shared_state> const& state)
: acceptor_(ioc) : acceptor_(ioc)
, socket_(ioc)
, state_(state) , state_(state)
{ {
beast::error_code ec; beast::error_code ec;
@ -62,11 +61,11 @@ run()
{ {
// Start accepting a connection // Start accepting a connection
acceptor_.async_accept( acceptor_.async_accept(
socket_,
std::bind( std::bind(
&listener::on_accept, &listener::on_accept,
shared_from_this(), shared_from_this(),
std::placeholders::_1)); std::placeholders::_1,
std::placeholders::_2));
} }
// Report a failure // Report a failure
@ -83,21 +82,21 @@ fail(beast::error_code ec, char const* what)
// Handle a connection // Handle a connection
void void
listener:: listener::
on_accept(beast::error_code ec) on_accept(beast::error_code ec, tcp::socket socket)
{ {
if(ec) if(ec)
return fail(ec, "accept"); return fail(ec, "accept");
else else
// Launch a new session for this connection // Launch a new session for this connection
boost::make_shared<http_session>( boost::make_shared<http_session>(
std::move(socket_), std::move(socket),
state_)->run(); state_)->run();
// Accept another connection // Accept another connection
acceptor_.async_accept( acceptor_.async_accept(
socket_,
std::bind( std::bind(
&listener::on_accept, &listener::on_accept,
shared_from_this(), shared_from_this(),
std::placeholders::_1)); std::placeholders::_1,
std::placeholders::_2));
} }

View File

@ -23,11 +23,10 @@ class shared_state;
class listener : public boost::enable_shared_from_this<listener> class listener : public boost::enable_shared_from_this<listener>
{ {
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
tcp::socket socket_;
boost::shared_ptr<shared_state> state_; boost::shared_ptr<shared_state> state_;
void fail(beast::error_code ec, char const* what); void fail(beast::error_code ec, char const* what);
void on_accept(beast::error_code ec); void on_accept(beast::error_code ec, tcp::socket socket);
public: public:
listener( listener(

View File

@ -35,7 +35,7 @@ main(int argc, char* argv[])
std::cerr << std::cerr <<
"Usage: websocket-chat-multi <address> <port> <doc_root> <threads>\n" << "Usage: websocket-chat-multi <address> <port> <doc_root> <threads>\n" <<
"Example:\n" << "Example:\n" <<
" websocket-chat-server 0.0.0.0 8080 .\n"; " websocket-chat-server 0.0.0.0 8080 . 5\n";
return EXIT_FAILURE; return EXIT_FAILURE;
} }
auto address = net::ip::make_address(argv[1]); auto address = net::ip::make_address(argv[1]);

View File

@ -12,11 +12,10 @@
websocket_session:: websocket_session::
websocket_session( websocket_session(
tcp::socket socket, tcp::socket&& socket,
boost::shared_ptr<shared_state> const& state) boost::shared_ptr<shared_state> const& state)
: ws_(std::move(socket)) : ws_(std::move(socket))
, state_(state) , state_(state)
, strand_(ws_.get_executor())
{ {
} }
@ -53,12 +52,11 @@ on_accept(beast::error_code ec)
// Read a message // Read a message
ws_.async_read( ws_.async_read(
buffer_, buffer_,
net::bind_executor(strand_, std::bind(
std::bind( &websocket_session::on_read,
&websocket_session::on_read, shared_from_this(),
shared_from_this(), std::placeholders::_1,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_2)));
} }
void void
@ -78,12 +76,11 @@ on_read(beast::error_code ec, std::size_t)
// Read another message // Read another message
ws_.async_read( ws_.async_read(
buffer_, buffer_,
net::bind_executor(strand_, std::bind(
std::bind( &websocket_session::on_read,
&websocket_session::on_read, shared_from_this(),
shared_from_this(), std::placeholders::_1,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_2)));
} }
void void
@ -93,13 +90,13 @@ send(boost::shared_ptr<std::string const> const& ss)
// Get on the strand if we aren't already, // Get on the strand if we aren't already,
// otherwise we will concurrently access // otherwise we will concurrently access
// objects which are not thread-safe. // objects which are not thread-safe.
if(! strand_.running_in_this_thread()) if(! ws_.get_executor().running_in_this_thread())
return net::post( return net::post(
net::bind_executor(strand_, ws_.get_executor(),
std::bind( std::bind(
&websocket_session::send, &websocket_session::send,
shared_from_this(), shared_from_this(),
ss))); ss));
// Always add to queue // Always add to queue
queue_.push_back(ss); queue_.push_back(ss);
@ -111,12 +108,11 @@ send(boost::shared_ptr<std::string const> const& ss)
// We are not currently writing, so send this immediately // We are not currently writing, so send this immediately
ws_.async_write( ws_.async_write(
net::buffer(*queue_.front()), net::buffer(*queue_.front()),
net::bind_executor(strand_, std::bind(
std::bind( &websocket_session::on_write,
&websocket_session::on_write, shared_from_this(),
shared_from_this(), std::placeholders::_1,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_2)));
} }
void void
@ -134,10 +130,9 @@ on_write(beast::error_code ec, std::size_t)
if(! queue_.empty()) if(! queue_.empty())
ws_.async_write( ws_.async_write(
net::buffer(*queue_.front()), net::buffer(*queue_.front()),
net::bind_executor(strand_, std::bind(
std::bind( &websocket_session::on_write,
&websocket_session::on_write, shared_from_this(),
shared_from_this(), std::placeholders::_1,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_2)));
} }

View File

@ -27,10 +27,9 @@ class shared_state;
class websocket_session : public boost::enable_shared_from_this<websocket_session> class websocket_session : public boost::enable_shared_from_this<websocket_session>
{ {
beast::flat_buffer buffer_; beast::flat_buffer buffer_;
websocket::stream<tcp::socket> ws_; websocket::stream<beast::tcp_stream<net::io_context::strand>> ws_;
boost::shared_ptr<shared_state> state_; boost::shared_ptr<shared_state> state_;
std::vector<boost::shared_ptr<std::string const>> queue_; std::vector<boost::shared_ptr<std::string const>> queue_;
net::strand<net::io_context::executor_type> strand_;
void fail(beast::error_code ec, char const* what); void fail(beast::error_code ec, char const* what);
void on_accept(beast::error_code ec); void on_accept(beast::error_code ec);
@ -39,7 +38,7 @@ class websocket_session : public boost::enable_shared_from_this<websocket_sessio
public: public:
websocket_session( websocket_session(
tcp::socket socket, tcp::socket&& socket,
boost::shared_ptr<shared_state> const& state); boost::shared_ptr<shared_state> const& state);
~websocket_session(); ~websocket_session();

View File

@ -18,9 +18,8 @@
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp> #include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/beast/_experimental/core/ssl_stream.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
@ -39,6 +38,11 @@ using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// The type of websocket stream to use
// Stackful coroutines are already stranded.
using ws_type = websocket::stream<beast::ssl_stream<
beast::tcp_stream<net::io_context::executor_type>>>;
// Report a failure // Report a failure
void void
fail(beast::error_code ec, char const* what) fail(beast::error_code ec, char const* what)
@ -49,14 +53,13 @@ fail(beast::error_code ec, char const* what)
// Echoes back all received WebSocket messages // Echoes back all received WebSocket messages
void void
do_session( do_session(
tcp::socket& socket, ws_type& ws,
ssl::context& ctx,
net::yield_context yield) net::yield_context yield)
{ {
beast::error_code ec; beast::error_code ec;
// Construct the stream by moving in the socket // Set the timeout.
websocket::stream<ssl::stream<tcp::socket&>> ws{socket, ctx}; beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));
// Perform the SSL handshake // Perform the SSL handshake
ws.next_layer().async_handshake(ssl::stream_base::server, yield[ec]); ws.next_layer().async_handshake(ssl::stream_base::server, yield[ec]);
@ -135,8 +138,7 @@ do_listen(
acceptor.get_executor().context(), acceptor.get_executor().context(),
std::bind( std::bind(
&do_session, &do_session,
std::move(socket), ws_type(std::move(socket), ctx),
std::ref(ctx),
std::placeholders::_1)); std::placeholders::_1));
} }
} }

View File

@ -15,7 +15,6 @@
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
@ -34,6 +33,11 @@ using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// The type of websocket stream to use
// Stackful coroutines are already stranded.
using ws_type = websocket::stream<
beast::tcp_stream<net::io_context::executor_type>>;
// Report a failure // Report a failure
void void
fail(beast::error_code ec, char const* what) fail(beast::error_code ec, char const* what)
@ -43,13 +47,10 @@ fail(beast::error_code ec, char const* what)
// Echoes back all received WebSocket messages // Echoes back all received WebSocket messages
void void
do_session(tcp::socket& socket, net::yield_context yield) do_session(ws_type& ws, net::yield_context yield)
{ {
beast::error_code ec; beast::error_code ec;
// Construct the stream by moving in the socket
websocket::stream<tcp::socket> ws{std::move(socket)};
// Accept the websocket handshake // Accept the websocket handshake
ws.async_accept(yield[ec]); ws.async_accept(yield[ec]);
if(ec) if(ec)
@ -121,7 +122,7 @@ do_listen(
acceptor.get_executor().context(), acceptor.get_executor().context(),
std::bind( std::bind(
&do_session, &do_session,
std::move(socket), ws_type(std::move(socket)),
std::placeholders::_1)); std::placeholders::_1));
} }
} }

View File

@ -30,10 +30,8 @@
#include <boost/beast/http.hpp> #include <boost/beast/http.hpp>
#include <boost/beast/version.hpp> #include <boost/beast/version.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
@ -80,12 +78,16 @@ setup_stream(websocket::stream<NextLayer>& ws)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// The type of websocket stream to use
// Stackful coroutines are already stranded.
using ws_type = websocket::stream<
beast::tcp_stream<net::io_context::executor_type>>;
void void
do_sync_session(tcp::socket& socket) do_sync_session(ws_type& ws)
{ {
beast::error_code ec; beast::error_code ec;
websocket::stream<tcp::socket> ws{std::move(socket)};
setup_stream(ws); setup_stream(ws);
ws.accept_ex( ws.accept_ex(
@ -131,7 +133,7 @@ do_sync_listen(
std::thread{std::bind( std::thread{std::bind(
&do_sync_session, &do_sync_session,
std::move(socket))}.detach(); ws_type(std::move(socket)))}.detach();
} }
} }
@ -140,9 +142,7 @@ do_sync_listen(
// Echoes back all received WebSocket messages // Echoes back all received WebSocket messages
class async_session : public std::enable_shared_from_this<async_session> class async_session : public std::enable_shared_from_this<async_session>
{ {
websocket::stream<tcp::socket> ws_; websocket::stream<beast::tcp_stream<net::io_context::strand>> ws_;
net::strand<
net::io_context::executor_type> strand_;
beast::multi_buffer buffer_; beast::multi_buffer buffer_;
public: public:
@ -150,7 +150,6 @@ public:
explicit explicit
async_session(tcp::socket socket) async_session(tcp::socket socket)
: ws_(std::move(socket)) : ws_(std::move(socket))
, strand_(ws_.get_executor())
{ {
setup_stream(ws_); setup_stream(ws_);
} }
@ -166,12 +165,10 @@ public:
res.set(http::field::server, res.set(http::field::server,
"Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Async"); "Boost.Beast/" + std::to_string(BOOST_BEAST_VERSION) + "-Async");
}, },
net::bind_executor( std::bind(
strand_, &async_session::on_accept,
std::bind( shared_from_this(),
&async_session::on_accept, std::placeholders::_1));
shared_from_this(),
std::placeholders::_1)));
} }
void void
@ -190,13 +187,11 @@ public:
// Read a message into our buffer // Read a message into our buffer
ws_.async_read( ws_.async_read(
buffer_, buffer_,
net::bind_executor( std::bind(
strand_, &async_session::on_read,
std::bind( shared_from_this(),
&async_session::on_read, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
} }
void void
@ -217,13 +212,11 @@ public:
ws_.text(ws_.got_text()); ws_.text(ws_.got_text());
ws_.async_write( ws_.async_write(
buffer_.data(), buffer_.data(),
net::bind_executor( std::bind(
strand_, &async_session::on_write,
std::bind( shared_from_this(),
&async_session::on_write, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
} }
void void
@ -247,18 +240,13 @@ public:
// Accepts incoming connections and launches the sessions // Accepts incoming connections and launches the sessions
class async_listener : public std::enable_shared_from_this<async_listener> class async_listener : public std::enable_shared_from_this<async_listener>
{ {
net::strand<
net::io_context::executor_type> strand_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
tcp::socket socket_;
public: public:
async_listener( async_listener(
net::io_context& ioc, net::io_context& ioc,
tcp::endpoint endpoint) tcp::endpoint endpoint)
: strand_(ioc.get_executor()) : acceptor_(ioc)
, acceptor_(ioc)
, socket_(ioc)
{ {
beast::error_code ec; beast::error_code ec;
@ -309,17 +297,15 @@ public:
do_accept() do_accept()
{ {
acceptor_.async_accept( acceptor_.async_accept(
socket_, std::bind(
net::bind_executor( &async_listener::on_accept,
strand_, shared_from_this(),
std::bind( std::placeholders::_1,
&async_listener::on_accept, std::placeholders::_2));
shared_from_this(),
std::placeholders::_1)));
} }
void void
on_accept(beast::error_code ec) on_accept(beast::error_code ec, tcp::socket socket)
{ {
if(ec) if(ec)
{ {
@ -328,7 +314,7 @@ public:
else else
{ {
// Create the async_session and run it // Create the async_session and run it
std::make_shared<async_session>(std::move(socket_))->run(); std::make_shared<async_session>(std::move(socket))->run();
} }
// Accept another connection // Accept another connection
@ -339,11 +325,10 @@ public:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void
do_coro_session(tcp::socket& socket, net::yield_context yield) do_coro_session(ws_type& ws, net::yield_context yield)
{ {
beast::error_code ec; beast::error_code ec;
websocket::stream<tcp::socket> ws{std::move(socket)};
setup_stream(ws); setup_stream(ws);
ws.async_accept_ex( ws.async_accept_ex(
@ -413,7 +398,7 @@ do_coro_listen(
acceptor.get_executor().context(), acceptor.get_executor().context(),
std::bind( std::bind(
&do_coro_session, &do_coro_session,
std::move(socket), ws_type(std::move(socket)),
std::placeholders::_1)); std::placeholders::_1));
} }
} }

View File

@ -18,10 +18,8 @@
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp> #include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/bind_executor.hpp> #include <boost/beast/_experimental/core/ssl_stream.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
@ -53,18 +51,14 @@ class session
: public net::coroutine : public net::coroutine
, public std::enable_shared_from_this<session> , public std::enable_shared_from_this<session>
{ {
tcp::socket socket_; websocket::stream<beast::ssl_stream<
websocket::stream<ssl::stream<tcp::socket&>> ws_; beast::tcp_stream<net::io_context::strand>>> ws_;
net::strand<
net::io_context::executor_type> strand_;
beast::multi_buffer buffer_; beast::multi_buffer buffer_;
public: public:
// Take ownership of the socket // Take ownership of the socket
session(tcp::socket socket, ssl::context& ctx) session(tcp::socket&& socket, ssl::context& ctx)
: socket_(std::move(socket)) : ws_(std::move(socket), ctx)
, ws_(socket_, ctx)
, strand_(ws_.get_executor())
{ {
} }
@ -88,25 +82,21 @@ public:
// Perform the SSL handshake // Perform the SSL handshake
yield ws_.next_layer().async_handshake( yield ws_.next_layer().async_handshake(
ssl::stream_base::server, ssl::stream_base::server,
net::bind_executor( std::bind(
strand_, &session::loop,
std::bind( shared_from_this(),
&session::loop, std::placeholders::_1,
shared_from_this(), 0));
std::placeholders::_1,
0)));
if(ec) if(ec)
return fail(ec, "handshake"); return fail(ec, "handshake");
// Accept the websocket handshake // Accept the websocket handshake
yield ws_.async_accept( yield ws_.async_accept(
net::bind_executor( std::bind(
strand_, &session::loop,
std::bind( shared_from_this(),
&session::loop, std::placeholders::_1,
shared_from_this(), 0));
std::placeholders::_1,
0)));
if(ec) if(ec)
return fail(ec, "accept"); return fail(ec, "accept");
@ -115,13 +105,11 @@ public:
// Read a message into our buffer // Read a message into our buffer
yield ws_.async_read( yield ws_.async_read(
buffer_, buffer_,
net::bind_executor( std::bind(
strand_, &session::loop,
std::bind( shared_from_this(),
&session::loop, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
if(ec == websocket::error::closed) if(ec == websocket::error::closed)
{ {
// This indicates that the session was closed // This indicates that the session was closed
@ -134,13 +122,11 @@ public:
ws_.text(ws_.got_text()); ws_.text(ws_.got_text());
yield ws_.async_write( yield ws_.async_write(
buffer_.data(), buffer_.data(),
net::bind_executor( std::bind(
strand_, &session::loop,
std::bind( shared_from_this(),
&session::loop, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
if(ec) if(ec)
return fail(ec, "write"); return fail(ec, "write");

View File

@ -15,10 +15,8 @@
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
@ -48,9 +46,8 @@ class session
: public net::coroutine : public net::coroutine
, public std::enable_shared_from_this<session> , public std::enable_shared_from_this<session>
{ {
websocket::stream<tcp::socket> ws_; websocket::stream<
net::strand< beast::tcp_stream<net::io_context::strand>> ws_;
net::io_context::executor_type> strand_;
beast::multi_buffer buffer_; beast::multi_buffer buffer_;
public: public:
@ -58,7 +55,6 @@ public:
explicit explicit
session(tcp::socket socket) session(tcp::socket socket)
: ws_(std::move(socket)) : ws_(std::move(socket))
, strand_(ws_.get_executor())
{ {
} }
@ -80,13 +76,11 @@ public:
{ {
// Accept the websocket handshake // Accept the websocket handshake
yield ws_.async_accept( yield ws_.async_accept(
net::bind_executor( std::bind(
strand_, &session::loop,
std::bind( shared_from_this(),
&session::loop, std::placeholders::_1,
shared_from_this(), 0));
std::placeholders::_1,
0)));
if(ec) if(ec)
return fail(ec, "accept"); return fail(ec, "accept");
@ -95,13 +89,11 @@ public:
// Read a message into our buffer // Read a message into our buffer
yield ws_.async_read( yield ws_.async_read(
buffer_, buffer_,
net::bind_executor( std::bind(
strand_, &session::loop,
std::bind( shared_from_this(),
&session::loop, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
if(ec == websocket::error::closed) if(ec == websocket::error::closed)
{ {
// This indicates that the session was closed // This indicates that the session was closed
@ -114,13 +106,11 @@ public:
ws_.text(ws_.got_text()); ws_.text(ws_.got_text());
yield ws_.async_write( yield ws_.async_write(
buffer_.data(), buffer_.data(),
net::bind_executor( std::bind(
strand_, &session::loop,
std::bind( shared_from_this(),
&session::loop, std::placeholders::_1,
shared_from_this(), std::placeholders::_2));
std::placeholders::_1,
std::placeholders::_2)));
if(ec) if(ec)
return fail(ec, "write"); return fail(ec, "write");