diff --git a/CHANGELOG.md b/CHANGELOG.md index 23813a7a..97201146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ WebSocket: * websocket handshake uses coroutine * websocket ping tests * Fix websocket close_op resume state +* websocket write tests API Changes: diff --git a/include/boost/beast/websocket/impl/stream.ipp b/include/boost/beast/websocket/impl/stream.ipp index e44dc0bf..c0550984 100644 --- a/include/boost/beast/websocket/impl/stream.ipp +++ b/include/boost/beast/websocket/impl/stream.ipp @@ -93,51 +93,12 @@ read_size_hint(DynamicBuffer& buffer) const { static_assert(is_dynamic_buffer::value, "DynamicBuffer requirements not met"); -#if 1 auto const initial_size = (std::min)( +tcp_frame_size, buffer.max_size() - buffer.size()); if(initial_size == 0) return 1; // buffer is full return read_size_hint(initial_size); - -#else - using beast::detail::clamp; - std::size_t result; - if(! pmd_ || (! rd_.done && ! pmd_->rd_set)) - { - // current message is uncompressed - - if(rd_.done) - { - // first message frame - auto const n = (std::min)( - buffer.max_size(), - (std::max)( - +tcp_frame_size, - buffer.capacity() - buffer.size())); - if(n > 0) - return n; - return 1; - } - - if(rd_.fh.fin) - { - BOOST_ASSERT(rd_.remain != 0); - return (std::min)( - buffer.max_size(), clamp(rd_.remain)); - } - } - return (std::min)( - buffer.max_size() - buffer.size(), - (std::max)((std::max)( - +tcp_frame_size, - clamp(rd_.remain)), - buffer.capacity() - buffer.size())); -done: - BOOST_ASSERT(result != 0); - return result; -#endif } template diff --git a/test/beast/websocket/stream.cpp b/test/beast/websocket/stream.cpp index 8360d9c1..bf67527a 100644 --- a/test/beast/websocket/stream.cpp +++ b/test/beast/websocket/stream.cpp @@ -15,9 +15,9 @@ #include #include #include +#include #include -#include -#include +#include namespace boost { namespace beast { @@ -38,11 +38,25 @@ public: //-------------------------------------------------------------------------- + enum class kind + { + sync, + async, + async_client + }; + class echo_server { + enum + { + buf_size = 20000 + }; + std::ostream& log_; boost::asio::io_service ios_; - static_buffer<2001> buffer_; + boost::optional< + boost::asio::io_service::work> work_; + static_buffer buffer_; test::stream ts_; std::thread t_; websocket::stream ws_; @@ -51,25 +65,37 @@ public: explicit echo_server( std::ostream& log, - bool async = false) + kind k = kind::sync) : log_(log) + , work_(ios_) , ts_(ios_) , ws_(ts_) { - if(async) + permessage_deflate pmd; + pmd.client_enable = true; + pmd.server_enable = true; + ws_.set_option(pmd); + + switch(k) { - do_async(); - } - else - { - t_ = std::thread{std::bind( - &echo_server::do_sync, - this, std::ref(ts_))}; + case kind::sync: + t_ = std::thread{[&]{ do_sync(); }}; + break; + + case kind::async: + t_ = std::thread{[&]{ ios_.run(); }}; + do_accept(); + break; + + case kind::async_client: + t_ = std::thread{[&]{ ios_.run(); }}; + break; } } ~echo_server() { + work_ = boost::none; t_.join(); } @@ -79,6 +105,16 @@ public: return ts_; } + void + async_handshake() + { + ws_.async_handshake("localhost", "/", + std::bind( + &echo_server::on_handshake, + this, + std::placeholders::_1)); + } + void async_close() { @@ -95,30 +131,28 @@ public: private: void - do_sync(test::stream& stream) + do_sync() { try { - websocket::stream ws{stream}; - permessage_deflate pmd; - pmd.client_enable = true; - pmd.server_enable = true; - ws.set_option(pmd); - ws.accept(); + ws_.accept(); for(;;) { - static_buffer<2001> buffer; - ws.read(buffer); - ws.text(ws.got_text()); - ws.write(buffer.data()); + ws_.read(buffer_); + ws_.text(ws_.got_text()); + ws_.write(buffer_.data()); + buffer_.consume(buffer_.size()); } } catch(system_error const& se) { + boost::ignore_unused(se); +#if 0 if( se.code() != error::closed && se.code() != error::failed && se.code() != boost::asio::error::eof) log_ << "echo_server: " << se.code().message() << std::endl; +#endif } catch(std::exception const& e) { @@ -127,20 +161,20 @@ public: } void - do_async() + do_accept() { - permessage_deflate pmd; - pmd.client_enable = true; - pmd.server_enable = true; - ws_.set_option(pmd); ws_.async_accept(std::bind( &echo_server::on_accept, this, std::placeholders::_1)); - t_ = std::thread{[&] - { - ios_.run(); - }}; + } + + void + on_handshake(error_code ec) + { + if(ec) + return fail(ec); + do_read(); } void @@ -193,13 +227,16 @@ public: void fail(error_code ec) { + boost::ignore_unused(ec); +#if 0 if( ec != error::closed && ec != error::failed && ec != boost::asio::error::eof) log_ << - "echo_server: " << + "echo_server_async: " << ec.message() << std::endl; +#endif } }; @@ -213,34 +250,31 @@ public: // test that writes the large buffer. static std::size_t constexpr limit = 1000; - for(int i = 0; i < 2; ++i) + std::size_t n; + for(n = 0; n <= limit; ++n) { - std::size_t n; - for(n = 0; n <= limit; ++n) + test::fail_counter fc{n}; + test::stream ts{ios_, fc}; + try { - test::fail_counter fc{n}; - test::stream ts{ios_, fc}; - try - { - f(ts); - ts.close(); - break; - } - catch(system_error const& se) - { - BEAST_EXPECTS( - se.code() == test::error::fail_error, - se.code().message()); - } - catch(std::exception const& e) - { - fail(e.what(), __FILE__, __LINE__); - } + f(ts); ts.close(); - continue; + break; } - BEAST_EXPECT(n < limit); + catch(system_error const& se) + { + BEAST_EXPECTS( + se.code() == test::error::fail_error, + se.code().message()); + } + catch(std::exception const& e) + { + fail(e.what(), __FILE__, __LINE__); + } + ts.close(); + continue; } + BEAST_EXPECT(n < limit); } template @@ -263,7 +297,8 @@ public: ws_type ws{ts}; ws.set_option(pmd); - echo_server es{log, i == 1};; + echo_server es{log, i==1 ? + kind::async : kind::sync}; error_code ec; ws.next_layer().connect(es.stream()); ws.handshake("localhost", "/", ec); @@ -892,27 +927,6 @@ public: } }; - //-------------------------------------------------------------------------- - - void - testOptions() - { - stream ws(ios_); - ws.auto_fragment(true); - ws.write_buffer_size(2048); - ws.binary(false); - ws.read_message_max(1 * 1024 * 1024); - try - { - ws.write_buffer_size(7); - fail(); - } - catch(std::exception const&) - { - pass(); - } - } - //-------------------------------------------------------------------------- // // Accept @@ -980,7 +994,6 @@ public: + big + "\r\n"}; auto tr = connect(ws.next_layer()); - error_code ec = test::error::fail_error; try { w.accept(ws); @@ -1978,9 +1991,117 @@ public: // //-------------------------------------------------------------------------- + static + std::string const& + random_string() + { + static std::string const s = [] + { + std::size_t constexpr N = 16384; + std::mt19937 mt{1}; + std::string tmp; + tmp.reserve(N); + for(std::size_t i = 0; i < N; ++ i) + tmp.push_back(static_cast( + std::uniform_int_distribution< + unsigned>{0, 255}(mt))); + return tmp; + }(); + return s; + } + + template + void + doTestRead(Wrap const& w) + { + permessage_deflate pmd; + pmd.client_enable = false; + pmd.server_enable = false; + + // Read close frames + { + // VFALCO What about asynchronous?? + + auto const check = + [&](error_code ev, string_view s) + { + echo_server es{log}; + stream ws{ios_}; + ws.next_layer().connect(es.stream()); + w.handshake(ws, "localhost", "/"); + ws.next_layer().append(s); + static_buffer<1> b; + error_code ec; + try + { + w.read(ws, b); + fail("", __FILE__, __LINE__); + } + catch(system_error const& se) + { + BEAST_EXPECTS(se.code() == ev, + se.code().message()); + } + ws.next_layer().close(); + }; + + // payload length 1 + check(error::failed, + "\x88\x01\x01"); + + // invalid close code 1005 + check(error::failed, + "\x88\x02\x03\xed"); + + // invalid utf8 + check(error::failed, + "\x88\x06\xfc\x15\x0f\xd7\x73\x43"); + + // good utf8 + check(error::closed, + "\x88\x06\xfc\x15utf8"); + } + + pmd.client_enable = true; + pmd.server_enable = true; + + // invalid inflate block + doTest(pmd, [&](ws_type& ws) + { + auto const& s = random_string(); + ws.binary(true); + ws.next_layer().append( + "\xc2\x40" + s.substr(0, 64)); + flat_buffer b; + try + { + w.read(ws, b); + } + catch(system_error const& se) + { + if(se.code() == test::error::fail_error) + throw; + BEAST_EXPECTS(se.code().category() == + zlib::detail::get_error_category(), + se.code().message()); + } + catch(...) + { + throw; + } + }); + } + void testRead() { + doTestRead(SyncClient{}); + + yield_to([&](yield_context yield) + { + doTestRead(AsyncClient{yield}); + }); + // Read close frames { auto const check = @@ -2017,60 +2138,391 @@ public: } //-------------------------------------------------------------------------- + // + // Write + // + //-------------------------------------------------------------------------- + + template + void + doTestWrite(Wrap const& w) + { + permessage_deflate pmd; + pmd.client_enable = false; + pmd.server_enable = false; + + // continuation + doTest(pmd, [&](ws_type& ws) + { + std::string const s = "Hello"; + std::size_t const chop = 3; + BOOST_ASSERT(chop < s.size()); + w.write_some(ws, false, + boost::asio::buffer(s.data(), chop)); + w.write_some(ws, true, boost::asio::buffer( + s.data() + chop, s.size() - chop)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + }); + + // mask + doTest(pmd, [&](ws_type& ws) + { + ws.auto_fragment(false); + std::string const s = "Hello"; + w.write(ws, boost::asio::buffer(s)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + }); + + // mask (large) + doTest(pmd, [&](ws_type& ws) + { + ws.auto_fragment(false); + ws.write_buffer_size(16); + std::string const s(32, '*'); + w.write(ws, boost::asio::buffer(s)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + }); + + // mask, autofrag + doTest(pmd, [&](ws_type& ws) + { + ws.auto_fragment(true); + std::string const s(16384, '*'); + w.write(ws, boost::asio::buffer(s)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + }); + + // nomask + doTestLoop([&](test::stream& ts) + { + echo_server es{log, kind::async_client}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); + try + { + es.async_handshake(); + w.accept(ws); + ws.auto_fragment(false); + std::string const s = "Hello"; + w.write(ws, boost::asio::buffer(s)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + w.close(ws, {}); + } + catch(...) + { + ts.close(); + throw; + } + ts.close(); + }); + + // nomask, autofrag + doTestLoop([&](test::stream& ts) + { + echo_server es{log, kind::async_client}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); + try + { + es.async_handshake(); + w.accept(ws); + ws.auto_fragment(true); + std::string const s(16384, '*'); + w.write(ws, boost::asio::buffer(s)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + w.close(ws, {}); + } + catch(...) + { + ts.close(); + throw; + } + ts.close(); + }); + + pmd.client_enable = true; + pmd.server_enable = true; + + // deflate + doTest(pmd, [&](ws_type& ws) + { + auto const& s = random_string(); + ws.binary(true); + w.write(ws, boost::asio::buffer(s)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + }); + + // deflate, continuation + doTest(pmd, [&](ws_type& ws) + { + std::string const s = "Hello"; + std::size_t const chop = 3; + BOOST_ASSERT(chop < s.size()); + // This call should produce no + // output due to compression latency. + w.write_some(ws, false, + boost::asio::buffer(s.data(), chop)); + w.write_some(ws, true, boost::asio::buffer( + s.data() + chop, s.size() - chop)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + }); + + // deflate, no context takeover + pmd.client_no_context_takeover = true; + doTest(pmd, [&](ws_type& ws) + { + auto const& s = random_string(); + ws.binary(true); + w.write(ws, boost::asio::buffer(s)); + flat_buffer b; + w.read(ws, b); + BEAST_EXPECT(to_string(b.data()) == s); + }); + } void - testMask(endpoint_type const& ep, - yield_context do_yield) + testWrite() { + doTestWrite(SyncClient{}); + + yield_to([&](yield_context yield) { - std::vector v; - for(char n = 0; n < 20; ++n) - { - error_code ec = test::error::fail_error; - socket_type sock(ios_); - sock.connect(ep, ec); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - stream ws(sock); - ws.handshake("localhost", "/", ec); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - ws.write(boost::asio::buffer(v), ec); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - multi_buffer b; - ws.read(b, ec); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - BEAST_EXPECT(to_string(b.data()) == - std::string(v.data(), v.size())); - v.push_back(n+1); - } + doTestWrite(AsyncClient{yield}); + }); + + // already closed + { + stream ws{ios_}; + error_code ec; + ws.write(sbuf(""), ec); + BEAST_EXPECTS( + ec == boost::asio::error::operation_aborted, + ec.message()); } + + // async, already closed { - std::vector v; - for(char n = 0; n < 20; ++n) + boost::asio::io_service ios; + stream ws{ios}; + ws.async_write(sbuf(""), + [&](error_code ec) + { + BEAST_EXPECTS( + ec == boost::asio::error::operation_aborted, + ec.message()); + }); + ios.run(); + } + + // suspend on write + { + echo_server es{log}; + error_code ec; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/", ec); + BEAST_EXPECTS(! ec, ec.message()); + std::size_t count = 0; + ws.async_ping("", + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_write(sbuf("*"), + [&](error_code ec) + { + ++count; + BEAST_EXPECTS( + ec == boost::asio::error::operation_aborted, + ec.message()); + }); + ws.async_close({}, [&](error_code){}); + ios.run(); + BEAST_EXPECT(count == 2); + } + + // suspend on write, nomask, frag + { + echo_server es{log, kind::async_client}; + error_code ec; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + es.async_handshake(); + ws.accept(ec); + BEAST_EXPECTS(! ec, ec.message()); + std::size_t count = 0; + std::string const s(16384, '*'); + ws.auto_fragment(true); + ws.async_write(boost::asio::buffer(s), + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_ping("", + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + ios.run(); + ios.reset(); + BEAST_EXPECT(count == 2); + flat_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + BEAST_EXPECT(to_string(b.data()) == s); + ws.async_close({}, + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + }); + ios.run(); + BEAST_EXPECT(count == 4); + } + + // suspend on write, mask, frag + { + echo_server es{log, kind::async}; + error_code ec; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/", ec); + BEAST_EXPECTS(! ec, ec.message()); + std::size_t count = 0; + std::string const s(16384, '*'); + ws.auto_fragment(true); + ws.async_write(boost::asio::buffer(s), + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_ping("", + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + ios.run(); + ios.reset(); + BEAST_EXPECT(count == 2); + flat_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + BEAST_EXPECT(to_string(b.data()) == s); + ws.async_close({}, + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + }); + ios.run(); + BEAST_EXPECT(count == 4); + } + + // suspend on write, deflate + { + echo_server es{log, kind::async}; + error_code ec; + boost::asio::io_service ios; + stream ws{ios}; { - error_code ec = test::error::fail_error; - socket_type sock(ios_); - sock.connect(ep, ec); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - stream ws(sock); - ws.handshake("localhost", "/", ec); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - ws.async_write(boost::asio::buffer(v), do_yield[ec]); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - multi_buffer b; - ws.async_read(b, do_yield[ec]); - if(! BEAST_EXPECTS(! ec, ec.message())) - break; - BEAST_EXPECT(to_string(b.data()) == - std::string(v.data(), v.size())); - v.push_back(n+1); + permessage_deflate pmd; + pmd.client_enable = true; + ws.set_option(pmd); } + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/", ec); + BEAST_EXPECTS(! ec, ec.message()); + std::size_t count = 0; + auto const& s = random_string(); + ws.binary(true); + ws.async_write(boost::asio::buffer(s), + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_ping("", + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + ios.run(); + ios.reset(); + BEAST_EXPECT(count == 2); + flat_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + BEAST_EXPECT(to_string(b.data()) == s); + ws.async_close({}, + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + }); + ios.run(); + BEAST_EXPECT(count == 4); + } + } + + //-------------------------------------------------------------------------- + + void + testOptions() + { + stream ws(ios_); + ws.auto_fragment(true); + ws.write_buffer_size(2048); + ws.binary(false); + ws.read_message_max(1 * 1024 * 1024); + try + { + ws.write_buffer_size(7); + fail(); + } + catch(std::exception const&) + { + pass(); } } @@ -2079,7 +2531,8 @@ public: { for(int i = 0; i < 2; ++i ) { - echo_server es{log, i==1}; + echo_server es{log, i==1 ? + kind::async : kind::sync}; boost::asio::io_service ios; stream ws{ios}; ws.next_layer().connect(es.stream()); @@ -2143,7 +2596,7 @@ public: void testPausation2() { - echo_server es{log, true}; + echo_server es{log, kind::async}; boost::asio::io_service ios; stream ws{ios}; ws.next_layer().connect(es.stream()); @@ -2210,7 +2663,7 @@ public: void testPausation3() { - echo_server es{log, true}; + echo_server es{log, kind::async}; boost::asio::io_service ios; stream ws{ios}; ws.next_layer().connect(es.stream()); @@ -2257,11 +2710,12 @@ public: Write a message as two individual frames */ void - testWriteFrames() + testIssue300() { for(int i = 0; i < 2; ++i ) { - echo_server es{log, i==1}; + echo_server es{log, i==1 ? + kind::async : kind::sync}; boost::asio::io_service ios; stream ws{ios}; ws.next_layer().connect(es.stream()); @@ -2285,7 +2739,8 @@ public: { for(;;) { - echo_server es{log, i==1}; + echo_server es{log, i==1 ? + kind::async : kind::sync}; boost::asio::io_service ios; stream ws{ios}; ws.next_layer().connect(es.stream()); @@ -2315,7 +2770,7 @@ public: template void - testStream(Wrap const& w, + doTestStream(Wrap const& w, permessage_deflate const& pmd) { using boost::asio::buffer; @@ -2579,33 +3034,32 @@ public: testHandshake(); testPing(); testRead(); - - permessage_deflate pmd; - pmd.client_enable = false; - pmd.server_enable = false; + testWrite(); testOptions(); testPausation1(); testPausation2(); testPausation3(); - testWriteFrames(); + testIssue300(); testAsyncWriteFrame(); - auto const doClientTests = + auto const testStream = [this](permessage_deflate const& pmd) { - testStream(SyncClient{}, pmd); + doTestStream(SyncClient{}, pmd); yield_to( [&](yield_context yield) { - testStream(AsyncClient{yield}, pmd); + doTestStream(AsyncClient{yield}, pmd); }); }; + permessage_deflate pmd; + pmd.client_enable = false; pmd.server_enable = false; - doClientTests(pmd); + testStream(pmd); pmd.client_enable = true; pmd.server_enable = true; @@ -2613,7 +3067,7 @@ public: pmd.client_no_context_takeover = false; pmd.compLevel = 1; pmd.memLevel = 1; - doClientTests(pmd); + testStream(pmd); pmd.client_enable = true; pmd.server_enable = true; @@ -2621,7 +3075,7 @@ public: pmd.client_no_context_takeover = true; pmd.compLevel = 1; pmd.memLevel = 1; - doClientTests(pmd); + testStream(pmd); } };