diff --git a/CHANGELOG.md b/CHANGELOG.md index d3dda2b0..76274d7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ WebSocket * Add rd_close_ to websocket stream state * stream uses flat_buffer * accept requires a message +* Add wstest benchmark tool API Changes: diff --git a/example/websocket-server-async/websocket_server_async.cpp b/example/websocket-server-async/websocket_server_async.cpp index 55978f0a..fa3acf11 100644 --- a/example/websocket-server-async/websocket_server_async.cpp +++ b/example/websocket-server-async/websocket_server_async.cpp @@ -202,58 +202,7 @@ class server // buffer_.consume(buffer_.size()); - // This shows how the server can close the - // connection. Alternatively we could call - // do_read again and the connection would - // stay open until the other side closes it. - // - do_close(); - } - - // Sends a websocket close frame - void do_close() - { - // Put the close frame on the timer - timer_.expires_from_now(std::chrono::seconds(15)); - - // Send the close frame - ws_.async_close({}, - strand_.wrap(std::bind( - &connection::on_close, - shared_from_this(), - std::placeholders::_1))); - } - - // Called when writing the close frame completes - void on_close(error_code ec) - { - if(ec) - return fail("close", ec); - - on_drain({}); - } - - // Read and discard any leftover message data - void on_drain(error_code ec) - { - if(ec == websocket::error::closed) - { - // the connection has been closed gracefully - return; - } - - if(ec) - return fail("drain", ec); - - // WebSocket says that to close a connection you have - // to keep reading messages until you receive a close frame. - // Beast delivers the close frame as an error from read. - // - ws_.async_read(drain_, - strand_.wrap(std::bind( - &connection::on_drain, - shared_from_this(), - std::placeholders::_1))); + do_read(); } // Pretty-print an error to the log diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d939ddbd..4d72a9ad 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -3,6 +3,7 @@ add_subdirectory (core) add_subdirectory (http) add_subdirectory (websocket) +add_subdirectory (wstest) add_subdirectory (zlib) if ((NOT "${VARIANT}" STREQUAL "coverage") AND diff --git a/test/Jamfile b/test/Jamfile index 01d339cc..9aba4abf 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -34,4 +34,5 @@ build-project core ; build-project http ; build-project server ; build-project websocket ; +build-project wstest ; build-project zlib ; diff --git a/test/wstest/CMakeLists.txt b/test/wstest/CMakeLists.txt new file mode 100644 index 00000000..26a14fb4 --- /dev/null +++ b/test/wstest/CMakeLists.txt @@ -0,0 +1,17 @@ +# Part of Beast + +GroupSources(include/beast beast) +GroupSources(example/common common) +GroupSources(extras/beast extras) +GroupSources(test/wstest "/") + +add_executable (wstest + ${BEAST_INCLUDES} + ${COMMON_INCLUDES} + ${EXTRAS_INCLUDES} + main.cpp + ) + +target_link_libraries(wstest + Beast + ) diff --git a/test/wstest/Jamfile b/test/wstest/Jamfile new file mode 100644 index 00000000..2feb22dc --- /dev/null +++ b/test/wstest/Jamfile @@ -0,0 +1,10 @@ +# +# Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# + +exe wstest : + main.cpp + ; diff --git a/test/wstest/main.cpp b/test/wstest/main.cpp new file mode 100644 index 00000000..8ea22ef6 --- /dev/null +++ b/test/wstest/main.cpp @@ -0,0 +1,327 @@ +// +// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace asio = boost::asio; +namespace ip = boost::asio::ip; +using tcp = boost::asio::ip::tcp; +namespace ws = beast::websocket; +namespace ph = std::placeholders; +using error_code = beast::error_code; + +class test_buffer : public asio::const_buffers_1 +{ + char data_[4096]; + +public: + test_buffer() + : asio::const_buffers_1(data_, sizeof(data_)) + { + std::mt19937_64 rng; + std::uniform_int_distribution dist; + for(auto& c : data_) + c = static_cast(dist(rng)); + } +}; + +class report +{ + std::mutex m_; + std::size_t bytes_ = 0; + std::size_t messages_ = 0; + +public: + void + insert(std::size_t messages, std::size_t bytes) + { + std::lock_guard lock(m_); + bytes_ += bytes; + messages_ += messages; + } + + std::size_t + bytes() const + { + return bytes_; + } + + std::size_t + messages() const + { + return messages_; + } +}; + +class connection + : public std::enable_shared_from_this +{ + std::ostream& log_; + ws::stream ws_; + tcp::endpoint ep_; + std::size_t messages_; + report& rep_; + test_buffer const& tb_; + asio::io_service::strand strand_; + beast::multi_buffer buffer_; + std::mt19937_64 rng_; + std::size_t count_ = 0; + std::size_t bytes_ = 0; + session_alloc alloc_; + +public: + connection( + std::ostream& log, + asio::io_service& ios, + tcp::endpoint const& ep, + std::size_t messages, + report& rep, + test_buffer const& tb) + : log_(log) + , ws_(ios) + , ep_(ep) + , messages_(messages) + , rep_(rep) + , tb_(tb) + , strand_(ios) + { + ws::permessage_deflate pmd; + pmd.client_enable = false; + ws_.set_option(pmd); + ws_.binary(true); + } + + ~connection() + { + rep_.insert(count_, bytes_); + } + + void + run() + { + ws_.next_layer().async_connect(ep_, + alloc_.wrap(std::bind( + &connection::on_connect, + shared_from_this(), + ph::_1))); + } + +private: + void + fail(beast::string_view what, error_code ec) + { + if( ec == asio::error::operation_aborted || + ec == ws::error::closed) + return; + print(log_, "[", ep_, "] ", what, ": ", ec.message()); + } + + void + on_connect(error_code ec) + { + if(ec) + return fail("on_connect", ec); + ws_.async_handshake( + boost::lexical_cast(ep_), + "/", + alloc_.wrap(std::bind( + &connection::on_handshake, + shared_from_this(), + ph::_1))); + } + + void + on_handshake(error_code ec) + { + if(ec) + return fail("on_connect", ec); + do_write(); + } + + void + do_write() + { + std::geometric_distribution dist{ + double(4) / boost::asio::buffer_size(tb_)}; + ws_.async_write_frame(true, + beast::buffer_prefix(dist(rng_), tb_), + alloc_.wrap(std::bind( + &connection::on_write, + shared_from_this(), + ph::_1))); + } + + void + on_write(error_code ec) + { + if(ec) + return fail("on_read", ec); + if(messages_--) + return do_read(); + ws_.async_close({}, + alloc_.wrap(std::bind( + &connection::on_close, + shared_from_this(), + ph::_1))); + } + + void + do_read() + { + ws_.async_read(buffer_, + alloc_.wrap(std::bind( + &connection::on_read, + shared_from_this(), + ph::_1))); + } + + void + on_read(error_code ec) + { + if(ec) + return fail("on_read", ec); + ++count_; + bytes_ += buffer_.size(); + buffer_.consume(buffer_.size()); + do_write(); + } + + void + on_close(error_code ec) + { + if(ec) + return fail("on_close", ec); + do_drain(); + } + + void + do_drain() + { + ws_.async_read(buffer_, + alloc_.wrap(std::bind( + &connection::on_drain, + shared_from_this(), + ph::_1))); + } + + void + on_drain(error_code ec) + { + if(ec) + return fail("on_drain", ec); + do_drain(); + } + +}; + +class timer +{ + using clock_type = + std::chrono::system_clock; + + clock_type::time_point when_; + +public: + using duration = + clock_type::duration; + + timer() + : when_(clock_type::now()) + { + } + + duration + elapsed() const + { + return clock_type::now() - when_; + } +}; + +inline +std::uint64_t +throughput( + std::chrono::duration const& elapsed, + std::uint64_t items) +{ + using namespace std::chrono; + return static_cast( + 1 / (elapsed/items).count()); +} + +int +main(int argc, char** argv) +{ + beast::unit_test::dstream dout(std::cerr); + + try + { + // Check command line arguments. + if(argc != 6) + { + std::cerr << + "Usage: " << argv[0] << + "
"; + return EXIT_FAILURE; + } + + auto const address = ip::address::from_string(argv[1]); + auto const port = static_cast(std::atoi(argv[2])); + auto const trials = static_cast(std::atoi(argv[3])); + auto const messages= static_cast(std::atoi(argv[4])); + auto const workers = static_cast(std::atoi(argv[5])); + auto const work = (messages + workers - 1) / workers; + test_buffer tb; + for(auto i = trials; i; --i) + { + report rep; + boost::asio::io_service ios{1}; + for(auto j = workers; j; --j) + { + auto sp = + std::make_shared( + dout, + ios, + tcp::endpoint{address, port}, + work, + rep, + tb); + sp->run(); + } + timer t; + ios.run(); + auto const elapsed = t.elapsed(); + dout << + throughput(elapsed, rep.bytes()) << " bytes/s in " << + (std::chrono::duration_cast< + std::chrono::milliseconds>( + elapsed).count() / 1000.) << "ms and " << + rep.bytes() << " bytes" << std::endl; + } + } + catch(std::exception const& e) + { + std::cerr << "Exception: " << e.what() << std::endl; + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +}