diff --git a/CHANGELOG.md b/CHANGELOG.md index 89591d92..8315f362 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ Version 109: +* refactor test::stream + WebSocket: * Fix async_read_some handler signature diff --git a/test/beast/http/read.cpp b/test/beast/http/read.cpp index bc508750..441dbd09 100644 --- a/test/beast/http/read.cpp +++ b/test/beast/http/read.cpp @@ -51,7 +51,7 @@ public: test::stream ts{ios_, fc}; test_parser p(fc); error_code ec = test::error::fail_error; - ts.remote().close(); + ts.close_remote(); read(ts, b, p, ec); if(! ec) break; @@ -68,7 +68,7 @@ public: std::string(s + pre, len - pre)}; test_parser p(fc); error_code ec = test::error::fail_error; - ts.remote().close(); + ts.close_remote(); read(ts, b, p, ec); if(! ec) break; @@ -83,7 +83,7 @@ public: test::stream ts{ios_, fc}; test_parser p(fc); error_code ec = test::error::fail_error; - ts.remote().close(); + ts.close_remote(); async_read(ts, b, p, do_yield[ec]); if(! ec) break; @@ -100,7 +100,7 @@ public: std::string{s + pre, len - pre}); test_parser p(fc); error_code ec = test::error::fail_error; - ts.remote().close(); + ts.close_remote(); async_read(ts, b, p, do_yield[ec]); if(! ec) break; @@ -114,7 +114,7 @@ public: { multi_buffer b; test::stream c{ios_, "GET / X"}; - c.remote().close(); + c.close_remote(); request_parser p; read(c, b, p); fail(); @@ -306,7 +306,7 @@ public: test::stream ts{ios_}; request_parser p; error_code ec; - ts.remote().close(); + ts.close_remote(); read(ts, b, p, ec); BEAST_EXPECT(ec == http::error::end_of_stream); } @@ -315,7 +315,7 @@ public: test::stream ts{ios_}; request_parser p; error_code ec; - ts.remote().close(); + ts.close_remote(); async_read(ts, b, p, do_yield[ec]); BEAST_EXPECT(ec == http::error::end_of_stream); } diff --git a/test/beast/http/write.cpp b/test/beast/http/write.cpp index 38b510e1..c303b5af 100644 --- a/test/beast/http/write.cpp +++ b/test/beast/http/write.cpp @@ -284,10 +284,11 @@ public: bool equal_body(string_view sv, string_view body) { - test::stream ts{ios_, sv}; + test::stream ts{ios_, sv}, tr{ios_}; + ts.connect(tr); message m; multi_buffer b; - ts.remote().close(); + ts.close_remote(); try { read(ts, b, m); @@ -304,12 +305,13 @@ public: std::string str(message const& m) { - test::stream ts(ios_); + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); error_code ec; write(ts, m, ec); if(ec && ec != error::end_of_stream) BOOST_THROW_EXCEPTION(system_error{ec}); - return ts.remote().str().to_string(); + return tr.str().to_string(); } void @@ -323,10 +325,11 @@ public: m.set(field::content_length, "5"); m.body = "*****"; error_code ec; - test::stream ts{ios_}; + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); async_write(ts, m, do_yield[ec]); if(BEAST_EXPECTS(ec == error::end_of_stream, ec.message())) - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "HTTP/1.0 200 OK\r\n" "Server: test\r\n" "Content-Length: 5\r\n" @@ -341,10 +344,11 @@ public: m.set(field::transfer_encoding, "chunked"); m.body = "*****"; error_code ec; - test::stream ts{ios_}; + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); async_write(ts, m, do_yield[ec]); if(BEAST_EXPECTS(! ec, ec.message())) - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "HTTP/1.1 200 OK\r\n" "Server: test\r\n" "Transfer-Encoding: chunked\r\n" @@ -364,7 +368,8 @@ public: for(n = 0; n < limit; ++n) { test::fail_counter fc(n); - test::stream ts(ios_, fc); + test::stream ts{ios_, fc}, tr{ios_}; + ts.connect(tr); request m(verb::get, "/", 10, fc); m.set(field::user_agent, "test"); m.set(field::connection, "keep-alive"); @@ -373,7 +378,7 @@ public: try { write(ts, m); - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "GET / HTTP/1.0\r\n" "User-Agent: test\r\n" "Connection: keep-alive\r\n" @@ -393,7 +398,8 @@ public: for(n = 0; n < limit; ++n) { test::fail_counter fc(n); - test::stream ts(ios_, fc); + test::stream ts{ios_, fc}, tr{ios_}; + ts.connect(tr); request m{verb::get, "/", 10, fc}; m.set(field::user_agent, "test"); m.set(field::transfer_encoding, "chunked"); @@ -402,7 +408,7 @@ public: write(ts, m, ec); if(ec == error::end_of_stream) { - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "GET / HTTP/1.0\r\n" "User-Agent: test\r\n" "Transfer-Encoding: chunked\r\n" @@ -422,7 +428,8 @@ public: for(n = 0; n < limit; ++n) { test::fail_counter fc(n); - test::stream ts(ios_, fc); + test::stream ts{ios_, fc}, tr{ios_}; + ts.connect(tr); request m{verb::get, "/", 10, fc}; m.set(field::user_agent, "test"); m.set(field::transfer_encoding, "chunked"); @@ -431,7 +438,7 @@ public: async_write(ts, m, do_yield[ec]); if(ec == error::end_of_stream) { - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "GET / HTTP/1.0\r\n" "User-Agent: test\r\n" "Transfer-Encoding: chunked\r\n" @@ -451,7 +458,8 @@ public: for(n = 0; n < limit; ++n) { test::fail_counter fc(n); - test::stream ts(ios_, fc); + test::stream ts{ios_, fc}, tr{ios_}; + ts.connect(tr); request m{verb::get, "/", 10, fc}; m.set(field::user_agent, "test"); m.set(field::connection, "keep-alive"); @@ -461,7 +469,7 @@ public: write(ts, m, ec); if(! ec) { - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "GET / HTTP/1.0\r\n" "User-Agent: test\r\n" "Connection: keep-alive\r\n" @@ -477,7 +485,8 @@ public: for(n = 0; n < limit; ++n) { test::fail_counter fc(n); - test::stream ts(ios_, fc); + test::stream ts{ios_, fc}, tr{ios_}; + ts.connect(tr); request m{verb::get, "/", 10, fc}; m.set(field::user_agent, "test"); m.set(field::connection, "keep-alive"); @@ -487,7 +496,7 @@ public: async_write(ts, m, do_yield[ec]); if(! ec) { - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "GET / HTTP/1.0\r\n" "User-Agent: test\r\n" "Connection: keep-alive\r\n" @@ -530,11 +539,12 @@ public: m.set(field::user_agent, "test"); m.body = "*"; m.prepare_payload(); - test::stream ts(ios_); + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); error_code ec; write(ts, m, ec); BEAST_EXPECT(ec == error::end_of_stream); - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "GET / HTTP/1.0\r\n" "User-Agent: test\r\n" "\r\n" @@ -567,10 +577,11 @@ public: m.set(field::user_agent, "test"); m.body = "*"; m.prepare_payload(); - test::stream ts(ios_); + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); error_code ec; write(ts, m, ec); - BEAST_EXPECT(ts.remote().str() == + BEAST_EXPECT(tr.str() == "GET / HTTP/1.1\r\n" "User-Agent: test\r\n" "Transfer-Encoding: chunked\r\n" @@ -635,7 +646,8 @@ public: // destroyed when calling ~io_service { boost::asio::io_service ios; - test::stream ts{ios}; + test::stream ts{ios}, tr{ios}; + ts.connect(tr); BEAST_EXPECT(handler::count() == 0); request m; m.method(verb::get); @@ -695,8 +707,8 @@ public: void testWriteStream(boost::asio::yield_context yield) { - test::stream ts{ios_}; - auto tr = ts.remote(); + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); ts.write_size(3); response m0; @@ -812,8 +824,8 @@ public: testIssue655() { boost::asio::io_service ios; - test::stream ts{ios}; - + test::stream ts{ios}, tr{ios}; + ts.connect(tr); response res; res.chunked(true); response_serializer sr{res}; @@ -821,11 +833,11 @@ public: [&](const error_code&) { }); - ios.run(); } - void run() override + void + run() override { testIssue655(); yield_to( diff --git a/test/beast/websocket/stream.cpp b/test/beast/websocket/stream.cpp index f48d48bf..8360d9c1 100644 --- a/test/beast/websocket/stream.cpp +++ b/test/beast/websocket/stream.cpp @@ -1,5 +1,5 @@ // -// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// Copyright (w) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,14 +10,11 @@ // Test that header file is self-contained. #include -#include "websocket_sync_echo_server.hpp" - #include #include #include #include #include -#include #include #include #include @@ -36,37 +33,114 @@ public: using address_type = boost::asio::ip::address; using socket_type = boost::asio::ip::tcp::socket; + using ws_type = + websocket::stream; + //-------------------------------------------------------------------------- - class asyncEchoServer - : public std::enable_shared_from_this + class echo_server { std::ostream& log_; - websocket::stream ws_; - boost::asio::io_service::strand strand_; + boost::asio::io_service ios_; static_buffer<2001> buffer_; + test::stream ts_; + std::thread t_; + websocket::stream ws_; public: - asyncEchoServer( + explicit + echo_server( std::ostream& log, - test::stream stream) + bool async = false) : log_(log) - , ws_(std::move(stream)) - , strand_(ws_.get_io_service()) + , ts_(ios_) + , ws_(ts_) + { + if(async) + { + do_async(); + } + else + { + t_ = std::thread{std::bind( + &echo_server::do_sync, + this, std::ref(ts_))}; + } + } + + ~echo_server() + { + t_.join(); + } + + test::stream& + stream() + { + return ts_; + } + + void + async_close() + { + ios_.post( + [&] + { + ws_.async_close({}, + std::bind( + &echo_server::on_close, + this, + std::placeholders::_1)); + }); + } + + private: + void + do_sync(test::stream& stream) + { + try + { + websocket::stream ws{stream}; + permessage_deflate pmd; + pmd.client_enable = true; + pmd.server_enable = true; + ws.set_option(pmd); + ws.accept(); + for(;;) + { + static_buffer<2001> buffer; + ws.read(buffer); + ws.text(ws.got_text()); + ws.write(buffer.data()); + } + } + catch(system_error const& se) + { + if( se.code() != error::closed && + se.code() != error::failed && + se.code() != boost::asio::error::eof) + log_ << "echo_server: " << se.code().message() << std::endl; + } + catch(std::exception const& e) + { + log_ << "echo_server: " << e.what() << std::endl; + } + } + + void + do_async() { permessage_deflate pmd; pmd.client_enable = true; pmd.server_enable = true; ws_.set_option(pmd); - } - - void - run() - { - ws_.async_accept(strand_.wrap(std::bind( - &asyncEchoServer::on_accept, - shared_from_this(), - std::placeholders::_1))); + ws_.async_accept(std::bind( + &echo_server::on_accept, + this, + std::placeholders::_1)); + t_ = std::thread{[&] + { + ios_.run(); + }}; } void @@ -81,11 +155,10 @@ public: do_read() { ws_.async_read(buffer_, - strand_.wrap(std::bind( - &asyncEchoServer::on_read, - shared_from_this(), - std::placeholders::_1))); - + std::bind( + &echo_server::on_read, + this, + std::placeholders::_1)); } void @@ -95,10 +168,10 @@ public: return fail(ec); ws_.text(ws_.got_text()); ws_.async_write(buffer_.data(), - strand_.wrap(std::bind( - &asyncEchoServer::on_write, - this->shared_from_this(), - std::placeholders::_1))); + std::bind( + &echo_server::on_write, + this, + std::placeholders::_1)); } void @@ -110,6 +183,13 @@ public: do_read(); } + void + on_close(error_code ec) + { + if(ec) + return fail(ec); + } + void fail(error_code ec) { @@ -117,65 +197,14 @@ public: ec != error::failed && ec != boost::asio::error::eof) log_ << - "asyncEchoServer: " << + "echo_server: " << ec.message() << std::endl; } }; - void - echoServer(test::stream& stream) - { - try - { - websocket::stream ws{stream}; - permessage_deflate pmd; - pmd.client_enable = true; - pmd.server_enable = true; - ws.set_option(pmd); - ws.accept(); - for(;;) - { - static_buffer<2001> buffer; - ws.read(buffer); - ws.text(ws.got_text()); - ws.write(buffer.data()); - } - } - catch(system_error const& se) - { - if( se.code() != error::closed && - se.code() != error::failed && - se.code() != boost::asio::error::eof) - log << "echoServer: " << se.code().message() << std::endl; - } - catch(std::exception const& e) - { - log << "echoServer: " << e.what() << std::endl; - } - } - - void - launchEchoServer(test::stream stream) - { - std::thread{std::bind( - &stream_test::echoServer, - this, - std::move(stream))}.detach(); - } - - void - launchEchoServerAsync(test::stream stream) - { - std::make_shared( - log, std::move(stream))->run(); - } - //-------------------------------------------------------------------------- - using ws_stream_type = - websocket::stream; - template void doTestLoop(Test const& f) @@ -183,74 +212,98 @@ public: // This number has to be high for the // test that writes the large buffer. static std::size_t constexpr limit = 1000; - std::size_t n; - for(n = 0; n <= limit; ++n) - { - test::fail_counter fc{n}; - test::stream ts{ios_, fc}; - try - { - f(ts); - // Made it through - ts.close(); - break; - } - catch(system_error const& se) + for(int i = 0; i < 2; ++i) + { + std::size_t n; + for(n = 0; n <= limit; ++n) { - BEAST_EXPECTS( - se.code() == test::error::fail_error, - se.code().message()); + test::fail_counter fc{n}; + test::stream ts{ios_, fc}; + try + { + f(ts); + ts.close(); + break; + } + catch(system_error const& se) + { + BEAST_EXPECTS( + se.code() == test::error::fail_error, + se.code().message()); + } + catch(std::exception const& e) + { + fail(e.what(), __FILE__, __LINE__); + } ts.close(); + continue; } - catch(std::exception const& e) - { - fail(e.what(), __FILE__, __LINE__); - ts.close(); - } - continue; + BEAST_EXPECT(n < limit); } - BEAST_EXPECT(n < limit); } - - template + + template void doTest( - Wrap const& w, permessage_deflate const& pmd, - Launch const& launch, Test const& f) { - doTestLoop( - [&](test::stream& ts) + // This number has to be high for the + // test that writes the large buffer. + static std::size_t constexpr limit = 1000; + + for(int i = 0; i < 2; ++i) { - ws_stream_type ws{ts}; - ws.set_option(pmd); - launch(ts.remote()); - w.handshake(ws, "localhost", "/"); - f(ws); - }); -#if 0 - // Lowering the read_size takes a disproportionate - // amount of time for the gains in coverage . - doTestLoop( - [&](test::stream& ts) - { - ws_stream_type ws{ts}; - ts.read_size(45); - ws.set_option(pmd); - launch(ts.remote()); - w.handshake(ws, "localhost", "/"); - f(ws); - }); -#endif + std::size_t n; + for(n = 0; n <= limit; ++n) + { + test::fail_counter fc{n}; + test::stream ts{ios_, fc}; + ws_type ws{ts}; + ws.set_option(pmd); + + echo_server es{log, i == 1};; + error_code ec; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/", ec); + if(ec) + { + ts.close(); + if( ! BEAST_EXPECTS( + ec == test::error::fail_error, + ec.message())) + BOOST_THROW_EXCEPTION(system_error{ec}); + continue; + } + try + { + f(ws); + ts.close(); + break; + } + catch(system_error const& se) + { + BEAST_EXPECTS( + se.code() == test::error::fail_error, + se.code().message()); + } + catch(std::exception const& e) + { + fail(e.what(), __FILE__, __LINE__); + } + ts.close(); + continue; + } + BEAST_EXPECT(n < limit); + } } template void doCloseTest( Wrap const& w, - ws_stream_type& ws, + ws_type& ws, close_code code) { try @@ -272,7 +325,7 @@ public: void doFailTest( Wrap const& w, - ws_stream_type& ws, + ws_type& ws, error_code ev) { try @@ -866,9 +919,9 @@ public: // //-------------------------------------------------------------------------- - template + template void - doTestAccept(Client const& c) + doTestAccept(Wrap const& w) { class res_decorator { @@ -901,7 +954,8 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; - ts.str( + auto tr = connect(ws.next_layer()); + ts.append( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -910,7 +964,7 @@ public: "Sec-WebSocket-Version: 13\r\n" "\r\n"); ts.read_size(20); - c.accept(ws); + w.accept(ws); // VFALCO validate contents of ws.next_layer().str? }); @@ -925,10 +979,11 @@ public: "Sec-WebSocket-Version: 13\r\n" + big + "\r\n"}; + auto tr = connect(ws.next_layer()); error_code ec = test::error::fail_error; try { - c.accept(ws); + w.accept(ws); fail("", __FILE__, __LINE__); } catch(system_error const& se) @@ -944,7 +999,8 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; - ts.str( + auto tr = connect(ws.next_layer()); + ts.append( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -954,7 +1010,7 @@ public: "\r\n"); ts.read_size(20); bool called = false; - c.accept_ex(ws, res_decorator{called}); + w.accept_ex(ws, res_decorator{called}); BEAST_EXPECT(called); }); @@ -969,10 +1025,11 @@ public: "Sec-WebSocket-Version: 13\r\n" + big + "\r\n"}; + auto tr = connect(ws.next_layer()); try { bool called = false; - c.accept_ex(ws, res_decorator{called}); + w.accept_ex(ws, res_decorator{called}); fail("", __FILE__, __LINE__); } catch(system_error const& se) @@ -988,7 +1045,8 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; - c.accept(ws, sbuf( + auto tr = connect(ws.next_layer()); + w.accept(ws, sbuf( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -1002,9 +1060,10 @@ public: // request in buffers, oversize { stream ws{ios_}; + auto tr = connect(ws.next_layer()); try { - c.accept(ws, boost::asio::buffer( + w.accept(ws, boost::asio::buffer( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -1028,8 +1087,9 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; + auto tr = connect(ws.next_layer()); bool called = false; - c.accept_ex(ws, sbuf( + w.accept_ex(ws, sbuf( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -1044,10 +1104,11 @@ public: // request in buffers, decorator, oversized { stream ws{ios_}; + auto tr = connect(ws.next_layer()); try { bool called = false; - c.accept_ex(ws, boost::asio::buffer( + w.accept_ex(ws, boost::asio::buffer( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -1071,13 +1132,14 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; - ts.str( + auto tr = connect(ws.next_layer()); + ts.append( "Connection: upgrade\r\n" "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" "Sec-WebSocket-Version: 13\r\n" "\r\n"); ts.read_size(16); - c.accept(ws, sbuf( + w.accept(ws, sbuf( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -1093,9 +1155,10 @@ public: "Sec-WebSocket-Version: 13\r\n" + big + "\r\n"}; + auto tr = connect(ws.next_layer()); try { - c.accept(ws, sbuf( + w.accept(ws, sbuf( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -1114,14 +1177,15 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; - ts.str( + auto tr = connect(ws.next_layer()); + ts.append( "Connection: upgrade\r\n" "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" "Sec-WebSocket-Version: 13\r\n" "\r\n"); ts.read_size(16); bool called = false; - c.accept_ex(ws, sbuf( + w.accept_ex(ws, sbuf( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n"), @@ -1137,10 +1201,11 @@ public: "Sec-WebSocket-Version: 13\r\n" + big + "\r\n"}; + auto tr = connect(ws.next_layer()); try { bool called = false; - c.accept_ex(ws, sbuf( + w.accept_ex(ws, sbuf( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n"), @@ -1159,6 +1224,7 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; + auto tr = connect(ws.next_layer()); request_type req; req.method(http::verb::get); req.target("/"); @@ -1168,13 +1234,14 @@ public: req.insert(http::field::connection, "upgrade"); req.insert(http::field::sec_websocket_key, "dGhlIHNhbXBsZSBub25jZQ=="); req.insert(http::field::sec_websocket_version, "13"); - c.accept(ws, req); + w.accept(ws, req); }); // request in message, decorator doTestLoop([&](test::stream& ts) { stream ws{ts}; + auto tr = connect(ws.next_layer()); request_type req; req.method(http::verb::get); req.target("/"); @@ -1185,7 +1252,7 @@ public: req.insert(http::field::sec_websocket_key, "dGhlIHNhbXBsZSBub25jZQ=="); req.insert(http::field::sec_websocket_version, "13"); bool called = false; - c.accept_ex(ws, req, + w.accept_ex(ws, req, res_decorator{called}); BEAST_EXPECT(called); }); @@ -1194,6 +1261,7 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; + auto tr = connect(ws.next_layer()); request_type req; req.method(http::verb::get); req.target("/"); @@ -1203,12 +1271,12 @@ public: req.insert(http::field::connection, "upgrade"); req.insert(http::field::sec_websocket_key, "dGhlIHNhbXBsZSBub25jZQ=="); req.insert(http::field::sec_websocket_version, "13"); - ts.str("\x88\x82\xff\xff\xff\xff\xfc\x17"); - c.accept(ws, req); + ts.append("\x88\x82\xff\xff\xff\xff\xfc\x17"); + w.accept(ws, req); try { static_buffer<1> b; - c.read(ws, b); + w.read(ws, b); fail("success", __FILE__, __LINE__); } catch(system_error const& e) @@ -1222,7 +1290,8 @@ public: doTestLoop([&](test::stream& ts) { stream ws{ts}; - ts.str( + auto tr = connect(ws.next_layer()); + ts.append( "GET / HTTP/1.1\r\n" "Host: localhost\r\n" "Upgrade: websocket\r\n" @@ -1232,7 +1301,7 @@ public: ts.read_size(20); try { - c.accept(ws); + w.accept(ws); fail("success", __FILE__, __LINE__); } catch(system_error const& e) @@ -1248,10 +1317,11 @@ public: // Closed by client { stream ws{ios_}; - ws.next_layer().remote().close(); + auto tr = connect(ws.next_layer()); + tr.close(); try { - c.accept(ws); + w.accept(ws); fail("success", __FILE__, __LINE__); } catch(system_error const& e) @@ -1298,7 +1368,8 @@ public: break; } stream ws{ios_}; - ws.next_layer().str( + auto tr = connect(ws.next_layer()); + ws.next_layer().append( s.substr(n, s.size() - n)); try { @@ -1416,23 +1487,17 @@ public: pmd.client_enable = false; pmd.server_enable = false; - auto const launch = - [&](test::stream stream) - { - launchEchoServerAsync(std::move(stream)); - }; - // normal close - doTest(w, pmd, launch, - [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { w.close(ws, {}); }); // double close { + echo_server es{log}; stream ws{ios_}; - launch(ws.next_layer().remote()); + ws.next_layer().connect(es.stream()); w.handshake(ws, "localhost", "/"); w.close(ws, {}); try @@ -1449,10 +1514,9 @@ public: } // drain a message after close - doTest(w, pmd, launch, - [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - ws.next_layer().str("\x81\x01\x2a"); + ws.next_layer().append("\x81\x01\x2a"); w.close(ws, {}); }); @@ -1461,28 +1525,27 @@ public: std::string s; s = "\x81\x7e\x10\x01"; s.append(4097, '*'); - doTest(w, pmd, launch, - [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - ws.next_layer().str(s); + ws.next_layer().append(s); w.close(ws, {}); }); } // drain a ping after close - doTest(w, pmd, launch, - [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - ws.next_layer().str("\x89\x01*"); + ws.next_layer().append("\x89\x01*"); w.close(ws, {}); }); // drain invalid message frame after close { + echo_server es{log}; stream ws{ios_}; - launch(ws.next_layer().remote()); + ws.next_layer().connect(es.stream()); w.handshake(ws, "localhost", "/"); - ws.next_layer().str("\x81\x81\xff\xff\xff\xff*"); + ws.next_layer().append("\x81\x81\xff\xff\xff\xff*"); try { w.close(ws, {}); @@ -1498,10 +1561,11 @@ public: // drain invalid close frame after close { + echo_server es{log}; stream ws{ios_}; - launch(ws.next_layer().remote()); + ws.next_layer().connect(es.stream()); w.handshake(ws, "localhost", "/"); - ws.next_layer().str("\x88\x01*"); + ws.next_layer().append("\x88\x01*"); try { w.close(ws, {}); @@ -1516,10 +1580,9 @@ public: } // close with incomplete read message - doTest(w, pmd, launch, - [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - ws.next_layer().str("\x81\x02**"); + ws.next_layer().append("\x81\x02**"); static_buffer<1> b; w.read_some(ws, 1, b); w.close(ws, {}); @@ -1536,19 +1599,13 @@ public: doTestClose(AsyncClient{yield}); }); - auto const launch = - [&](test::stream stream) - { - launchEchoServer(std::move(stream)); - //launchEchoServerAsync(std::move(stream)); - }; - // suspend on write { + echo_server es{log}; error_code ec; boost::asio::io_service ios; - stream ws{ios, ios_}; - launch(ws.next_layer().remote()); + stream ws{ios}; + ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/", ec); BEAST_EXPECTS(! ec, ec.message()); std::size_t count = 0; @@ -1571,10 +1628,11 @@ public: // suspend on read { + echo_server es{log}; error_code ec; boost::asio::io_service ios; - stream ws{ios, ios_}; - launch(ws.next_layer().remote()); + stream ws{ios}; + ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/", ec); BEAST_EXPECTS(! ec, ec.message()); flat_buffer b; @@ -1603,9 +1661,9 @@ public: //-------------------------------------------------------------------------- - template + template void - doTestHandshake(Client const& c, Launch const& launch) + doTestHandshake(Wrap const& w) { class req_decorator { @@ -1630,70 +1688,103 @@ public: // handshake doTestLoop([&](test::stream& ts) { - ws_stream_type ws{ts}; - launch(ts.remote()); - c.handshake(ws, "localhost", "/"); + echo_server es{log}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); + try + { + w.handshake(ws, "localhost", "/"); + } + catch(...) + { + ts.close(); + throw; + } + ts.close(); }); // handshake, response doTestLoop([&](test::stream& ts) { - ws_stream_type ws{ts}; - launch(ts.remote()); + echo_server es{log}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); response_type res; - c.handshake(ws, res, "localhost", "/"); - // VFALCO validate res? + try + { + w.handshake(ws, res, "localhost", "/"); + // VFALCO validate res? + } + catch(...) + { + ts.close(); + throw; + } + ts.close(); }); // handshake, decorator doTestLoop([&](test::stream& ts) { - ws_stream_type ws{ts}; - launch(ts.remote()); + echo_server es{log}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); bool called = false; - c.handshake_ex(ws, "localhost", "/", - req_decorator{called}); - BEAST_EXPECT(called); + try + { + w.handshake_ex(ws, "localhost", "/", + req_decorator{called}); + BEAST_EXPECT(called); + } + catch(...) + { + ts.close(); + throw; + } + ts.close(); }); // handshake, response, decorator doTestLoop([&](test::stream& ts) { - ws_stream_type ws{ts}; - launch(ts.remote()); + echo_server es{log}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); bool called = false; response_type res; - c.handshake_ex(ws, res, "localhost", "/", - req_decorator{called}); - // VFALCO validate res? - BEAST_EXPECT(called); + try + { + w.handshake_ex(ws, res, "localhost", "/", + req_decorator{called}); + // VFALCO validate res? + BEAST_EXPECT(called); + } + catch(...) + { + ts.close(); + throw; + } + ts.close(); }); } void testHandshake() { - doTestHandshake(SyncClient{}, - [&](test::stream stream) - { - launchEchoServer(std::move(stream)); - }); + doTestHandshake(SyncClient{}); yield_to([&](yield_context yield) { - doTestHandshake(AsyncClient{yield}, - [&](test::stream stream) - { - launchEchoServerAsync(std::move(stream)); - }); + doTestHandshake(AsyncClient{yield}); }); auto const check = [&](std::string const& s) { stream ws{ios_}; - ws.next_layer().str(s); - ws.next_layer().remote().close(); + auto tr = connect(ws.next_layer()); + ws.next_layer().append(s); + tr.close(); try { ws.handshake("localhost:80", "/"); @@ -1775,25 +1866,18 @@ public: void doTestPing(Wrap const& w) { - auto const launch = - [&](test::stream stream) - { - launchEchoServer(std::move(stream)); - //launchEchoServerAsync(std::move(stream)); - }; - permessage_deflate pmd; pmd.client_enable = false; pmd.server_enable = false; // ping - doTest(w, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { w.ping(ws, {}); }); // pong - doTest(w, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { w.pong(ws, {}); }); @@ -1809,13 +1893,6 @@ public: doTestPing(AsyncClient{yield}); }); - auto const launch = - [&](test::stream stream) - { - launchEchoServer(std::move(stream)); - //launchEchoServerAsync(std::move(stream)); - }; - // ping, already closed { stream ws{ios_}; @@ -1866,10 +1943,11 @@ public: // suspend on write { + echo_server es{log}; error_code ec; boost::asio::io_service ios; - stream ws{ios, ios_}; - launch(ws.next_layer().remote()); + stream ws{ios}; + ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/", ec); BEAST_EXPECTS(! ec, ec.message()); std::size_t count = 0; @@ -1908,15 +1986,16 @@ public: auto const check = [&](error_code ev, string_view s) { - test::stream ts{ios_}; - stream ws{ts}; - launchEchoServerAsync(ts.remote()); + echo_server es{log}; + stream ws{ios_}; + ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/"); - ts.str(s); + ws.next_layer().append(s); static_buffer<1> b; error_code ec; ws.read(b, ec); BEAST_EXPECTS(ec == ev, ec.message()); + ws.next_layer().close(); }; // payload length 1 @@ -1998,14 +2077,12 @@ public: void testPausation1() { - for(int i = 0; i < 2; ++i) + for(int i = 0; i < 2; ++i ) { + echo_server es{log, i==1}; boost::asio::io_service ios; - stream ws{ios, ios_}; - if(i == 0) - launchEchoServer(ws.next_layer().remote()); - else - launchEchoServerAsync(ws.next_layer().remote()); + stream ws{ios}; + ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/"); // Make remote send a text message with bad utf8. @@ -2064,16 +2141,17 @@ public: } void - testPausation2(endpoint_type const& ep) + testPausation2() { + echo_server es{log, true}; boost::asio::io_service ios; - stream ws(ios); - ws.next_layer().connect(ep); + stream ws{ios}; + ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/"); // Cause close to be received - ws.binary(true); - ws.write(sbuf("CLOSE")); + es.async_close(); + multi_buffer b; std::size_t count = 0; // Read a close frame. @@ -2130,16 +2208,17 @@ public: } void - testPausation3(endpoint_type const& ep) + testPausation3() { + echo_server es{log, true}; boost::asio::io_service ios; - stream ws(ios); - ws.next_layer().connect(ep); + stream ws{ios}; + ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/"); // Cause close to be received - ws.binary(true); - ws.write(sbuf("CLOSE")); + es.async_close(); + multi_buffer b; std::size_t count = 0; ws.async_read(b, @@ -2180,14 +2259,14 @@ public: void testWriteFrames() { - for(int i = 0; i < 2; ++i) + for(int i = 0; i < 2; ++i ) { + echo_server es{log, i==1}; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + error_code ec; - stream ws{ios_}; - if(i == 0) - launchEchoServer(ws.next_layer().remote()); - else - launchEchoServerAsync(ws.next_layer().remote()); ws.handshake("localhost", "/", ec); if(! BEAST_EXPECTS(! ec, ec.message())) return; @@ -2206,13 +2285,12 @@ public: { for(;;) { - error_code ec; + echo_server es{log, i==1}; boost::asio::io_service ios; - stream ws{ios, ios_}; - if(i == 0) - launchEchoServer(ws.next_layer().remote()); - else - launchEchoServerAsync(ws.next_layer().remote()); + stream ws{ios}; + ws.next_layer().connect(es.stream()); + + error_code ec; ws.handshake("localhost", "/", ec); if(! BEAST_EXPECTS(! ec, ec.message())) break; @@ -2235,45 +2313,43 @@ public: //-------------------------------------------------------------------------- - template + template void - testStream( - Wrap const& c, - permessage_deflate const& pmd, - Launch const& launch) + testStream(Wrap const& w, + permessage_deflate const& pmd) { using boost::asio::buffer; // send empty message - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { ws.text(true); - c.write(ws, boost::asio::null_buffers{}); + w.write(ws, boost::asio::null_buffers{}); multi_buffer b; - c.read(ws, b); + w.read(ws, b); BEAST_EXPECT(ws.got_text()); BEAST_EXPECT(b.size() == 0); }); // send message - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { ws.auto_fragment(false); ws.binary(false); - c.write(ws, sbuf("Hello")); + w.write(ws, sbuf("Hello")); multi_buffer b; - c.read(ws, b); + w.read(ws, b); BEAST_EXPECT(ws.got_text()); BEAST_EXPECT(to_string(b.data()) == "Hello"); }); // read_some - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.write(ws, sbuf("Hello")); + w.write(ws, sbuf("Hello")); char buf[10]; auto const bytes_written = - c.read_some(ws, buffer(buf, sizeof(buf))); + w.read_some(ws, buffer(buf, sizeof(buf))); BEAST_EXPECT(bytes_written > 0); buf[bytes_written] = 0; BEAST_EXPECT( @@ -2282,19 +2358,19 @@ public: }); // close, no payload - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.close(ws, {}); + w.close(ws, {}); }); // close with code - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.close(ws, close_code::going_away); + w.close(ws, close_code::going_away); }); // send ping and message - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { bool once = false; auto cb = @@ -2306,18 +2382,18 @@ public: BEAST_EXPECT(s == ""); }; ws.control_callback(cb); - c.ping(ws, ""); + w.ping(ws, ""); ws.binary(true); - c.write(ws, sbuf("Hello")); + w.write(ws, sbuf("Hello")); multi_buffer b; - c.read(ws, b); + w.read(ws, b); BEAST_EXPECT(once); BEAST_EXPECT(ws.got_binary()); BEAST_EXPECT(to_string(b.data()) == "Hello"); }); // send ping and fragmented message - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { bool once = false; auto cb = @@ -2330,95 +2406,95 @@ public: }; ws.control_callback(cb); ws.ping("payload"); - c.write_some(ws, false, sbuf("Hello, ")); - c.write_some(ws, false, sbuf("")); - c.write_some(ws, true, sbuf("World!")); + w.write_some(ws, false, sbuf("Hello, ")); + w.write_some(ws, false, sbuf("")); + w.write_some(ws, true, sbuf("World!")); multi_buffer b; - c.read(ws, b); + w.read(ws, b); BEAST_EXPECT(once); BEAST_EXPECT(to_string(b.data()) == "Hello, World!"); ws.control_callback(); }); // send pong - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.pong(ws, ""); + w.pong(ws, ""); }); // send auto fragmented message - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { ws.auto_fragment(true); ws.write_buffer_size(8); - c.write(ws, sbuf("Now is the time for all good men")); + w.write(ws, sbuf("Now is the time for all good men")); multi_buffer b; - c.read(ws, b); + w.read(ws, b); BEAST_EXPECT(to_string(b.data()) == "Now is the time for all good men"); }); // send message with write buffer limit - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { std::string s(2000, '*'); ws.write_buffer_size(1200); - c.write(ws, buffer(s.data(), s.size())); + w.write(ws, buffer(s.data(), s.size())); multi_buffer b; - c.read(ws, b); + w.read(ws, b); BEAST_EXPECT(to_string(b.data()) == s); }); // unexpected cont - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.write_raw(ws, cbuf( + w.write_raw(ws, cbuf( 0x80, 0x80, 0xff, 0xff, 0xff, 0xff)); - doCloseTest(c, ws, close_code::protocol_error); + doCloseTest(w, ws, close_code::protocol_error); }); // invalid fixed frame header - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.write_raw(ws, cbuf( + w.write_raw(ws, cbuf( 0x8f, 0x80, 0xff, 0xff, 0xff, 0xff)); - doCloseTest(c, ws, close_code::protocol_error); + doCloseTest(w, ws, close_code::protocol_error); }); if(! pmd.client_enable) { // expected cont - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.write_some(ws, false, boost::asio::null_buffers{}); - c.write_raw(ws, cbuf( + w.write_some(ws, false, boost::asio::null_buffers{}); + w.write_raw(ws, cbuf( 0x81, 0x80, 0xff, 0xff, 0xff, 0xff)); - doCloseTest(c, ws, close_code::protocol_error); + doCloseTest(w, ws, close_code::protocol_error); }); // message size above 2^64 - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { - c.write_some(ws, false, cbuf(0x00)); - c.write_raw(ws, cbuf( + w.write_some(ws, false, cbuf(0x00)); + w.write_raw(ws, cbuf( 0x80, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff)); - doCloseTest(c, ws, close_code::too_big); + doCloseTest(w, ws, close_code::too_big); }); /* // message size exceeds max - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { // VFALCO This was never implemented correctly ws.read_message_max(1); - c.write(ws, cbuf(0x81, 0x02, '*', '*')); - doCloseTest(c, ws, close_code::too_big); + w.write(ws, cbuf(0x81, 0x02, '*', '*')); + doCloseTest(w, ws, close_code::too_big); }); */ } // receive ping - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { put(ws.next_layer().buffer(), cbuf( 0x89, 0x00)); @@ -2430,16 +2506,16 @@ public: invoked = true; }; ws.control_callback(cb); - c.write(ws, sbuf("Hello")); + w.write(ws, sbuf("Hello")); multi_buffer b; - c.read(ws, b); + w.read(ws, b); BEAST_EXPECT(invoked); BEAST_EXPECT(ws.got_text()); BEAST_EXPECT(to_string(b.data()) == "Hello"); }); // receive ping - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { put(ws.next_layer().buffer(), cbuf( 0x88, 0x00)); @@ -2451,24 +2527,24 @@ public: invoked = true; }; ws.control_callback(cb); - c.write(ws, sbuf("Hello")); - doCloseTest(c, ws, close_code::none); + w.write(ws, sbuf("Hello")); + doCloseTest(w, ws, close_code::none); }); // receive bad utf8 - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { put(ws.next_layer().buffer(), cbuf( 0x81, 0x06, 0x03, 0xea, 0xf0, 0x28, 0x8c, 0xbc)); - doFailTest(c, ws, error::failed); + doFailTest(w, ws, error::failed); }); // receive bad close - doTest(c, pmd, launch, [&](ws_stream_type& ws) + doTest(pmd, [&](ws_type& ws) { put(ws.next_layer().buffer(), cbuf( 0x88, 0x02, 0x03, 0xed)); - doFailTest(c, ws, error::failed); + doFailTest(w, ws, error::failed); }); } @@ -2510,58 +2586,20 @@ public: testOptions(); testPausation1(); + testPausation2(); + testPausation3(); testWriteFrames(); testAsyncWriteFrame(); - // Legacy tests use actual TCP/IP sockets - // VFALCO: Rewrite to use test::stream and - // remote - { - auto const any = endpoint_type{ - address_type::from_string("127.0.0.1"), 0}; - error_code ec; - ::websocket::sync_echo_server server{nullptr}; - server.set_option(pmd); - server.open(any, ec); - BEAST_EXPECTS(! ec, ec.message()); - auto const ep = server.local_endpoint(); - testPausation2(ep); - testPausation3(ep); - } - auto const doClientTests = [this](permessage_deflate const& pmd) { - testStream(SyncClient{}, pmd, - [&](test::stream stream) - { - launchEchoServer(std::move(stream)); - }); -#if 0 - // This causes false positives on ASAN - testStream(SyncClient{}, pmd, - [&](test::stream stream) - { - launchEchoServerAsync(std::move(stream)); - }); -#endif + testStream(SyncClient{}, pmd); + yield_to( [&](yield_context yield) { - testStream(AsyncClient{yield}, pmd, - [&](test::stream stream) - { - launchEchoServer(std::move(stream)); - }); - }); - yield_to( - [&](yield_context yield) - { - testStream(AsyncClient{yield}, pmd, - [&](test::stream stream) - { - launchEchoServerAsync(std::move(stream)); - }); + testStream(AsyncClient{yield}, pmd); }); }; diff --git a/test/doc/core_examples.cpp b/test/doc/core_examples.cpp index 7c29285b..af3fae91 100644 --- a/test/doc/core_examples.cpp +++ b/test/doc/core_examples.cpp @@ -50,8 +50,7 @@ public: testRead() { { - test::stream ts(ios_, - "\x16***"); + test::stream ts{ios_, "\x16***"}; error_code ec; flat_buffer b; auto const result = detect_ssl(ts, b, ec); @@ -61,8 +60,7 @@ public: yield_to( [&](yield_context yield) { - test::stream ts(ios_, - "\x16***"); + test::stream ts{ios_, "\x16***"}; error_code ec; flat_buffer b; auto const result = diff --git a/test/doc/http_examples.cpp b/test/doc/http_examples.cpp index 7f117389..dfafefd3 100644 --- a/test/doc/http_examples.cpp +++ b/test/doc/http_examples.cpp @@ -74,7 +74,7 @@ public: test::stream ts{ios_, sv}; message m; multi_buffer b; - ts.remote().close(); + ts.close_remote(); try { read(ts, b, m); @@ -90,8 +90,8 @@ public: void doExpect100Continue() { - test::stream ts{ios_}; - auto tr = ts.remote(); + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); yield_to( [&](yield_context) { @@ -125,12 +125,13 @@ public: std::string const s = "Hello, world!"; test::stream t0{ios_, s}; t0.read_size(3); - t0.remote().close(); - test::stream t1{ios_}; + t0.close_remote(); + test::stream t1{ios_}, t1r{ios_}; + t1.connect(t1r); error_code ec; send_cgi_response(t0, t1, ec); BEAST_EXPECTS(! ec, ec.message()); - BEAST_EXPECT(equal_body(t1.remote().str(), s)); + BEAST_EXPECT(equal_body(t1r.str(), s)); } void @@ -144,10 +145,11 @@ public: req.body = "Hello, world!"; req.prepare_payload(); - test::stream ds{ios_}; - auto dsr = ds.remote(); + test::stream ds{ios_}, dsr{ios_}; + ds.connect(dsr); dsr.read_size(3); - test::stream us{ios_}; + test::stream us{ios_}, usr{ios_}; + us.connect(usr); us.write_size(3); error_code ec; @@ -165,7 +167,7 @@ public: }); BEAST_EXPECTS(! ec, ec.message()); BEAST_EXPECT(equal_body( - us.remote().str(), req.body)); + usr.str(), req.body)); } void @@ -239,8 +241,8 @@ public: void doHEAD() { - test::stream ts{ios_}; - auto tr = ts.remote(); + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); yield_to( [&](yield_context) { @@ -324,7 +326,8 @@ public: return boost::asio::const_buffers_1{ s.data(), s.size()}; }; - test::stream ts{ios_}; + test::stream ts{ios_}, tr{ios_}; + ts.connect(tr); response res{status::ok, 11}; res.set(field::server, "test"); @@ -366,7 +369,7 @@ public: std::allocator{} ), ec); BEAST_EXPECT( - to_string(ts.remote().buffer().data()) == + to_string(tr.buffer().data()) == "HTTP/1.1 200 OK\r\n" "Server: test\r\n" "Accept: Expires, Content-MD5\r\n" diff --git a/test/extras/include/boost/beast/test/stream.hpp b/test/extras/include/boost/beast/test/stream.hpp index c35700d0..3d32bc0f 100644 --- a/test/extras/include/boost/beast/test/stream.hpp +++ b/test/extras/include/boost/beast/test/stream.hpp @@ -32,16 +32,18 @@ namespace boost { namespace beast { namespace test { -class stream; +/** A bidirectional in-memory communication channel -namespace detail { + An instance of this class provides a client and server + endpoint that are automatically connected to each other + similarly to a connected socket. -class stream_impl + Test pipes are used to facilitate writing unit tests + where the behavior of the transport is tightly controlled + to help illuminate all code paths (for code coverage) +*/ +class stream { - friend class boost::beast::test::stream; - - using buffer_type = flat_buffer; - struct read_op { virtual ~read_op() = default; @@ -63,7 +65,7 @@ class stream_impl friend class stream; std::mutex m; - buffer_type b; + flat_buffer b; std::condition_variable cv; std::unique_ptr op; boost::asio::io_service& ios; @@ -105,218 +107,61 @@ class stream_impl } }; - state s0_; - state s1_; - -public: - stream_impl( - boost::asio::io_service& ios, - fail_counter* fc) - : s0_(ios, fc) - , s1_(ios, nullptr) - { - } - - stream_impl( - boost::asio::io_service& ios0, - boost::asio::io_service& ios1) - : s0_(ios0, nullptr) - , s1_(ios1, nullptr) - { - } - - ~stream_impl() - { - BOOST_ASSERT(! s0_.op); - BOOST_ASSERT(! s1_.op); - } -}; - -template -class stream_impl::read_op_impl : public stream_impl::read_op -{ - class lambda - { - state& s_; - Buffers b_; - Handler h_; - boost::optional< - boost::asio::io_service::work> work_; - - public: - lambda(lambda&&) = default; - lambda(lambda const&) = default; - - lambda(state& s, Buffers const& b, Handler&& h) - : s_(s) - , b_(b) - , h_(std::move(h)) - , work_(s_.ios) - { - } - - lambda(state& s, Buffers const& b, Handler const& h) - : s_(s) - , b_(b) - , h_(h) - , work_(s_.ios) - { - } - - void - post() - { - s_.ios.post(std::move(*this)); - work_ = boost::none; - } - - void - operator()() - { - using boost::asio::buffer_copy; - using boost::asio::buffer_size; - std::unique_lock lock{s_.m}; - BOOST_ASSERT(! s_.op); - if(s_.b.size() > 0) - { - auto const bytes_transferred = buffer_copy( - b_, s_.b.data(), s_.read_max); - s_.b.consume(bytes_transferred); - auto& s = s_; - Handler h{std::move(h_)}; - lock.unlock(); - ++s.nread; - s.ios.post(bind_handler(std::move(h), - error_code{}, bytes_transferred)); - } - else - { - BOOST_ASSERT(s_.code != status::ok); - auto& s = s_; - Handler h{std::move(h_)}; - lock.unlock(); - ++s.nread; - error_code ec; - if(s.code == status::eof) - ec = boost::asio::error::eof; - else if(s.code == status::reset) - ec = boost::asio::error::connection_reset; - s.ios.post(bind_handler(std::move(h), ec, 0)); - } - } - }; - - lambda fn_; - -public: - read_op_impl(state& s, Buffers const& b, Handler&& h) - : fn_(s, b, std::move(h)) - { - } - - read_op_impl(state& s, Buffers const& b, Handler const& h) - : fn_(s, b, h) - { - } - - void - operator()() override - { - fn_.post(); - } -}; - -} // detail - -//------------------------------------------------------------------------------ - -/** A bidirectional in-memory communication channel - - An instance of this class provides a client and server - endpoint that are automatically connected to each other - similarly to a connected socket. - - Test pipes are used to facilitate writing unit tests - where the behavior of the transport is tightly controlled - to help illuminate all code paths (for code coverage) -*/ -class stream -{ - using status = detail::stream_impl::status; - - std::shared_ptr impl_; - detail::stream_impl::state* in_; - detail::stream_impl::state* out_; - - explicit - stream(std::shared_ptr< - detail::stream_impl> const& impl) - : impl_(impl) - , in_(&impl_->s1_) - , out_(&impl_->s0_) - { - } + std::shared_ptr in_; + std::weak_ptr out_; public: using buffer_type = flat_buffer; - /// Assignment - stream& operator=(stream&&) = default; - /// Destructor ~stream() { - if(! impl_) - return; - std::unique_lock lock{out_->m}; - if(out_->code == status::ok) + auto out = out_.lock(); + if(out) { - out_->code = status::reset; - out_->on_write(); + std::unique_lock lock{out->m}; + if(out->code == status::ok) + { + out->code = status::reset; + out->on_write(); + } } - lock.unlock(); } /// Constructor stream(stream&& other) - : impl_(std::move(other.impl_)) - , in_(other.in_) - , out_(other.out_) { + auto in = std::make_shared( + other.in_->ios, other.in_->fc); + in_ = std::move(other.in_); + out_ = std::move(other.out_); + other.in_ = in; + } + + /// Assignment + stream& + operator=(stream&& other) + { + auto in = std::make_shared( + other.in_->ios, other.in_->fc); + in_ = std::move(other.in_); + out_ = std::move(other.out_); + other.in_ = in; + return *this; } /// Constructor explicit - stream( - boost::asio::io_service& ios) - : impl_(std::make_shared< - detail::stream_impl>(ios, nullptr)) - , in_(&impl_->s0_) - , out_(&impl_->s1_) + stream(boost::asio::io_service& ios) + : in_(std::make_shared(ios, nullptr)) { } /// Constructor - explicit - stream( - boost::asio::io_service& ios0, - boost::asio::io_service& ios1) - : impl_(std::make_shared< - detail::stream_impl>(ios0, ios1)) - , in_(&impl_->s0_) - , out_(&impl_->s1_) - { - } - - /// Constructor - explicit stream( boost::asio::io_service& ios, fail_counter& fc) - : impl_(std::make_shared< - detail::stream_impl>(ios, &fc)) - , in_(&impl_->s0_) - , out_(&impl_->s1_) + : in_(std::make_shared(ios, &fc)) { } @@ -324,10 +169,7 @@ public: stream( boost::asio::io_service& ios, string_view s) - : impl_(std::make_shared< - detail::stream_impl>(ios, nullptr)) - , in_(&impl_->s0_) - , out_(&impl_->s1_) + : in_(std::make_shared(ios, nullptr)) { using boost::asio::buffer; using boost::asio::buffer_copy; @@ -341,10 +183,7 @@ public: boost::asio::io_service& ios, fail_counter& fc, string_view s) - : impl_(std::make_shared< - detail::stream_impl>(ios, &fc)) - , in_(&impl_->s0_) - , out_(&impl_->s1_) + : in_(std::make_shared(ios, &fc)) { using boost::asio::buffer; using boost::asio::buffer_copy; @@ -353,15 +192,17 @@ public: buffer(s.data(), s.size()))); } - /// Return the other end of the connection - stream - remote() + /// Establish a connection + void + connect(stream& remote) { - BOOST_ASSERT(in_ == &impl_->s0_); - return stream{impl_}; + BOOST_ASSERT(! out_.lock()); + BOOST_ASSERT(! remote.out_.lock()); + out_ = remote.in_; + remote.out_ = in_; } - /// Return the `io_service` associated with the object + /// Return the `io_service` associated with the stream boost::asio::io_service& get_io_service() { @@ -379,7 +220,7 @@ public: void write_size(std::size_t n) { - out_->write_max = n; + in_->write_max = n; } /// Direct input buffer access @@ -402,7 +243,7 @@ public: /// Appends a string to the pending input data void - str(string_view s) + append(string_view s) { using boost::asio::buffer; using boost::asio::buffer_copy; @@ -431,18 +272,25 @@ public: std::size_t nwrite() const { - return out_->nwrite; + return in_->nwrite; } /** Close the stream. - The other end of the pipe will see `error::eof` - after reading all the data from the buffer. + The other end of the connection will see + `error::eof` after reading all the remaining data. */ - template void close(); + /** Close the other end of the stream. + + This end of the connection will see + `error::eof` after reading all the remaining data. + */ + void + close_remote(); + template std::size_t read_some(MutableBufferSequence const& buffers); @@ -487,6 +335,36 @@ public: //------------------------------------------------------------------------------ +inline +void +stream:: +close() +{ + BOOST_ASSERT(! in_->op); + auto out = out_.lock(); + if(! out) + return; + std::lock_guard lock{out->m}; + if(out->code == status::ok) + { + out->code = status::eof; + out->on_write(); + } +} + +inline +void +stream:: +close_remote() +{ + std::lock_guard lock{in_->m}; + if(in_->code == status::ok) + { + in_->code = status::eof; + in_->on_write(); + } +} + template std::size_t stream:: @@ -597,8 +475,7 @@ async_read_some( } else { - in_->op.reset(new - detail::stream_impl::read_op_implop.reset(new read_op_impl, MutableBufferSequence>{*in_, buffers, init.completion_handler}); @@ -615,7 +492,6 @@ write_some(ConstBufferSequence const& buffers) static_assert(is_const_buffer_sequence< ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); - BOOST_ASSERT(out_->code == status::ok); error_code ec; auto const bytes_transferred = write_some(buffers, ec); @@ -635,18 +511,24 @@ write_some( "ConstBufferSequence requirements not met"); using boost::asio::buffer_copy; using boost::asio::buffer_size; - BOOST_ASSERT(out_->code == status::ok); + auto out = out_.lock(); + if(! out) + { + ec = boost::asio::error::connection_reset; + return 0; + } + BOOST_ASSERT(out->code == status::ok); if(in_->fc && in_->fc->fail(ec)) return 0; auto const n = (std::min)( - buffer_size(buffers), out_->write_max); - std::unique_lock lock{out_->m}; + buffer_size(buffers), in_->write_max); + std::unique_lock lock{out->m}; auto const bytes_transferred = - buffer_copy(out_->b.prepare(n), buffers); - out_->b.commit(bytes_transferred); - out_->on_write(); + buffer_copy(out->b.prepare(n), buffers); + out->b.commit(bytes_transferred); + out->on_write(); lock.unlock(); - ++out_->nwrite; + ++in_->nwrite; ec.assign(0, ec.category()); return bytes_transferred; } @@ -663,9 +545,14 @@ async_write_some(ConstBufferSequence const& buffers, "ConstBufferSequence requirements not met"); using boost::asio::buffer_copy; using boost::asio::buffer_size; - BOOST_ASSERT(out_->code == status::ok); async_completion init{handler}; + auto out = out_.lock(); + if(! out) + return in_->ios.post( + bind_handler(init.completion_handler, + boost::asio::error::connection_reset, 0)); + BOOST_ASSERT(out->code == status::ok); if(in_->fc) { error_code ec; @@ -674,14 +561,14 @@ async_write_some(ConstBufferSequence const& buffers, init.completion_handler, ec, 0)); } auto const n = - (std::min)(buffer_size(buffers), out_->write_max); - std::unique_lock lock{out_->m}; + (std::min)(buffer_size(buffers), in_->write_max); + std::unique_lock lock{out->m}; auto const bytes_transferred = - buffer_copy(out_->b.prepare(n), buffers); - out_->b.commit(bytes_transferred); - out_->on_write(); + buffer_copy(out->b.prepare(n), buffers); + out->b.commit(bytes_transferred); + out->on_write(); lock.unlock(); - ++out_->nwrite; + ++in_->nwrite; in_->ios.post(bind_handler(init.completion_handler, error_code{}, bytes_transferred)); return init.result.get(); @@ -689,8 +576,10 @@ async_write_some(ConstBufferSequence const& buffers, inline void -teardown(websocket::role_type, - stream& s, boost::system::error_code& ec) +teardown( + websocket::role_type, + stream& s, + boost::system::error_code& ec) { if(s.in_->fc) { @@ -707,8 +596,10 @@ teardown(websocket::role_type, template inline void -async_teardown(websocket::role_type, - stream& s, TeardownHandler&& handler) +async_teardown( + websocket::role_type, + stream& s, + TeardownHandler&& handler) { error_code ec; if(s.in_->fc && s.in_->fc->fail(ec)) @@ -719,18 +610,122 @@ async_teardown(websocket::role_type, bind_handler(std::move(handler), ec)); } -template -void -stream:: -close() +//------------------------------------------------------------------------------ + +template +class stream::read_op_impl : public stream::read_op { - BOOST_ASSERT(! in_->op); - std::lock_guard lock{out_->m}; - if(out_->code == status::ok) + class lambda + { + state& s_; + Buffers b_; + Handler h_; + boost::optional< + boost::asio::io_service::work> work_; + + public: + lambda(lambda&&) = default; + lambda(lambda const&) = default; + + lambda(state& s, Buffers const& b, Handler&& h) + : s_(s) + , b_(b) + , h_(std::move(h)) + , work_(s_.ios) + { + } + + lambda(state& s, Buffers const& b, Handler const& h) + : s_(s) + , b_(b) + , h_(h) + , work_(s_.ios) + { + } + + void + post() + { + s_.ios.post(std::move(*this)); + work_ = boost::none; + } + + void + operator()() + { + using boost::asio::buffer_copy; + using boost::asio::buffer_size; + std::unique_lock lock{s_.m}; + BOOST_ASSERT(! s_.op); + if(s_.b.size() > 0) + { + auto const bytes_transferred = buffer_copy( + b_, s_.b.data(), s_.read_max); + s_.b.consume(bytes_transferred); + auto& s = s_; + Handler h{std::move(h_)}; + lock.unlock(); + ++s.nread; + s.ios.post(bind_handler(std::move(h), + error_code{}, bytes_transferred)); + } + else + { + BOOST_ASSERT(s_.code != status::ok); + auto& s = s_; + Handler h{std::move(h_)}; + lock.unlock(); + ++s.nread; + error_code ec; + if(s.code == status::eof) + ec = boost::asio::error::eof; + else if(s.code == status::reset) + ec = boost::asio::error::connection_reset; + s.ios.post(bind_handler(std::move(h), ec, 0)); + } + } + }; + + lambda fn_; + +public: + read_op_impl(state& s, Buffers const& b, Handler&& h) + : fn_(s, b, std::move(h)) { - out_->code = status::eof; - out_->on_write(); } + + read_op_impl(state& s, Buffers const& b, Handler const& h) + : fn_(s, b, h) + { + } + + void + operator()() override + { + fn_.post(); + } +}; + +/// Create and return a connected stream +inline +stream +connect(stream& to) +{ + stream from{to.get_io_service()}; + from.connect(to); + return from; +} + +/// Create and return a connected stream +template +stream +connect(stream& to, Arg1&& arg1, ArgN&&... argn) +{ + stream from{ + std::forward(arg1), + std::forward(argn)...}; + from.connect(to); + return from; } } // test