diff --git a/CHANGELOG.md b/CHANGELOG.md index e53f892d..d26df531 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ * Fixes for WebSocket echo server * Fix 32-bit arm7 warnings * Remove unnecessary include +* WebSocket server examples and test tidying API Changes: diff --git a/doc/examples.qbk b/doc/examples.qbk index fd869536..3ad72b09 100644 --- a/doc/examples.qbk +++ b/doc/examples.qbk @@ -85,6 +85,15 @@ int main() } ``` +[heading WebSocket Echo Server] + +This example demonstrates both synchronous and asynchronous +WebSocket server implementations. + +* [@examples/websocket_async_echo_server.hpp] +* [@examples/websocket_ssync_echo_server.hpp] +* [@examples/websocket_echo.cpp] + [heading Secure WebSocket] Establish a WebSocket connection over an encrypted TLS connection, diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d4815e90..6d9e8edb 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -41,6 +41,17 @@ if (NOT WIN32) target_link_libraries(http-example ${Boost_LIBRARIES} Threads::Threads) endif() +add_executable (websocket-echo + ${BEAST_INCLUDES} + websocket_async_echo_server.hpp + websocket_sync_echo_server.hpp + websocket_echo.cpp +) + +if (NOT WIN32) + target_link_libraries(websocket-echo ${Boost_LIBRARIES} Threads::Threads) +endif() + add_executable (websocket-example ${BEAST_INCLUDES} ${EXTRAS_INCLUDES} diff --git a/examples/Jamfile.v2 b/examples/Jamfile.v2 index 2dd0427b..0bfbb594 100644 --- a/examples/Jamfile.v2 +++ b/examples/Jamfile.v2 @@ -18,6 +18,10 @@ exe http-example : http_example.cpp ; +exe websocket-echo : + websocket_echo.cpp + ; + exe websocket-example : websocket_example.cpp ; diff --git a/examples/websocket_async_echo_server.hpp b/examples/websocket_async_echo_server.hpp new file mode 100644 index 00000000..3b0b9ae1 --- /dev/null +++ b/examples/websocket_async_echo_server.hpp @@ -0,0 +1,375 @@ +// +// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef WEBSOCKET_ASYNC_ECHO_SERVER_HPP +#define WEBSOCKET_ASYNC_ECHO_SERVER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace websocket { + +/** Asynchronous WebSocket echo client/server +*/ +class async_echo_server +{ +public: + using error_code = beast::error_code; + using address_type = boost::asio::ip::address; + using socket_type = boost::asio::ip::tcp::socket; + using endpoint_type = boost::asio::ip::tcp::endpoint; + +private: + struct identity + { + template + void + operator()(beast::http::message< + true, Body, Fields>& req) const + { + req.fields.replace("User-Agent", "async_echo_client"); + } + + template + void + operator()(beast::http::message< + false, Body, Fields>& resp) const + { + resp.fields.replace("Server", "async_echo_server"); + } + }; + + /** A container of type-erased option setters. + */ + template + class options_set + { + // workaround for std::function bug in msvc + struct callable + { + virtual ~callable() = default; + virtual void operator()( + beast::websocket::stream&) = 0; + }; + + template + class callable_impl : public callable + { + T t_; + + public: + template + callable_impl(U&& u) + : t_(std::forward(u)) + { + } + + void + operator()(beast::websocket::stream& ws) + { + t_(ws); + } + }; + + template + class lambda + { + Opt opt_; + + public: + lambda(lambda&&) = default; + lambda(lambda const&) = default; + + lambda(Opt const& opt) + : opt_(opt) + { + } + + void + operator()(beast::websocket::stream& ws) const + { + ws.set_option(opt_); + } + }; + + std::unordered_map> list_; + + public: + template + void + set_option(Opt const& opt) + { + std::unique_ptr p; + p.reset(new callable_impl>{opt}); + list_[std::type_index{ + typeid(Opt)}] = std::move(p); + } + + void + set_options(beast::websocket::stream& ws) + { + for(auto const& op : list_) + (*op.second)(ws); + } + }; + + std::ostream* log_; + boost::asio::io_service ios_; + socket_type sock_; + endpoint_type ep_; + boost::asio::ip::tcp::acceptor acceptor_; + std::vector thread_; + boost::optional work_; + options_set opts_; + +public: + async_echo_server(async_echo_server const&) = delete; + async_echo_server& operator=(async_echo_server const&) = delete; + + /** Constructor. + + @param log A pointer to a stream to log to, or `nullptr` + to disable logging. + + @param threads The number of threads in the io_service. + */ + async_echo_server(std::ostream* log, + std::size_t threads) + : log_(log) + , sock_(ios_) + , acceptor_(ios_) + , work_(ios_) + { + opts_.set_option( + beast::websocket::decorate(identity{})); + thread_.reserve(threads); + for(std::size_t i = 0; i < threads; ++i) + thread_.emplace_back( + [&]{ ios_.run(); }); + } + + /** Destructor. + */ + ~async_echo_server() + { + work_ = boost::none; + error_code ec; + ios_.dispatch( + [&]{ acceptor_.close(ec); }); + for(auto& t : thread_) + t.join(); + } + + /** Return the listening endpoint. + */ + endpoint_type + local_endpoint() const + { + return acceptor_.local_endpoint(); + } + + /** Set a websocket option. + + The option will be applied to all new connections. + + @param opt The option to apply. + */ + template + void + set_option(Opt const& opt) + { + opts_.set_option(opt); + } + + /** Open a listening port. + + @param ep The address and port to bind to. + + @param ec Set to the error, if any occurred. + */ + void + open(endpoint_type const& ep, error_code& ec) + { + acceptor_.open(ep.protocol(), ec); + if(ec) + return fail("open", ec); + acceptor_.set_option( + boost::asio::socket_base::reuse_address{true}); + acceptor_.bind(ep, ec); + if(ec) + return fail("bind", ec); + acceptor_.listen( + boost::asio::socket_base::max_connections, ec); + if(ec) + return fail("listen", ec); + acceptor_.async_accept(sock_, ep_, + std::bind(&async_echo_server::on_accept, this, + beast::asio::placeholders::error)); + } + +private: + class peer + { + struct data + { + async_echo_server& server; + endpoint_type ep; + int state = 0; + beast::websocket::stream ws; + boost::asio::io_service::strand strand; + beast::websocket::opcode op; + beast::streambuf db; + std::size_t id; + + data(async_echo_server& server_, + endpoint_type const& ep_, + socket_type&& sock_) + : server(server_) + , ep(ep_) + , ws(std::move(sock_)) + , strand(ws.get_io_service()) + , id([] + { + static std::atomic n{0}; + return ++n; + }()) + { + } + }; + + // VFALCO This could be unique_ptr in [Net.TS] + std::shared_ptr d_; + + public: + peer(peer&&) = default; + peer(peer const&) = default; + peer& operator=(peer&&) = delete; + peer& operator=(peer const&) = delete; + + template + explicit + peer(async_echo_server& server, + endpoint_type const& ep, socket_type&& sock, + Args&&... args) + : d_(std::make_shared(server, ep, + std::forward(sock), + std::forward(args)...)) + { + auto& d = *d_; + d.server.opts_.set_options(d.ws); + run(); + } + + void run() + { + auto& d = *d_; + d.ws.async_accept(std::move(*this)); + } + + void operator()(error_code ec, std::size_t) + { + (*this)(ec); + } + + void operator()(error_code ec) + { + using boost::asio::buffer; + using boost::asio::buffer_copy; + auto& d = *d_; + switch(d.state) + { + // did accept + case 0: + if(ec) + return fail("async_accept", ec); + + // start + case 1: + if(ec) + return fail("async_handshake", ec); + d.db.consume(d.db.size()); + // read message + d.state = 2; + d.ws.async_read(d.op, d.db, + d.strand.wrap(std::move(*this))); + return; + + // got message + case 2: + if(ec == beast::websocket::error::closed) + return; + if(ec) + return fail("async_read", ec); + // write message + d.state = 1; + d.ws.set_option( + beast::websocket::message_type(d.op)); + d.ws.async_write(d.db.data(), + d.strand.wrap(std::move(*this))); + return; + } + } + + private: + void + fail(std::string what, error_code ec) + { + auto& d = *d_; + if(d.server.log_) + if(ec != beast::websocket::error::closed) + d.server.fail("[#" + std::to_string(d.id) + + " " + boost::lexical_cast(d.ep) + + "] " + what, ec); + } + }; + + void + fail(std::string what, error_code ec) + { + if(log_) + { + static std::mutex m; + std::lock_guard lock{m}; + (*log_) << what << ": " << + ec.message() << std::endl; + } + } + + void + on_accept(error_code ec) + { + if(! acceptor_.is_open()) + return; + if(ec == boost::asio::error::operation_aborted) + return; + if(ec) + fail("accept", ec); + peer{*this, ep_, std::move(sock_)}; + acceptor_.async_accept(sock_, ep_, + std::bind(&async_echo_server::on_accept, this, + beast::asio::placeholders::error)); + } +}; + +} // websocket + +#endif diff --git a/examples/websocket_echo.cpp b/examples/websocket_echo.cpp new file mode 100644 index 00000000..bf7ac9cb --- /dev/null +++ b/examples/websocket_echo.cpp @@ -0,0 +1,56 @@ +// +// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include "websocket_async_echo_server.hpp" +#include "websocket_sync_echo_server.hpp" +#include +#include +#include + +/// Block until SIGINT or SIGTERM is received. +void +sig_wait() +{ + boost::asio::io_service ios; + boost::asio::signal_set signals( + ios, SIGINT, SIGTERM); + signals.async_wait( + [&](boost::system::error_code const&, int) + { + }); + ios.run(); +} + +int main() +{ + using namespace beast::websocket; + using endpoint_type = boost::asio::ip::tcp::endpoint; + using address_type = boost::asio::ip::address; + + beast::error_code ec; + + permessage_deflate pmd; + pmd.client_enable = true; + pmd.server_enable = true; + pmd.compLevel = 3; + + websocket::async_echo_server s1{&std::cout, 1}; + s1.set_option(read_message_max{64 * 1024 * 1024}); + s1.set_option(auto_fragment{false}); + s1.set_option(pmd); + s1.open(endpoint_type{ + address_type::from_string("127.0.0.1"), 6000 }, ec); + + websocket::sync_echo_server s2{&std::cout}; + s2.set_option(read_message_max{64 * 1024 * 1024}); + s2.set_option(auto_fragment{false}); + s2.set_option(pmd); + s2.open(endpoint_type{ + address_type::from_string("127.0.0.1"), 6001 }, ec); + + sig_wait(); +} diff --git a/examples/websocket_sync_echo_server.hpp b/examples/websocket_sync_echo_server.hpp new file mode 100644 index 00000000..a4b0ba3a --- /dev/null +++ b/examples/websocket_sync_echo_server.hpp @@ -0,0 +1,326 @@ +// +// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef WEBSOCKET_SYNC_ECHO_SERVER_HPP +#define WEBSOCKET_SYNC_ECHO_SERVER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace websocket { + +/** Synchronous WebSocket echo client/server +*/ +class sync_echo_server +{ +public: + using error_code = beast::error_code; + using endpoint_type = boost::asio::ip::tcp::endpoint; + using address_type = boost::asio::ip::address; + using socket_type = boost::asio::ip::tcp::socket; + +private: + struct identity + { + template + void + operator()(beast::http::message< + true, Body, Fields>& req) const + { + req.fields.replace("User-Agent", "sync_echo_client"); + } + + template + void + operator()(beast::http::message< + false, Body, Fields>& resp) const + { + resp.fields.replace("Server", "sync_echo_server"); + } + }; + + /** A container of type-erased option setters. + */ + template + class options_set + { + // workaround for std::function bug in msvc + struct callable + { + virtual ~callable() = default; + virtual void operator()( + beast::websocket::stream&) = 0; + }; + + template + class callable_impl : public callable + { + T t_; + + public: + template + callable_impl(U&& u) + : t_(std::forward(u)) + { + } + + void + operator()(beast::websocket::stream& ws) + { + t_(ws); + } + }; + + template + class lambda + { + Opt opt_; + + public: + lambda(lambda&&) = default; + lambda(lambda const&) = default; + + lambda(Opt const& opt) + : opt_(opt) + { + } + + void + operator()(beast::websocket::stream& ws) const + { + ws.set_option(opt_); + } + }; + + std::unordered_map> list_; + + public: + template + void + set_option(Opt const& opt) + { + std::unique_ptr p; + p.reset(new callable_impl>{opt}); + list_[std::type_index{ + typeid(Opt)}] = std::move(p); + } + + void + set_options(beast::websocket::stream& ws) + { + for(auto const& op : list_) + (*op.second)(ws); + } + }; + + std::ostream* log_; + boost::asio::io_service ios_; + socket_type sock_; + endpoint_type ep_; + boost::asio::ip::tcp::acceptor acceptor_; + std::thread thread_; + options_set opts_; + +public: + /** Constructor. + + @param log A pointer to a stream to log to, or `nullptr` + to disable logging. + */ + sync_echo_server(std::ostream* log) + : log_(log) + , sock_(ios_) + , acceptor_(ios_) + { + opts_.set_option( + beast::websocket::decorate(identity{})); + } + + /** Destructor. + */ + ~sync_echo_server() + { + if(thread_.joinable()) + { + error_code ec; + ios_.dispatch( + [&]{ acceptor_.close(ec); }); + thread_.join(); + } + } + + /** Return the listening endpoint. + */ + endpoint_type + local_endpoint() const + { + return acceptor_.local_endpoint(); + } + + /** Set a websocket option. + + The option will be applied to all new connections. + + @param opt The option to apply. + */ + template + void + set_option(Opt const& opt) + { + opts_.set_option(opt); + } + + /** Open a listening port. + + @param ep The address and port to bind to. + + @param ec Set to the error, if any occurred. + */ + void + open(endpoint_type const& ep, error_code& ec) + { + acceptor_.open(ep.protocol(), ec); + if(ec) + return fail("open", ec); + acceptor_.set_option( + boost::asio::socket_base::reuse_address{true}); + acceptor_.bind(ep, ec); + if(ec) + return fail("bind", ec); + acceptor_.listen( + boost::asio::socket_base::max_connections, ec); + if(ec) + return fail("listen", ec); + acceptor_.async_accept(sock_, ep_, + std::bind(&sync_echo_server::on_accept, this, + beast::asio::placeholders::error)); + thread_ = std::thread{[&]{ ios_.run(); }}; + } + +private: + void + fail(std::string what, error_code ec) + { + if(log_) + { + static std::mutex m; + std::lock_guard lock{m}; + (*log_) << what << ": " << + ec.message() << std::endl; + } + } + + void + fail(std::string what, error_code ec, + int id, endpoint_type const& ep) + { + if(log_) + if(ec != beast::websocket::error::closed) + fail("[#" + std::to_string(id) + " " + + boost::lexical_cast(ep) + + "] " + what, ec); + } + + void + on_accept(error_code ec) + { + if(ec == boost::asio::error::operation_aborted) + return; + if(ec) + return fail("accept", ec); + struct lambda + { + std::size_t id; + endpoint_type ep; + sync_echo_server& self; + boost::asio::io_service::work work; + // Must be destroyed before work otherwise the + // io_service could be destroyed before the socket. + socket_type sock; + + lambda(sync_echo_server& self_, + endpoint_type const& ep_, + socket_type&& sock_) + : id([] + { + static std::atomic n{0}; + return ++n; + }()) + , ep(ep_) + , self(self_) + , work(sock_.get_io_service()) + , sock(std::move(sock_)) + { + } + + void operator()() + { + self.do_peer(id, ep, std::move(sock)); + } + }; + std::thread{lambda{*this, ep_, std::move(sock_)}}.detach(); + acceptor_.async_accept(sock_, ep_, + std::bind(&sync_echo_server::on_accept, this, + beast::asio::placeholders::error)); + } + + void + do_peer(std::size_t id, + endpoint_type const& ep, socket_type&& sock) + { + using boost::asio::buffer; + using boost::asio::buffer_copy; + beast::websocket::stream< + socket_type> ws{std::move(sock)}; + opts_.set_options(ws); + error_code ec; + ws.accept(ec); + if(ec) + { + fail("accept", ec, id, ep); + return; + } + for(;;) + { + beast::websocket::opcode op; + beast::streambuf sb; + ws.read(op, sb, ec); + if(ec) + { + auto const s = ec.message(); + break; + } + ws.set_option(beast::websocket::message_type{op}); + ws.write(sb.data(), ec); + if(ec) + break; + } + if(ec && ec != beast::websocket::error::closed) + { + fail("read", ec, id, ep); + } + } +}; + +} // websocket + +#endif diff --git a/extras/beast/test/sig_wait.hpp b/extras/beast/test/sig_wait.hpp index 5abcdec5..a1071bdf 100644 --- a/extras/beast/test/sig_wait.hpp +++ b/extras/beast/test/sig_wait.hpp @@ -9,8 +9,6 @@ #define BEAST_TEST_SIG_WAIT_HPP #include -#include -#include namespace beast { namespace test { diff --git a/test/Jamfile b/test/Jamfile index a023b5fd..5ffb5d59 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -83,10 +83,6 @@ unit-test websocket-tests : websocket/utf8_checker.cpp ; -exe websocket-echo : - websocket/websocket_echo.cpp - ; - unit-test zlib-tests : ../extras/beast/unit_test/main.cpp zlib/zlib-1.2.8/adler32.c diff --git a/test/websocket/CMakeLists.txt b/test/websocket/CMakeLists.txt index 5e8e8ce7..9a3260c7 100644 --- a/test/websocket/CMakeLists.txt +++ b/test/websocket/CMakeLists.txt @@ -8,7 +8,6 @@ add_executable (websocket-tests ${BEAST_INCLUDES} ${EXTRAS_INCLUDES} ../../extras/beast/unit_test/main.cpp - options_set.hpp websocket_async_echo_server.hpp websocket_sync_echo_server.hpp error.cpp @@ -28,16 +27,3 @@ endif() if (MINGW) set_target_properties(websocket-tests PROPERTIES COMPILE_FLAGS "-Wa,-mbig-obj -Og") endif() - -add_executable (websocket-echo - ${BEAST_INCLUDES} - ${EXTRAS_INCLUDES} - options_set.hpp - websocket_async_echo_server.hpp - websocket_sync_echo_server.hpp - websocket_echo.cpp -) - -if (NOT WIN32) - target_link_libraries(websocket-echo ${Boost_LIBRARIES} Threads::Threads) -endif() diff --git a/test/websocket/options_set.hpp b/test/websocket/options_set.hpp deleted file mode 100644 index c173b286..00000000 --- a/test/websocket/options_set.hpp +++ /dev/null @@ -1,99 +0,0 @@ -// -// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#ifndef BEAST_WEBSOCKET_OPTIONS_SET_HPP -#define BEAST_WEBSOCKET_OPTIONS_SET_HPP - -#include -#include -#include -#include -#include -#include - -namespace beast { -namespace websocket { - -/** A container of type-erased option setters. -*/ -template -class options_set -{ - // workaround for std::function bug in msvc - struct callable - { - virtual ~callable() = default; - virtual void operator()( - beast::websocket::stream&) = 0; - }; - - template - class callable_impl : public callable - { - T t_; - - public: - template - callable_impl(U&& u) - : t_(std::forward(u)) - { - } - - void - operator()(beast::websocket::stream& ws) - { - t_(ws); - } - }; - - template - class lambda - { - Opt opt_; - - public: - lambda(lambda&&) = default; - lambda(lambda const&) = default; - - lambda(Opt const& opt) - : opt_(opt) - { - } - - void - operator()(beast::websocket::stream& ws) const - { - ws.set_option(opt_); - } - }; - - std::unordered_map> list_; - -public: - template - void - set_option(Opt const& opt) - { - std::unique_ptr p; - p.reset(new callable_impl>{opt}); - list_[std::type_index{ - typeid(Opt)}] = std::move(p); - } - - void - set_options(beast::websocket::stream& ws) - { - for(auto const& op : list_) - (*op.second)(ws); - } -}; - -} // websocket -} // beast - -#endif diff --git a/test/websocket/stream.cpp b/test/websocket/stream.cpp index 17ddf16e..63256766 100644 --- a/test/websocket/stream.cpp +++ b/test/websocket/stream.cpp @@ -1250,7 +1250,10 @@ public: testBadResponses(); { - sync_echo_server server{nullptr, any}; + error_code ec; + ::websocket::sync_echo_server server{nullptr}; + server.open(any, ec); + BEAST_EXPECTS(! ec, ec.message()); auto const ep = server.local_endpoint(); //testInvokable1(ep); testInvokable2(ep); @@ -1262,7 +1265,7 @@ public: { error_code ec; - async_echo_server server{nullptr, 4}; + ::websocket::async_echo_server server{nullptr, 4}; server.open(any, ec); BEAST_EXPECTS(! ec, ec.message()); auto const ep = server.local_endpoint(); @@ -1273,8 +1276,11 @@ public: [this, any](permessage_deflate const& pmd) { { - sync_echo_server server{nullptr, any}; + error_code ec; + ::websocket::sync_echo_server server{nullptr}; server.set_option(pmd); + server.open(any, ec); + BEAST_EXPECTS(! ec, ec.message()); auto const ep = server.local_endpoint(); testEndpoint(SyncClient{}, ep, pmd); yield_to( @@ -1286,7 +1292,7 @@ public: } { error_code ec; - async_echo_server server{nullptr, 4}; + ::websocket::async_echo_server server{nullptr, 4}; server.set_option(pmd); server.open(any, ec); BEAST_EXPECTS(! ec, ec.message()); diff --git a/test/websocket/websocket_async_echo_server.hpp b/test/websocket/websocket_async_echo_server.hpp index 6114628a..61f0b9f8 100644 --- a/test/websocket/websocket_async_echo_server.hpp +++ b/test/websocket/websocket_async_echo_server.hpp @@ -5,55 +5,137 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#ifndef BEAST_WEBSOCKET_ASYNC_ECHO_PEER_H_INCLUDED -#define BEAST_WEBSOCKET_ASYNC_ECHO_PEER_H_INCLUDED +#ifndef BEAST_WEBSOCKET_ASYNC_ECHO_SERVER_HPP +#define BEAST_WEBSOCKET_ASYNC_ECHO_SERVER_HPP -#include "options_set.hpp" #include #include -#include +#include #include #include +#include #include -#include #include -#include - +#include #include +#include +#include +#include +#include +#include +#include -namespace beast { namespace websocket { -// Asynchronous WebSocket echo client/server -// +/** Asynchronous WebSocket echo client/server +*/ class async_echo_server { public: - using endpoint_type = boost::asio::ip::tcp::endpoint; + using error_code = beast::error_code; using address_type = boost::asio::ip::address; using socket_type = boost::asio::ip::tcp::socket; + using endpoint_type = boost::asio::ip::tcp::endpoint; private: struct identity { template void - operator()(http::message& req) const + operator()(beast::http::message< + true, Body, Fields>& req) const { req.fields.replace("User-Agent", "async_echo_client"); } template void - operator()(http::message& resp) const + operator()(beast::http::message< + false, Body, Fields>& resp) const { resp.fields.replace("Server", "async_echo_server"); } }; + /** A container of type-erased option setters. + */ + template + class options_set + { + // workaround for std::function bug in msvc + struct callable + { + virtual ~callable() = default; + virtual void operator()( + beast::websocket::stream&) = 0; + }; + + template + class callable_impl : public callable + { + T t_; + + public: + template + callable_impl(U&& u) + : t_(std::forward(u)) + { + } + + void + operator()(beast::websocket::stream& ws) + { + t_(ws); + } + }; + + template + class lambda + { + Opt opt_; + + public: + lambda(lambda&&) = default; + lambda(lambda const&) = default; + + lambda(Opt const& opt) + : opt_(opt) + { + } + + void + operator()(beast::websocket::stream& ws) const + { + ws.set_option(opt_); + } + }; + + std::unordered_map> list_; + + public: + template + void + set_option(Opt const& opt) + { + std::unique_ptr p; + p.reset(new callable_impl>{opt}); + list_[std::type_index{ + typeid(Opt)}] = std::move(p); + } + + void + set_options(beast::websocket::stream& ws) + { + for(auto const& op : list_) + (*op.second)(ws); + } + }; + std::ostream* log_; boost::asio::io_service ios_; socket_type sock_; + endpoint_type ep_; boost::asio::ip::tcp::acceptor acceptor_; std::vector thread_; boost::optional work_; @@ -63,6 +145,13 @@ public: async_echo_server(async_echo_server const&) = delete; async_echo_server& operator=(async_echo_server const&) = delete; + /** Constructor. + + @param log A pointer to a stream to log to, or `nullptr` + to disable logging. + + @param threads The number of threads in the io_service. + */ async_echo_server(std::ostream* log, std::size_t threads) : log_(log) @@ -78,6 +167,8 @@ public: [&]{ ios_.run(); }); } + /** Destructor. + */ ~async_echo_server() { work_ = boost::none; @@ -88,6 +179,20 @@ public: t.join(); } + /** Return the listening endpoint. + */ + endpoint_type + local_endpoint() const + { + return acceptor_.local_endpoint(); + } + + /** Set a websocket option. + + The option will be applied to all new connections. + + @param opt The option to apply. + */ template void set_option(Opt const& opt) @@ -95,84 +200,77 @@ public: opts_.set_option(opt); } + /** Open a listening port. + + @param ep The address and port to bind to. + + @param ec Set to the error, if any occurred. + */ void open(endpoint_type const& ep, error_code& ec) { acceptor_.open(ep.protocol(), ec); if(ec) - { - if(log_) - (*log_) << "open: " << ec.message() << std::endl; - return; - } + return fail("open", ec); acceptor_.set_option( boost::asio::socket_base::reuse_address{true}); acceptor_.bind(ep, ec); if(ec) - { - if(log_) - (*log_) << "bind: " << ec.message() << std::endl; - return; - } + return fail("bind", ec); acceptor_.listen( boost::asio::socket_base::max_connections, ec); if(ec) - { - if(log_) - (*log_) << "listen: " << ec.message() << std::endl; - return; - } - acceptor_.async_accept(sock_, + return fail("listen", ec); + acceptor_.async_accept(sock_, ep_, std::bind(&async_echo_server::on_accept, this, beast::asio::placeholders::error)); } - endpoint_type - local_endpoint() const - { - return acceptor_.local_endpoint(); - } - private: - class Peer + class peer { struct data { async_echo_server& server; + endpoint_type ep; int state = 0; - stream ws; + beast::websocket::stream ws; boost::asio::io_service::strand strand; - opcode op; + beast::websocket::opcode op; beast::streambuf db; - int id; + std::size_t id; data(async_echo_server& server_, + endpoint_type const& ep_, socket_type&& sock_) : server(server_) + , ep(ep_) , ws(std::move(sock_)) , strand(ws.get_io_service()) , id([] { - static int n = 0; + static std::atomic n{0}; return ++n; }()) { } }; + // VFALCO This could be unique_ptr in [Net.TS] std::shared_ptr d_; public: - Peer(Peer&&) = default; - Peer(Peer const&) = default; - Peer& operator=(Peer&&) = delete; - Peer& operator=(Peer const&) = delete; + peer(peer&&) = default; + peer(peer const&) = default; + peer& operator=(peer&&) = delete; + peer& operator=(peer const&) = delete; template explicit - Peer(async_echo_server& server, - socket_type&& sock, Args&&... args) - : d_(std::make_shared(server, + peer(async_echo_server& server, + endpoint_type const& ep, socket_type&& sock, + Args&&... args) + : d_(std::make_shared(server, ep, std::forward(sock), std::forward(args)...)) { @@ -196,7 +294,7 @@ private: using boost::asio::buffer_copy; if(db.size() < N-1) return false; - static_string t; + beast::static_string t; t.resize(N-1); buffer_copy(buffer(t.data(), t.size()), db.data()); @@ -221,12 +319,12 @@ private: // did accept case 0: if(ec) - return fail(ec, "async_accept"); + return fail("async_accept", ec); // start case 1: if(ec) - return fail(ec, "async_handshake"); + return fail("async_handshake", ec); d.db.consume(d.db.size()); // read message d.state = 2; @@ -236,10 +334,10 @@ private: // got message case 2: - if(ec == error::closed) + if(ec == beast::websocket::error::closed) return; if(ec) - return fail(ec, "async_read"); + return fail("async_read", ec); if(match(d.db, "RAW")) { d.state = 1; @@ -250,14 +348,16 @@ private: else if(match(d.db, "TEXT")) { d.state = 1; - d.ws.set_option(message_type{opcode::text}); + d.ws.set_option( + beast::websocket::message_type{ + beast::websocket::opcode::text}); d.ws.async_write( d.db.data(), d.strand.wrap(std::move(*this))); return; } else if(match(d.db, "PING")) { - ping_data payload; + beast::websocket::ping_data payload; d.db.consume(buffer_copy( buffer(payload.data(), payload.size()), d.db.data())); @@ -275,7 +375,8 @@ private: } // write message d.state = 1; - d.ws.set_option(message_type(d.op)); + d.ws.set_option( + beast::websocket::message_type(d.op)); d.ws.async_write(d.db.data(), d.strand.wrap(std::move(*this))); return; @@ -284,33 +385,26 @@ private: private: void - fail(error_code ec, std::string what) + fail(std::string what, error_code ec) { auto& d = *d_; if(d.server.log_) - { - if(ec != error::closed) - (*d.server.log_) << "#" << d.id << " " << - what << ": " << ec.message() << std::endl; - } + if(ec != beast::websocket::error::closed) + d.server.fail("[#" + std::to_string(d.id) + + " " + boost::lexical_cast(d.ep) + + "] " + what, ec); } }; void - fail(error_code ec, std::string what) + fail(std::string what, error_code ec) { if(log_) + { + static std::mutex m; + std::lock_guard lock{m}; (*log_) << what << ": " << ec.message() << std::endl; - } - - void - maybe_throw(error_code ec, std::string what) - { - if(ec) - { - fail(ec, what); - throw ec; } } @@ -321,16 +415,15 @@ private: return; if(ec == boost::asio::error::operation_aborted) return; - maybe_throw(ec, "accept"); - socket_type sock(std::move(sock_)); - acceptor_.async_accept(sock_, + if(ec) + fail("accept", ec); + peer{*this, ep_, std::move(sock_)}; + acceptor_.async_accept(sock_, ep_, std::bind(&async_echo_server::on_accept, this, beast::asio::placeholders::error)); - Peer{*this, std::move(sock)}; } }; } // websocket -} // beast #endif diff --git a/test/websocket/websocket_echo.cpp b/test/websocket/websocket_echo.cpp deleted file mode 100644 index 514c7913..00000000 --- a/test/websocket/websocket_echo.cpp +++ /dev/null @@ -1,44 +0,0 @@ -// -// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#include "websocket_async_echo_server.hpp" -#include "websocket_sync_echo_server.hpp" -#include -#include - -int main() -{ - using namespace beast::websocket; - using endpoint_type = boost::asio::ip::tcp::endpoint; - using address_type = boost::asio::ip::address; - - try - { - permessage_deflate pmd; - pmd.client_enable = true; - pmd.server_enable = true; - - beast::error_code ec; - async_echo_server s1{nullptr, 1}; - s1.open(endpoint_type{ - address_type::from_string("127.0.0.1"), 6000 }, ec); - s1.set_option(read_message_max{64 * 1024 * 1024}); - s1.set_option(auto_fragment{false}); - s1.set_option(pmd); - - beast::websocket::sync_echo_server s2(&std::cout, endpoint_type{ - address_type::from_string("127.0.0.1"), 6001 }); - s2.set_option(read_message_max{64 * 1024 * 1024}); - s2.set_option(pmd); - - beast::test::sig_wait(); - } - catch(std::exception const& e) - { - std::cout << "Error: " << e.what() << std::endl; - } -} diff --git a/test/websocket/websocket_sync_echo_server.hpp b/test/websocket/websocket_sync_echo_server.hpp index c83bc554..adaa1633 100644 --- a/test/websocket/websocket_sync_echo_server.hpp +++ b/test/websocket/websocket_sync_echo_server.hpp @@ -5,28 +5,34 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#ifndef BEAST_WEBSOCKET_SYNC_ECHO_PEER_H_INCLUDED -#define BEAST_WEBSOCKET_SYNC_ECHO_PEER_H_INCLUDED +#ifndef BEAST_WEBSOCKET_SYNC_ECHO_SERVER_HPP +#define BEAST_WEBSOCKET_SYNC_ECHO_SERVER_HPP -#include "options_set.hpp" #include #include #include #include #include +#include #include -#include #include +#include +#include +#include #include +#include +#include +#include +#include -namespace beast { namespace websocket { -// Synchronous WebSocket echo client/server -// +/** Synchronous WebSocket echo client/server +*/ class sync_echo_server { public: + using error_code = beast::error_code; using endpoint_type = boost::asio::ip::tcp::endpoint; using address_type = boost::asio::ip::address; using socket_type = boost::asio::ip::tcp::socket; @@ -36,64 +42,146 @@ private: { template void - operator()(http::message& req) const + operator()(beast::http::message< + true, Body, Fields>& req) const { req.fields.replace("User-Agent", "sync_echo_client"); } template void - operator()(http::message& resp) const + operator()(beast::http::message< + false, Body, Fields>& resp) const { resp.fields.replace("Server", "sync_echo_server"); } }; + /** A container of type-erased option setters. + */ + template + class options_set + { + // workaround for std::function bug in msvc + struct callable + { + virtual ~callable() = default; + virtual void operator()( + beast::websocket::stream&) = 0; + }; + + template + class callable_impl : public callable + { + T t_; + + public: + template + callable_impl(U&& u) + : t_(std::forward(u)) + { + } + + void + operator()(beast::websocket::stream& ws) + { + t_(ws); + } + }; + + template + class lambda + { + Opt opt_; + + public: + lambda(lambda&&) = default; + lambda(lambda const&) = default; + + lambda(Opt const& opt) + : opt_(opt) + { + } + + void + operator()(beast::websocket::stream& ws) const + { + ws.set_option(opt_); + } + }; + + std::unordered_map> list_; + + public: + template + void + set_option(Opt const& opt) + { + std::unique_ptr p; + p.reset(new callable_impl>{opt}); + list_[std::type_index{ + typeid(Opt)}] = std::move(p); + } + + void + set_options(beast::websocket::stream& ws) + { + for(auto const& op : list_) + (*op.second)(ws); + } + }; + std::ostream* log_; boost::asio::io_service ios_; socket_type sock_; + endpoint_type ep_; boost::asio::ip::tcp::acceptor acceptor_; std::thread thread_; options_set opts_; public: - sync_echo_server(std::ostream* log, endpoint_type ep) + /** Constructor. + + @param log A pointer to a stream to log to, or `nullptr` + to disable logging. + */ + sync_echo_server(std::ostream* log) : log_(log) , sock_(ios_) , acceptor_(ios_) { opts_.set_option( beast::websocket::decorate(identity{})); - error_code ec; - acceptor_.open(ep.protocol(), ec); - maybe_throw(ec, "open"); - acceptor_.set_option( - boost::asio::socket_base::reuse_address{true}); - acceptor_.bind(ep, ec); - maybe_throw(ec, "bind"); - acceptor_.listen( - boost::asio::socket_base::max_connections, ec); - maybe_throw(ec, "listen"); - acceptor_.async_accept(sock_, - std::bind(&sync_echo_server::on_accept, this, - beast::asio::placeholders::error)); - thread_ = std::thread{[&]{ ios_.run(); }}; } + /** Destructor. + */ ~sync_echo_server() { - error_code ec; - ios_.dispatch( - [&]{ acceptor_.close(ec); }); - thread_.join(); + if(thread_.joinable()) + { + error_code ec; + ios_.dispatch( + [&]{ acceptor_.close(ec); }); + thread_.join(); + } } + /** Return the listening endpoint. + */ endpoint_type local_endpoint() const { return acceptor_.local_endpoint(); } + /** Set a websocket option. + + The option will be applied to all new connections. + + @param opt The option to apply. + */ template void set_option(Opt const& opt) @@ -101,66 +189,96 @@ public: opts_.set_option(opt); } + /** Open a listening port. + + @param ep The address and port to bind to. + + @param ec Set to the error, if any occurred. + */ + void + open(endpoint_type const& ep, error_code& ec) + { + acceptor_.open(ep.protocol(), ec); + if(ec) + return fail("open", ec); + acceptor_.set_option( + boost::asio::socket_base::reuse_address{true}); + acceptor_.bind(ep, ec); + if(ec) + return fail("bind", ec); + acceptor_.listen( + boost::asio::socket_base::max_connections, ec); + if(ec) + return fail("listen", ec); + acceptor_.async_accept(sock_, ep_, + std::bind(&sync_echo_server::on_accept, this, + beast::asio::placeholders::error)); + thread_ = std::thread{[&]{ ios_.run(); }}; + } + private: void - fail(error_code ec, std::string what) + fail(std::string what, error_code ec) { if(log_) - *log_ << - what << ": " << ec.message() << std::endl; + { + static std::mutex m; + std::lock_guard lock{m}; + (*log_) << what << ": " << + ec.message() << std::endl; + } } void - fail(int id, error_code ec, std::string what) + fail(std::string what, error_code ec, + int id, endpoint_type const& ep) { if(log_) - *log_ << "#" << boost::lexical_cast(id) << " " << - what << ": " << ec.message() << std::endl; + if(ec != beast::websocket::error::closed) + fail("[#" + std::to_string(id) + " " + + boost::lexical_cast(ep) + + "] " + what, ec); } - void - maybe_throw(error_code ec, std::string what) - { - if(ec) - { - fail(ec, what); - throw ec; - } - } - - struct lambda - { - int id; - sync_echo_server& self; - boost::asio::io_service::work work; - // Must be destroyed before work otherwise the - // io_service could be destroyed before the socket. - socket_type sock; - - lambda(int id_, sync_echo_server& self_, - socket_type&& sock_) - : id(id_) - , self(self_) - , work(sock_.get_io_service()) - , sock(std::move(sock_)) - { - } - - void operator()() - { - self.do_peer(id, std::move(sock)); - } - }; - void on_accept(error_code ec) { if(ec == boost::asio::error::operation_aborted) return; - maybe_throw(ec, "accept"); - static int id_ = 0; - std::thread{lambda{++id_, *this, std::move(sock_)}}.detach(); - acceptor_.async_accept(sock_, + if(ec) + return fail("accept", ec); + struct lambda + { + std::size_t id; + endpoint_type ep; + sync_echo_server& self; + boost::asio::io_service::work work; + // Must be destroyed before work otherwise the + // io_service could be destroyed before the socket. + socket_type sock; + + lambda(sync_echo_server& self_, + endpoint_type const& ep_, + socket_type&& sock_) + : id([] + { + static std::atomic n{0}; + return ++n; + }()) + , ep(ep_) + , self(self_) + , work(sock_.get_io_service()) + , sock(std::move(sock_)) + { + } + + void operator()() + { + self.do_peer(id, ep, std::move(sock)); + } + }; + std::thread{lambda{*this, ep_, std::move(sock_)}}.detach(); + acceptor_.async_accept(sock_, ep_, std::bind(&sync_echo_server::on_accept, this, beast::asio::placeholders::error)); } @@ -174,7 +292,7 @@ private: using boost::asio::buffer_copy; if(db.size() < N-1) return false; - static_string t; + beast::static_string t; t.resize(N-1); buffer_copy(buffer(t.data(), t.size()), db.data()); @@ -185,22 +303,24 @@ private: } void - do_peer(int id, socket_type&& sock) + do_peer(std::size_t id, + endpoint_type const& ep, socket_type&& sock) { using boost::asio::buffer; using boost::asio::buffer_copy; - stream ws(std::move(sock)); + beast::websocket::stream< + socket_type> ws{std::move(sock)}; opts_.set_options(ws); error_code ec; ws.accept(ec); if(ec) { - fail(id, ec, "accept"); + fail("accept", ec, id, ep); return; } for(;;) { - opcode op; + beast::websocket::opcode op; beast::streambuf sb; ws.read(op, sb, ec); if(ec) @@ -208,7 +328,7 @@ private: auto const s = ec.message(); break; } - ws.set_option(message_type(op)); + ws.set_option(beast::websocket::message_type{op}); if(match(sb, "RAW")) { boost::asio::write( @@ -216,12 +336,14 @@ private: } else if(match(sb, "TEXT")) { - ws.set_option(message_type{opcode::text}); + ws.set_option( + beast::websocket::message_type{ + beast::websocket::opcode::text}); ws.write(sb.data(), ec); } else if(match(sb, "PING")) { - ping_data payload; + beast::websocket::ping_data payload; sb.consume(buffer_copy( buffer(payload.data(), payload.size()), sb.data())); @@ -238,14 +360,13 @@ private: if(ec) break; } - if(ec && ec != error::closed) + if(ec && ec != beast::websocket::error::closed) { - fail(id, ec, "read"); + fail("read", ec, id, ep); } } }; } // websocket -} // beast #endif