Better logging in async echo server

This commit is contained in:
Vinnie Falco
2016-11-02 17:34:53 -04:00
parent cfde28c3c3
commit 020a254b1b
3 changed files with 84 additions and 43 deletions

View File

@@ -1416,7 +1416,10 @@ public:
yield_to_mf(ep, &stream_test::testAsyncClient); yield_to_mf(ep, &stream_test::testAsyncClient);
} }
{ {
async_echo_server server(true, any, 4); error_code ec;
async_echo_server server{nullptr, 4};
server.open(true, any, ec);
BEAST_EXPECTS(! ec, ec.message());
auto const ep = server.local_endpoint(); auto const ep = server.local_endpoint();
testSyncClient(ep); testSyncClient(ep);
testAsyncWriteFrame(ep); testAsyncWriteFrame(ep);

View File

@@ -18,6 +18,8 @@
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <ostream>
namespace beast { namespace beast {
namespace websocket { namespace websocket {
@@ -31,38 +33,21 @@ public:
using socket_type = boost::asio::ip::tcp::socket; using socket_type = boost::asio::ip::tcp::socket;
private: private:
bool log_ = false; std::ostream* log_;
boost::asio::io_service ios_; boost::asio::io_service ios_;
socket_type sock_; socket_type sock_;
boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::acceptor acceptor_;
std::vector<std::thread> thread_; std::vector<std::thread> thread_;
boost::optional<boost::asio::io_service::work> work_;
public: public:
async_echo_server(bool server, async_echo_server(std::ostream* log,
endpoint_type const& ep, std::size_t threads) std::size_t threads)
: sock_(ios_) : log_(log)
, sock_(ios_)
, acceptor_(ios_) , acceptor_(ios_)
, work_(ios_)
{ {
if(server)
{
error_code ec;
acceptor_.open(ep.protocol(), ec);
maybe_throw(ec, "open");
acceptor_.set_option(
boost::asio::socket_base::reuse_address{true});
acceptor_.bind(ep, ec);
maybe_throw(ec, "bind");
acceptor_.listen(
boost::asio::socket_base::max_connections, ec);
maybe_throw(ec, "listen");
acceptor_.async_accept(sock_,
std::bind(&async_echo_server::on_accept, this,
beast::asio::placeholders::error));
}
else
{
Peer{log_, std::move(sock_), ep};
}
thread_.reserve(threads); thread_.reserve(threads);
for(std::size_t i = 0; i < threads; ++i) for(std::size_t i = 0; i < threads; ++i)
thread_.emplace_back( thread_.emplace_back(
@@ -71,6 +56,7 @@ public:
~async_echo_server() ~async_echo_server()
{ {
work_ = boost::none;
error_code ec; error_code ec;
ios_.dispatch( ios_.dispatch(
[&]{ acceptor_.close(ec); }); [&]{ acceptor_.close(ec); });
@@ -78,6 +64,46 @@ public:
t.join(); t.join();
} }
void
open(bool server,
endpoint_type const& ep, error_code& ec)
{
if(server)
{
acceptor_.open(ep.protocol(), ec);
if(ec)
{
if(log_)
(*log_) << "open: " << ec.message() << std::endl;
return;
}
acceptor_.set_option(
boost::asio::socket_base::reuse_address{true});
acceptor_.bind(ep, ec);
if(ec)
{
if(log_)
(*log_) << "bind: " << ec.message() << std::endl;
return;
}
acceptor_.listen(
boost::asio::socket_base::max_connections, ec);
if(ec)
{
if(log_)
(*log_) << "listen: " << ec.message() << std::endl;
return;
}
acceptor_.async_accept(sock_,
std::bind(&async_echo_server::on_accept, this,
beast::asio::placeholders::error));
}
else
{
Peer{*this, std::move(sock_), ep};
}
}
endpoint_type endpoint_type
local_endpoint() const local_endpoint() const
{ {
@@ -89,7 +115,7 @@ private:
{ {
struct data struct data
{ {
bool log; async_echo_server& server;
int state = 0; int state = 0;
boost::optional<endpoint_type> ep; boost::optional<endpoint_type> ep;
stream<socket_type> ws; stream<socket_type> ws;
@@ -98,8 +124,9 @@ private:
beast::streambuf db; beast::streambuf db;
int id; int id;
data(bool log_, socket_type&& sock_) data(async_echo_server& server_,
: log(log_) socket_type&& sock_)
: server(server_)
, ws(std::move(sock_)) , ws(std::move(sock_))
, strand(ws.get_io_service()) , strand(ws.get_io_service())
, id([] , id([]
@@ -110,9 +137,9 @@ private:
{ {
} }
data(bool log_, socket_type&& sock_, data(async_echo_server& server_,
endpoint_type const& ep_) socket_type&& sock_, endpoint_type const& ep_)
: log(log_) : server(server_)
, ep(ep_) , ep(ep_)
, ws(std::move(sock_)) , ws(std::move(sock_))
, strand(ws.get_io_service()) , strand(ws.get_io_service())
@@ -152,15 +179,16 @@ private:
template<class... Args> template<class... Args>
explicit explicit
Peer(bool log, socket_type&& sock, Args&&... args) Peer(async_echo_server& server,
: d_(std::make_shared<data>(log, socket_type&& sock, Args&&... args)
: d_(std::make_shared<data>(server,
std::forward<socket_type>(sock), std::forward<socket_type>(sock),
std::forward<Args>(args)...)) std::forward<Args>(args)...))
{ {
auto& d = *d_; auto& d = *d_;
d.ws.set_option(decorate(identity{})); d.ws.set_option(decorate(identity{}));
d.ws.set_option(read_message_max(64 * 1024 * 1024)); d.ws.set_option(read_message_max(64 * 1024 * 1024));
//d.ws.set_option(auto_fragment{false}); d.ws.set_option(auto_fragment{false});
//d.ws.set_option(write_buffer_size{64 * 1024}); //d.ws.set_option(write_buffer_size{64 * 1024});
run(); run();
} }
@@ -291,10 +319,10 @@ private:
fail(error_code ec, std::string what) fail(error_code ec, std::string what)
{ {
auto& d = *d_; auto& d = *d_;
if(d.log) if(d.server.log_)
{ {
if(ec != error::closed) if(ec != error::closed)
std::cerr << "#" << d_->id << " " << (*d.server.log_) << "#" << d.id << " " <<
what << ": " << ec.message() << std::endl; what << ": " << ec.message() << std::endl;
} }
} }
@@ -304,7 +332,7 @@ private:
fail(error_code ec, std::string what) fail(error_code ec, std::string what)
{ {
if(log_) if(log_)
std::cerr << what << ": " << (*log_) << what << ": " <<
ec.message() << std::endl; ec.message() << std::endl;
} }
@@ -330,7 +358,7 @@ private:
acceptor_.async_accept(sock_, acceptor_.async_accept(sock_,
std::bind(&async_echo_server::on_accept, this, std::bind(&async_echo_server::on_accept, this,
beast::asio::placeholders::error)); beast::asio::placeholders::error));
Peer{false, std::move(sock)}; Peer{*this, std::move(sock)};
} }
}; };

View File

@@ -8,17 +8,27 @@
#include "websocket_async_echo_server.hpp" #include "websocket_async_echo_server.hpp"
#include "websocket_sync_echo_server.hpp" #include "websocket_sync_echo_server.hpp"
#include <beast/test/sig_wait.hpp> #include <beast/test/sig_wait.hpp>
#include <iostream>
int main() int main()
{ {
using endpoint_type = boost::asio::ip::tcp::endpoint; using endpoint_type = boost::asio::ip::tcp::endpoint;
using address_type = boost::asio::ip::address; using address_type = boost::asio::ip::address;
beast::websocket::async_echo_server s1(true, endpoint_type{ try
address_type::from_string("127.0.0.1"), 6000 }, 4); {
boost::system::error_code ec;
beast::websocket::async_echo_server s1{nullptr, 1};
s1.open(true, endpoint_type{
address_type::from_string("127.0.0.1"), 6000 }, ec);
beast::websocket::sync_echo_server s2(true, endpoint_type{ beast::websocket::sync_echo_server s2(true, endpoint_type{
address_type::from_string("127.0.0.1"), 6001 }); address_type::from_string("127.0.0.1"), 6001 });
beast::test::sig_wait(); beast::test::sig_wait();
}
catch(std::exception const& e)
{
std::cout << "Error: " << e.what() << std::endl;
}
} }