// // Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef WEBSOCKET_ASYNC_ECHO_SERVER_HPP #define WEBSOCKET_ASYNC_ECHO_SERVER_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace websocket { /** Asynchronous WebSocket echo client/server */ class async_echo_server { public: using error_code = beast::error_code; using address_type = boost::asio::ip::address; using socket_type = boost::asio::ip::tcp::socket; using endpoint_type = boost::asio::ip::tcp::endpoint; private: /** A container of type-erased option setters. */ template class options_set { // workaround for std::function bug in msvc struct callable { virtual ~callable() = default; virtual void operator()( beast::websocket::stream&) = 0; }; template class callable_impl : public callable { T t_; public: template callable_impl(U&& u) : t_(std::forward(u)) { } void operator()(beast::websocket::stream& ws) { t_(ws); } }; template class lambda { Opt opt_; public: lambda(lambda&&) = default; lambda(lambda const&) = default; lambda(Opt const& opt) : opt_(opt) { } void operator()(beast::websocket::stream& ws) const { ws.set_option(opt_); } }; std::unordered_map> list_; public: template void set_option(Opt const& opt) { std::unique_ptr p; p.reset(new callable_impl>{opt}); list_[std::type_index{ typeid(Opt)}] = std::move(p); } void set_options(beast::websocket::stream& ws) { for(auto const& op : list_) (*op.second)(ws); } }; std::ostream* log_; boost::asio::io_service ios_; socket_type sock_; endpoint_type ep_; boost::asio::ip::tcp::acceptor acceptor_; std::vector thread_; boost::optional work_; options_set opts_; public: async_echo_server(async_echo_server const&) = delete; async_echo_server& operator=(async_echo_server const&) = delete; /** Constructor. @param log A pointer to a stream to log to, or `nullptr` to disable logging. @param threads The number of threads in the io_service. */ async_echo_server(std::ostream* log, std::size_t threads) : log_(log) , sock_(ios_) , acceptor_(ios_) , work_(ios_) { thread_.reserve(threads); for(std::size_t i = 0; i < threads; ++i) thread_.emplace_back( [&]{ ios_.run(); }); } /** Destructor. */ ~async_echo_server() { work_ = boost::none; error_code ec; ios_.dispatch( [&]{ acceptor_.close(ec); }); for(auto& t : thread_) t.join(); } /** Return the listening endpoint. */ endpoint_type local_endpoint() const { return acceptor_.local_endpoint(); } /** Set a websocket option. The option will be applied to all new connections. @param opt The option to apply. */ template void set_option(Opt const& opt) { opts_.set_option(opt); } /** Open a listening port. @param ep The address and port to bind to. @param ec Set to the error, if any occurred. */ void open(endpoint_type const& ep, error_code& ec) { acceptor_.open(ep.protocol(), ec); if(ec) return fail("open", ec); acceptor_.set_option( boost::asio::socket_base::reuse_address{true}); acceptor_.bind(ep, ec); if(ec) return fail("bind", ec); acceptor_.listen( boost::asio::socket_base::max_connections, ec); if(ec) return fail("listen", ec); acceptor_.async_accept(sock_, ep_, std::bind(&async_echo_server::on_accept, this, std::placeholders::_1)); } private: class peer { struct data { async_echo_server& server; endpoint_type ep; int state = 0; beast::websocket::stream ws; boost::asio::io_service::strand strand; beast::websocket::opcode op; beast::multi_buffer db; std::size_t id; data(async_echo_server& server_, endpoint_type const& ep_, socket_type&& sock_) : server(server_) , ep(ep_) , ws(std::move(sock_)) , strand(ws.get_io_service()) , id([] { static std::atomic n{0}; return ++n; }()) { } }; // VFALCO This could be unique_ptr in [Net.TS] std::shared_ptr d_; public: peer(peer&&) = default; peer(peer const&) = default; peer& operator=(peer&&) = delete; peer& operator=(peer const&) = delete; template explicit peer(async_echo_server& server, endpoint_type const& ep, socket_type&& sock, Args&&... args) : d_(std::make_shared(server, ep, std::forward(sock), std::forward(args)...)) { auto& d = *d_; d.server.opts_.set_options(d.ws); run(); } void run() { auto& d = *d_; d.ws.async_accept_ex( [](beast::websocket::response_type& res) { res.fields.insert( "Server", "async_echo_server"); }, std::move(*this)); } void operator()(error_code ec, std::size_t) { (*this)(ec); } void operator()(error_code ec) { using boost::asio::buffer; using boost::asio::buffer_copy; auto& d = *d_; switch(d.state) { // did accept case 0: if(ec) return fail("async_accept", ec); // start case 1: if(ec) return fail("async_handshake", ec); d.db.consume(d.db.size()); // read message d.state = 2; d.ws.async_read(d.op, d.db, d.strand.wrap(std::move(*this))); return; // got message case 2: if(ec == beast::websocket::error::closed) return; if(ec) return fail("async_read", ec); // write message d.state = 1; d.ws.set_option( beast::websocket::message_type(d.op)); d.ws.async_write(d.db.data(), d.strand.wrap(std::move(*this))); return; } } private: void fail(std::string what, error_code ec) { auto& d = *d_; if(d.server.log_) if(ec != beast::websocket::error::closed) d.server.fail("[#" + std::to_string(d.id) + " " + boost::lexical_cast(d.ep) + "] " + what, ec); } }; void fail(std::string what, error_code ec) { if(log_) { static std::mutex m; std::lock_guard lock{m}; (*log_) << what << ": " << ec.message() << std::endl; } } void on_accept(error_code ec) { if(! acceptor_.is_open()) return; if(ec == boost::asio::error::operation_aborted) return; if(ec) fail("accept", ec); peer{*this, ep_, std::move(sock_)}; acceptor_.async_accept(sock_, ep_, std::bind(&async_echo_server::on_accept, this, std::placeholders::_1)); } }; } // websocket #endif