diff --git a/CHANGELOG.md b/CHANGELOG.md index b6985faa..bc28e88b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ HTTP: * Refactor file_body for best practices * Add http-server-threaded example +WebSocket: + +* Add websocket-server-async example + -------------------------------------------------------------------------------- Version 71: diff --git a/doc/2_examples.qbk b/doc/2_examples.qbk index a94de120..6ecd97aa 100644 --- a/doc/2_examples.qbk +++ b/doc/2_examples.qbk @@ -111,6 +111,17 @@ send a message and receive the reply. Requires OpenSSL to build. +[section WebSocket Server (Asynchronous)] + +This program implements a WebSocket echo server using asynchronous +interfaces and a configurable number of threads. + +* [repo_file example/websocket-server-async/websocket_server_async.cpp] + +[endsect] + + + [section Documentation Samples] Here are all of the example functions and classes presented @@ -144,6 +155,7 @@ stand alone can be directly included in your projects. * [repo_file example/common/const_body.hpp] * [repo_file example/common/detect_ssl.hpp] * [repo_file example/common/file_body.hpp] +* [repo_file example/common/helpers.hpp] * [repo_file example/common/mime_types.hpp] * [repo_file example/common/mutable_body.hpp] * [repo_file example/common/rfc7231.hpp] diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index b42db182..cfaad504 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory (http-server-small) add_subdirectory (http-server-threaded) add_subdirectory (server-framework) add_subdirectory (websocket-client) +add_subdirectory (websocket-server-async) if (OPENSSL_FOUND) add_subdirectory (http-client-ssl) diff --git a/example/Jamfile b/example/Jamfile index 2d913f4f..61c87b8f 100644 --- a/example/Jamfile +++ b/example/Jamfile @@ -13,6 +13,7 @@ build-project http-server-small ; build-project http-server-threaded ; build-project server-framework ; build-project websocket-client ; +build-project websocket-server-async ; # VFALCO How do I make this work on Windows and if OpenSSL is not available? #build-project ssl-http-client ; diff --git a/example/common/helpers.hpp b/example/common/helpers.hpp new file mode 100644 index 00000000..784adc56 --- /dev/null +++ b/example/common/helpers.hpp @@ -0,0 +1,56 @@ +// +// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BEAST_EXAMPLE_COMMON_HELPERS_HPP +#define BEAST_EXAMPLE_COMMON_HELPERS_HPP + +#include +#include +#include +#include + +/// Block until SIGINT or SIGTERM is received. +inline +void +sig_wait() +{ + boost::asio::io_service ios{1}; + boost::asio::signal_set signals(ios, SIGINT, SIGTERM); + signals.async_wait([&](boost::system::error_code const&, int){}); + ios.run(); +} + +namespace detail { + +inline +void +print_1(std::ostream&) +{ +} + +template +void +print_1(std::ostream& os, T1 const& t1, TN const&... tn) +{ + os << t1; + print_1(os, tn...); +} + +} // detail + +// compose a string to std::cout or std::cerr atomically +// +template +void +print(std::ostream& os, Args const&... args) +{ + std::stringstream ss; + detail::print_1(ss, args...); + os << ss.str() << std::endl; +} + +#endif diff --git a/example/websocket-server-async/CMakeLists.txt b/example/websocket-server-async/CMakeLists.txt new file mode 100644 index 00000000..b0f5ad18 --- /dev/null +++ b/example/websocket-server-async/CMakeLists.txt @@ -0,0 +1,13 @@ +# Part of Beast + +GroupSources(include/beast beast) +GroupSources(example/websocket-server-async "/") + +add_executable (websocket-server-async + ${BEAST_INCLUDES} + websocket_server_async.cpp +) + +target_link_libraries(websocket-server-async + Beast + ) diff --git a/example/websocket-server-async/Jamfile b/example/websocket-server-async/Jamfile new file mode 100644 index 00000000..59571f2f --- /dev/null +++ b/example/websocket-server-async/Jamfile @@ -0,0 +1,13 @@ +# +# Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# + +exe websocket-server-async : + websocket_server_async.cpp + : + coverage:no + ubasan:no + ; diff --git a/example/websocket-server-async/websocket_server_async.cpp b/example/websocket-server-async/websocket_server_async.cpp new file mode 100644 index 00000000..55978f0a --- /dev/null +++ b/example/websocket-server-async/websocket_server_async.cpp @@ -0,0 +1,463 @@ +// +// Copyright (c) 2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include "../common/helpers.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace http = beast::http; // from +namespace websocket = beast::websocket; // from +namespace ip = boost::asio::ip; // from +using tcp = boost::asio::ip::tcp; // from + +//------------------------------------------------------------------------------ +// +// Example: WebSocket echo server, asynchronous +// +//------------------------------------------------------------------------------ + +/** WebSocket asynchronous echo server + + The server holds the listening socket, the io_service, and + the threads calling io_service::run +*/ +class server +{ + using error_code = beast::error_code; // Saves typing + using clock_type = + std::chrono::steady_clock; // For the timer + using stream_type = + websocket::stream; // The type of our websocket stream + std::ostream* log_; // Used for diagnostic output, may be null + boost::asio::io_service ios_; // The io_service, required + tcp::socket sock_; // Holds accepted connections + tcp::endpoint ep_; // The remote endpoint during accept + std::vector thread_; // Threads for the io_service + boost::asio::ip::tcp::acceptor acceptor_; // The listening socket + std::function mod_; // Called on new stream + boost::optional< + boost::asio::io_service::work> work_; // Keeps io_service::run from returning + + //-------------------------------------------------------------------------- + + class connection : public std::enable_shared_from_this + { + std::ostream* log_; // Where to log, may be null + tcp::endpoint ep_; // The remote endpoing + stream_type ws_; // The websocket stream + boost::asio::basic_waitable_timer< + clock_type> timer_; // Needed for timeouts + boost::asio::io_service::strand strand_;// Needed when threads > 1 + beast::multi_buffer buffer_; // Stores the current message + beast::drain_buffer drain_; // Helps discard data on close + std::size_t id_; // A small unique id + + public: + /// Constructor + connection( + server& parent, + tcp::endpoint const& ep, + tcp::socket&& sock) + : log_(parent.log_) + , ep_(ep) + , ws_(std::move(sock)) + , timer_(ws_.get_io_service(), (clock_type::time_point::max)()) + , strand_(ws_.get_io_service()) + , id_([] + { + static std::atomic n{0}; + return ++n; + }()) + { + // Invoke the callback for new connections if set. + // This allows the settings on the websocket stream + // to be adjusted. For example to turn compression + // on or off or adjust the read and write buffer sizes. + // + if(parent.mod_) + parent.mod_(ws_); + } + + // Called immediately after the connection is created. + // We keep this separate from the constructor because + // shared_from_this may not be called from constructors. + void run() + { + // Run the timer + on_timer({}); + + // Put the handshake on the timer + timer_.expires_from_now(std::chrono::seconds(15)); + + // Read the websocket handshake and send the response + ws_.async_accept_ex( + [](websocket::response_type& res) + { + res.insert(http::field::server, "websocket-server-async"); + }, + strand_.wrap(std::bind( + &connection::on_accept, + shared_from_this(), + std::placeholders::_1))); + } + + private: + // Called when the timer expires. + // We operate the timer continuously this simplifies the code. + // + void on_timer(error_code ec) + { + if(ec && ec != boost::asio::error::operation_aborted) + return fail("timer", ec); + + // Verify that the timer really expired + // since the deadline may have moved. + // + if(timer_.expires_at() <= clock_type::now()) + { + // Closing the socket cancels all outstanding + // operations. They will complete with + // boost::asio::error::operation_aborted + // + ws_.next_layer().close(ec); + return; + } + + // Wait on the timer + timer_.async_wait( + strand_.wrap(std::bind( + &connection::on_timer, + shared_from_this(), + std::placeholders::_1))); + } + + // Called after the handshake is performed + void on_accept(error_code ec) + { + if(ec) + return fail("accept", ec); + do_read(); + } + + // Read a message from the websocket stream + void do_read() + { + // Put the read on the timer + timer_.expires_from_now(std::chrono::seconds(15)); + + // Read a message + ws_.async_read(buffer_, + strand_.wrap(std::bind( + &connection::on_read, + shared_from_this(), + std::placeholders::_1))); + } + + // Called after the message read completes + void on_read(error_code ec) + { + // This error means the other side + // closed the websocket stream. + if(ec == websocket::error::closed) + return; + + if(ec) + return fail("read", ec); + + // Put the write on the timer + timer_.expires_from_now(std::chrono::seconds(15)); + + // Write the received message back + ws_.binary(ws_.got_binary()); + ws_.async_write(buffer_.data(), + strand_.wrap(std::bind( + &connection::on_write, + shared_from_this(), + std::placeholders::_1))); + } + + // Called after the message write completes + void on_write(error_code ec) + { + if(ec) + return fail("write", ec); + + // Empty out the buffer. This is + // needed if we want to do another read. + // + buffer_.consume(buffer_.size()); + + // This shows how the server can close the + // connection. Alternatively we could call + // do_read again and the connection would + // stay open until the other side closes it. + // + do_close(); + } + + // Sends a websocket close frame + void do_close() + { + // Put the close frame on the timer + timer_.expires_from_now(std::chrono::seconds(15)); + + // Send the close frame + ws_.async_close({}, + strand_.wrap(std::bind( + &connection::on_close, + shared_from_this(), + std::placeholders::_1))); + } + + // Called when writing the close frame completes + void on_close(error_code ec) + { + if(ec) + return fail("close", ec); + + on_drain({}); + } + + // Read and discard any leftover message data + void on_drain(error_code ec) + { + if(ec == websocket::error::closed) + { + // the connection has been closed gracefully + return; + } + + if(ec) + return fail("drain", ec); + + // WebSocket says that to close a connection you have + // to keep reading messages until you receive a close frame. + // Beast delivers the close frame as an error from read. + // + ws_.async_read(drain_, + strand_.wrap(std::bind( + &connection::on_drain, + shared_from_this(), + std::placeholders::_1))); + } + + // Pretty-print an error to the log + void fail(std::string what, error_code ec) + { + if(log_) + if(ec != boost::asio::error::operation_aborted) + print(*log_, "[#", id_, " ", ep_, "] ", what, ": ", ec.message()); + } + }; + + //-------------------------------------------------------------------------- + + // Pretty-print an error to the log + void fail(std::string what, error_code ec) + { + if(log_) + print(*log_, what, ": ", ec.message()); + } + + // Initiates an accept + void do_accept() + { + acceptor_.async_accept(sock_, ep_, + std::bind(&server::on_accept, this, + std::placeholders::_1)); + } + + // Called when receiving an incoming connection + void on_accept(error_code ec) + { + // This can happen during exit + if(! acceptor_.is_open()) + return; + + // This can happen during exit + if(ec == boost::asio::error::operation_aborted) + return; + + if(ec) + fail("accept", ec); + + // Create the connection and run it + std::make_shared(*this, ep_, std::move(sock_))->run(); + + // Initiate another accept + do_accept(); + } + +public: + /** Constructor. + + @param log A pointer to a stream to log to, or `nullptr` + to disable logging. + + @param threads The number of threads in the io_service. + */ + server(std::ostream* log, std::size_t threads) + : log_(log) + , sock_(ios_) + , acceptor_(ios_) + , work_(ios_) + { + thread_.reserve(threads); + for(std::size_t i = 0; i < threads; ++i) + thread_.emplace_back( + [&]{ ios_.run(); }); + } + + /// Destructor. + ~server() + { + work_ = boost::none; + ios_.dispatch([&] + { + error_code ec; + acceptor_.close(ec); + }); + for(auto& t : thread_) + t.join(); + } + + /// Return the listening endpoint. + tcp::endpoint + local_endpoint() const + { + return acceptor_.local_endpoint(); + } + + /** Set a handler called for new streams. + + This function is called for each new stream. + It is used to set options for every connection. + */ + template + void + on_new_stream(F const& f) + { + mod_ = f; + } + + /** Open a listening port. + + @param ep The address and port to bind to. + + @param ec Set to the error, if any occurred. + */ + void + open(tcp::endpoint const& ep, error_code& ec) + { + acceptor_.open(ep.protocol(), ec); + if(ec) + return fail("open", ec); + acceptor_.set_option( + boost::asio::socket_base::reuse_address{true}); + acceptor_.bind(ep, ec); + if(ec) + return fail("bind", ec); + acceptor_.listen( + boost::asio::socket_base::max_connections, ec); + if(ec) + return fail("listen", ec); + do_accept(); + } +}; + +//------------------------------------------------------------------------------ + +// This helper will apply some settings to a WebSocket +// stream. The server applies it to all new connections. +// +class set_stream_options +{ + websocket::permessage_deflate pmd_; + +public: + set_stream_options(set_stream_options const&) = default; + + explicit + set_stream_options( + websocket::permessage_deflate const& pmd) + : pmd_(pmd) + { + } + + template + void + operator()(websocket::stream& ws) const + { + ws.set_option(pmd_); + + // Turn off the auto-fragment option. + // This improves Autobahn performance. + // + ws.auto_fragment(false); + + // 64MB message size limit. + // The high limit is needed for Autobahn. + ws.read_message_max(64 * 1024 * 1024); + } +}; + +int main(int argc, char* argv[]) +{ + // Check command line arguments. + if(argc != 4) + { + std::cerr << + "Usage: " << argv[0] << "
\n" + " For IPv4, try: " << argv[0] << " 0.0.0.0 8080 1\n" + " For IPv6, try: " << argv[0] << " 0::0 8080 1\n" + ; + return EXIT_FAILURE; + } + + // Decode command line options + auto address = ip::address::from_string(argv[1]); + unsigned short port = static_cast(std::atoi(argv[2])); + unsigned short threads = static_cast(std::atoi(argv[3])); + + // Allow permessage-deflate + // compression on all connections + websocket::permessage_deflate pmd; + pmd.client_enable = true; + pmd.server_enable = true; + pmd.compLevel = 3; + + // Create our server + server s{&std::cout, threads}; + s.on_new_stream(set_stream_options{pmd}); + + // Open the listening port + beast::error_code ec; + s.open(tcp::endpoint{address, port}, ec); + if(ec) + { + std::cerr << "Error: " << ec.message(); + return EXIT_FAILURE; + } + + // Wait for CTRL+C. After receiving CTRL+C, + // the server should shut down cleanly. + // + sig_wait(); + + return EXIT_SUCCESS; +}