Add wstest benchmark tool

This commit is contained in:
Vinnie Falco
2017-07-14 17:35:37 -07:00
parent 94e92e75b7
commit bd5ab6ffd0
7 changed files with 358 additions and 52 deletions

View File

@ -9,6 +9,7 @@ WebSocket
* Add rd_close_ to websocket stream state
* stream uses flat_buffer
* accept requires a message
* Add wstest benchmark tool
API Changes:

View File

@ -202,58 +202,7 @@ class server
//
buffer_.consume(buffer_.size());
// This shows how the server can close the
// connection. Alternatively we could call
// do_read again and the connection would
// stay open until the other side closes it.
//
do_close();
}
// Sends a websocket close frame
void do_close()
{
// Put the close frame on the timer
timer_.expires_from_now(std::chrono::seconds(15));
// Send the close frame
ws_.async_close({},
strand_.wrap(std::bind(
&connection::on_close,
shared_from_this(),
std::placeholders::_1)));
}
// Called when writing the close frame completes
void on_close(error_code ec)
{
if(ec)
return fail("close", ec);
on_drain({});
}
// Read and discard any leftover message data
void on_drain(error_code ec)
{
if(ec == websocket::error::closed)
{
// the connection has been closed gracefully
return;
}
if(ec)
return fail("drain", ec);
// WebSocket says that to close a connection you have
// to keep reading messages until you receive a close frame.
// Beast delivers the close frame as an error from read.
//
ws_.async_read(drain_,
strand_.wrap(std::bind(
&connection::on_drain,
shared_from_this(),
std::placeholders::_1)));
do_read();
}
// Pretty-print an error to the log

View File

@ -3,6 +3,7 @@
add_subdirectory (core)
add_subdirectory (http)
add_subdirectory (websocket)
add_subdirectory (wstest)
add_subdirectory (zlib)
if ((NOT "${VARIANT}" STREQUAL "coverage") AND

View File

@ -34,4 +34,5 @@ build-project core ;
build-project http ;
build-project server ;
build-project websocket ;
build-project wstest ;
build-project zlib ;

View File

@ -0,0 +1,17 @@
# Part of Beast
GroupSources(include/beast beast)
GroupSources(example/common common)
GroupSources(extras/beast extras)
GroupSources(test/wstest "/")
add_executable (wstest
${BEAST_INCLUDES}
${COMMON_INCLUDES}
${EXTRAS_INCLUDES}
main.cpp
)
target_link_libraries(wstest
Beast
)

10
test/wstest/Jamfile Normal file
View File

@ -0,0 +1,10 @@
#
# Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#
exe wstest :
main.cpp
;

327
test/wstest/main.cpp Normal file
View File

@ -0,0 +1,327 @@
//
// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <example/common/helpers.hpp>
#include <example/common/session_alloc.hpp>
#include <beast/core.hpp>
#include <beast/websocket.hpp>
#include <beast/unit_test/dstream.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
namespace asio = boost::asio;
namespace ip = boost::asio::ip;
using tcp = boost::asio::ip::tcp;
namespace ws = beast::websocket;
namespace ph = std::placeholders;
using error_code = beast::error_code;
class test_buffer : public asio::const_buffers_1
{
char data_[4096];
public:
test_buffer()
: asio::const_buffers_1(data_, sizeof(data_))
{
std::mt19937_64 rng;
std::uniform_int_distribution<unsigned short> dist;
for(auto& c : data_)
c = static_cast<unsigned char>(dist(rng));
}
};
class report
{
std::mutex m_;
std::size_t bytes_ = 0;
std::size_t messages_ = 0;
public:
void
insert(std::size_t messages, std::size_t bytes)
{
std::lock_guard<std::mutex> lock(m_);
bytes_ += bytes;
messages_ += messages;
}
std::size_t
bytes() const
{
return bytes_;
}
std::size_t
messages() const
{
return messages_;
}
};
class connection
: public std::enable_shared_from_this<connection>
{
std::ostream& log_;
ws::stream<tcp::socket> ws_;
tcp::endpoint ep_;
std::size_t messages_;
report& rep_;
test_buffer const& tb_;
asio::io_service::strand strand_;
beast::multi_buffer buffer_;
std::mt19937_64 rng_;
std::size_t count_ = 0;
std::size_t bytes_ = 0;
session_alloc<char> alloc_;
public:
connection(
std::ostream& log,
asio::io_service& ios,
tcp::endpoint const& ep,
std::size_t messages,
report& rep,
test_buffer const& tb)
: log_(log)
, ws_(ios)
, ep_(ep)
, messages_(messages)
, rep_(rep)
, tb_(tb)
, strand_(ios)
{
ws::permessage_deflate pmd;
pmd.client_enable = false;
ws_.set_option(pmd);
ws_.binary(true);
}
~connection()
{
rep_.insert(count_, bytes_);
}
void
run()
{
ws_.next_layer().async_connect(ep_,
alloc_.wrap(std::bind(
&connection::on_connect,
shared_from_this(),
ph::_1)));
}
private:
void
fail(beast::string_view what, error_code ec)
{
if( ec == asio::error::operation_aborted ||
ec == ws::error::closed)
return;
print(log_, "[", ep_, "] ", what, ": ", ec.message());
}
void
on_connect(error_code ec)
{
if(ec)
return fail("on_connect", ec);
ws_.async_handshake(
boost::lexical_cast<std::string>(ep_),
"/",
alloc_.wrap(std::bind(
&connection::on_handshake,
shared_from_this(),
ph::_1)));
}
void
on_handshake(error_code ec)
{
if(ec)
return fail("on_connect", ec);
do_write();
}
void
do_write()
{
std::geometric_distribution<std::size_t> dist{
double(4) / boost::asio::buffer_size(tb_)};
ws_.async_write_frame(true,
beast::buffer_prefix(dist(rng_), tb_),
alloc_.wrap(std::bind(
&connection::on_write,
shared_from_this(),
ph::_1)));
}
void
on_write(error_code ec)
{
if(ec)
return fail("on_read", ec);
if(messages_--)
return do_read();
ws_.async_close({},
alloc_.wrap(std::bind(
&connection::on_close,
shared_from_this(),
ph::_1)));
}
void
do_read()
{
ws_.async_read(buffer_,
alloc_.wrap(std::bind(
&connection::on_read,
shared_from_this(),
ph::_1)));
}
void
on_read(error_code ec)
{
if(ec)
return fail("on_read", ec);
++count_;
bytes_ += buffer_.size();
buffer_.consume(buffer_.size());
do_write();
}
void
on_close(error_code ec)
{
if(ec)
return fail("on_close", ec);
do_drain();
}
void
do_drain()
{
ws_.async_read(buffer_,
alloc_.wrap(std::bind(
&connection::on_drain,
shared_from_this(),
ph::_1)));
}
void
on_drain(error_code ec)
{
if(ec)
return fail("on_drain", ec);
do_drain();
}
};
class timer
{
using clock_type =
std::chrono::system_clock;
clock_type::time_point when_;
public:
using duration =
clock_type::duration;
timer()
: when_(clock_type::now())
{
}
duration
elapsed() const
{
return clock_type::now() - when_;
}
};
inline
std::uint64_t
throughput(
std::chrono::duration<double> const& elapsed,
std::uint64_t items)
{
using namespace std::chrono;
return static_cast<std::uint64_t>(
1 / (elapsed/items).count());
}
int
main(int argc, char** argv)
{
beast::unit_test::dstream dout(std::cerr);
try
{
// Check command line arguments.
if(argc != 6)
{
std::cerr <<
"Usage: " << argv[0] <<
" <address> <port> <trials> <messages> <workers>";
return EXIT_FAILURE;
}
auto const address = ip::address::from_string(argv[1]);
auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
auto const trials = static_cast<std::size_t>(std::atoi(argv[3]));
auto const messages= static_cast<std::size_t>(std::atoi(argv[4]));
auto const workers = static_cast<std::size_t>(std::atoi(argv[5]));
auto const work = (messages + workers - 1) / workers;
test_buffer tb;
for(auto i = trials; i; --i)
{
report rep;
boost::asio::io_service ios{1};
for(auto j = workers; j; --j)
{
auto sp =
std::make_shared<connection>(
dout,
ios,
tcp::endpoint{address, port},
work,
rep,
tb);
sp->run();
}
timer t;
ios.run();
auto const elapsed = t.elapsed();
dout <<
throughput(elapsed, rep.bytes()) << " bytes/s in " <<
(std::chrono::duration_cast<
std::chrono::milliseconds>(
elapsed).count() / 1000.) << "ms and " <<
rep.bytes() << " bytes" << std::endl;
}
}
catch(std::exception const& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}