From 96eff81ceaf4b79ed012369b6ec7e15159791adf Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Thu, 17 Aug 2017 22:38:50 -0700 Subject: [PATCH] websocket close fixes and tests --- CHANGELOG.md | 1 + include/boost/beast/websocket/impl/close.ipp | 17 +- include/boost/beast/websocket/impl/fail.ipp | 85 ++--- include/boost/beast/websocket/impl/read.ipp | 7 +- test/beast/websocket/stream.cpp | 311 +++++++++++++++--- .../include/boost/beast/test/stream.hpp | 7 +- 6 files changed, 341 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea925859..fd68db53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ Version 109: WebSocket: * Fix async_read_some handler signature +* websocket close fixes and tests API Changes: diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index 242e8f24..6fb6ba67 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -43,8 +43,9 @@ class stream::close_op struct state { stream& ws; - token tok; detail::frame_buffer fb; + error_code ev; + token tok; state( Handler&, @@ -201,7 +202,7 @@ operator()(error_code ec, std::size_t bytes_transferred) // Suspend BOOST_ASSERT(d.ws.rd_block_ != d.tok); BOOST_ASIO_CORO_YIELD - d.ws.rd_op_.emplace(std::move(*this)); + d.ws.r_close_op_.emplace(std::move(*this)); // Acquire the read block BOOST_ASSERT(! d.ws.rd_block_); @@ -230,7 +231,10 @@ operator()(error_code ec, std::size_t bytes_transferred) d.ws.rd_.fh, d.ws.rd_.buf, code)) { if(code != close_code::none) - break; + { + d.ev = error::failed; + goto teardown; + } BOOST_ASIO_CORO_YIELD d.ws.stream_.async_read_some( d.ws.rd_.buf.prepare(read_size(d.ws.rd_.buf, @@ -259,10 +263,11 @@ operator()(error_code ec, std::size_t bytes_transferred) if(code != close_code::none) { // Protocol error - goto upcall; + d.ev = error::failed; + goto teardown; } d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len)); - break; + goto teardown; } d.ws.rd_.buf.consume(clamp(d.ws.rd_.fh.len)); } @@ -305,6 +310,8 @@ operator()(error_code ec, std::size_t bytes_transferred) // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error ec.assign(0, ec.category()); } + if(! ec) + ec = d.ev; d.ws.failed_ = true; upcall: diff --git a/include/boost/beast/websocket/impl/fail.ipp b/include/boost/beast/websocket/impl/fail.ipp index 723c2f65..48f6572d 100644 --- a/include/boost/beast/websocket/impl/fail.ipp +++ b/include/boost/beast/websocket/impl/fail.ipp @@ -128,46 +128,47 @@ operator()(error_code ec, std::size_t) BOOST_ASIO_CORO_REENTER(*this) { // Maybe suspend - if(! d.ws.wr_block_) - { - // Acquire the write block - d.ws.wr_block_ = d.tok; - - // Make sure the stream is open - if(d.ws.failed_) - { - BOOST_ASIO_CORO_YIELD - d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); - goto upcall; - } - } - else - { - // Suspend - BOOST_ASSERT(d.ws.wr_block_ != d.tok); - BOOST_ASIO_CORO_YIELD - d.ws.rd_op_.emplace(std::move(*this)); // VFALCO emplace to rd_op_ - - // Acquire the write block - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = d.tok; - - // Resume - BOOST_ASIO_CORO_YIELD - d.ws.get_io_service().post(std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - - // Make sure the stream is open - if(d.ws.failed_) - { - ec = boost::asio::error::operation_aborted; - goto upcall; - } - } if(d.code != close_code::none && ! d.ws.wr_close_) { + if(! d.ws.wr_block_) + { + // Acquire the write block + d.ws.wr_block_ = d.tok; + + // Make sure the stream is open + if(d.ws.failed_) + { + BOOST_ASIO_CORO_YIELD + d.ws.get_io_service().post( + bind_handler(std::move(*this), + boost::asio::error::operation_aborted)); + goto upcall; + } + } + else + { + // Suspend + BOOST_ASSERT(d.ws.wr_block_ != d.tok); + BOOST_ASIO_CORO_YIELD + d.ws.rd_op_.emplace(std::move(*this)); // VFALCO emplace to rd_op_ + + // Acquire the write block + BOOST_ASSERT(! d.ws.wr_block_); + d.ws.wr_block_ = d.tok; + + // Resume + BOOST_ASIO_CORO_YIELD + d.ws.get_io_service().post(std::move(*this)); + BOOST_ASSERT(d.ws.wr_block_ == d.tok); + + // Make sure the stream is open + if(d.ws.failed_) + { + ec = boost::asio::error::operation_aborted; + goto upcall; + } + } + // Serialize close frame d.ws.template write_close< flat_static_buffer_base>( @@ -184,12 +185,12 @@ operator()(error_code ec, std::size_t) goto upcall; } // Teardown - BOOST_ASSERT(d.ws.wr_block_ == d.tok); + //BOOST_ASSERT(d.ws.wr_block_ == d.tok); using beast::websocket::async_teardown; BOOST_ASIO_CORO_YIELD async_teardown(d.ws.role_, d.ws.stream_, std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_ == d.tok); + //BOOST_ASSERT(d.ws.wr_block_ == d.tok); if(ec == boost::asio::error::eof) { // Rationale: @@ -200,8 +201,8 @@ operator()(error_code ec, std::size_t) ec = d.ev; d.ws.failed_ = true; upcall: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.wr_block_.reset(); + if(d.ws.wr_block_ == d.tok) + d.ws.wr_block_.reset(); d_.invoke(ec); } } diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index b809495a..d70f1fcf 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -49,9 +49,10 @@ class stream::read_some_op stream& ws_; consuming_buffers cb_; std::size_t bytes_written_ = 0; + error_code ev_; token tok_; - bool did_read_ = false; bool dispatched_ = false; + bool did_read_ = false; public: read_some_op(read_some_op&&) = default; @@ -553,11 +554,13 @@ operator()( } goto upcall; } + close: // Maybe send close frame, then teardown BOOST_ASIO_CORO_YIELD ws_.do_async_fail(code, ec, std::move(*this)); - BOOST_ASSERT(! ws_.wr_block_); + //BOOST_ASSERT(! ws_.wr_block_); + goto upcall; upcall: BOOST_ASSERT(ws_.rd_block_ == tok_); diff --git a/test/beast/websocket/stream.cpp b/test/beast/websocket/stream.cpp index a2f15d1f..aeeae2ad 100644 --- a/test/beast/websocket/stream.cpp +++ b/test/beast/websocket/stream.cpp @@ -154,6 +154,7 @@ public: log << "echoServer: " << e.what() << std::endl; } } + void launchEchoServer(test::stream stream) { @@ -381,6 +382,8 @@ public: return false; } + //-------------------------------------------------------------------------- + struct SyncClient { template @@ -519,6 +522,16 @@ public: return ws.read(buffer); } + template< + class NextLayer, class DynamicBuffer> + std::size_t + read_some(stream& ws, + std::size_t limit, + DynamicBuffer& buffer) const + { + return ws.read_some(buffer, limit); + } + template< class NextLayer, class MutableBufferSequence> std::size_t @@ -557,6 +570,8 @@ public: } }; + //-------------------------------------------------------------------------- + class AsyncClient { yield_context& yield_; @@ -757,6 +772,21 @@ public: return bytes_written; } + template< + class NextLayer, class DynamicBuffer> + std::size_t + read_some(stream& ws, + std::size_t limit, + DynamicBuffer& buffer) const + { + error_code ec; + auto const bytes_written = + ws.async_read_some(buffer, limit, yield_[ec]); + if(ec) + throw system_error{ec}; + return bytes_written; + } + template< class NextLayer, class MutableBufferSequence> std::size_t @@ -809,6 +839,8 @@ public: } }; + //-------------------------------------------------------------------------- + void testOptions() { @@ -1241,6 +1273,246 @@ public: //-------------------------------------------------------------------------- + template + void + doTestClose(Wrap const& w) + { + permessage_deflate pmd; + pmd.client_enable = false; + pmd.server_enable = false; + + auto const launch = + [&](test::stream stream) + { + launchEchoServerAsync(std::move(stream)); + }; + + // normal close + doTest(w, pmd, launch, + [&](ws_stream_type& ws) + { + w.close(ws, {}); + }); + + // double close + { + stream ws{ios_}; + launch(ws.next_layer().remote()); + w.handshake(ws, "localhost", "/"); + w.close(ws, {}); + try + { + w.close(ws, {}); + fail("", __FILE__, __LINE__); + } + catch(system_error const& se) + { + BEAST_EXPECTS( + se.code() == boost::asio::error::operation_aborted, + se.code().message()); + } + } + + // drain a message after close + doTest(w, pmd, launch, + [&](ws_stream_type& ws) + { + ws.next_layer().str("\x81\x01\x2a"); + w.close(ws, {}); + }); + + // drain a big message after close + { + std::string s; + s = "\x81\x7e\x10\x01"; + s.append(4097, '*'); + doTest(w, pmd, launch, + [&](ws_stream_type& ws) + { + ws.next_layer().str(s); + w.close(ws, {}); + }); + } + + // drain a ping after close + doTest(w, pmd, launch, + [&](ws_stream_type& ws) + { + ws.next_layer().str("\x89\x01*"); + w.close(ws, {}); + }); + + // drain invalid message frame after close + { + stream ws{ios_}; + launch(ws.next_layer().remote()); + w.handshake(ws, "localhost", "/"); + ws.next_layer().str("\x81\x81\xff\xff\xff\xff*"); + try + { + w.close(ws, {}); + fail("", __FILE__, __LINE__); + } + catch(system_error const& se) + { + BEAST_EXPECTS( + se.code() == error::failed, + se.code().message()); + } + } + + // drain invalid close frame after close + { + stream ws{ios_}; + launch(ws.next_layer().remote()); + w.handshake(ws, "localhost", "/"); + ws.next_layer().str("\x88\x01*"); + try + { + w.close(ws, {}); + fail("", __FILE__, __LINE__); + } + catch(system_error const& se) + { + BEAST_EXPECTS( + se.code() == error::failed, + se.code().message()); + } + } + + // close with incomplete read message + doTest(w, pmd, launch, + [&](ws_stream_type& ws) + { + ws.next_layer().str("\x81\x02**"); + static_buffer<1> b; + w.read_some(ws, 1, b); + w.close(ws, {}); + }); + } + + void + doTestCloseAsync() + { + auto const launch = + [&](test::stream stream) + { + launchEchoServer(std::move(stream)); + //launchEchoServerAsync(std::move(stream)); + }; + + // suspend on write + { + error_code ec; + boost::asio::io_service ios; + stream ws{ios, ios_}; + launch(ws.next_layer().remote()); + ws.handshake("localhost", "/", ec); + BEAST_EXPECTS(! ec, ec.message()); + std::size_t count = 0; + ws.async_ping("", + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_close({}, + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + }); + ios.run(); + BEAST_EXPECT(count == 2); + } + + // suspend on read + { + error_code ec; + boost::asio::io_service ios; + stream ws{ios, ios_}; + launch(ws.next_layer().remote()); + ws.handshake("localhost", "/", ec); + BEAST_EXPECTS(! ec, ec.message()); + flat_buffer b; + std::size_t count = 0; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + BEAST_EXPECTS( + ec == error::closed, ec.message()); + }); + BEAST_EXPECT(ws.rd_block_); + ws.async_close({}, + [&](error_code ec) + { + ++count; + BEAST_EXPECTS( + ec == boost::asio::error::operation_aborted, + ec.message()); + }); + BEAST_EXPECT(ws.wr_close_); + ios.run(); + BEAST_EXPECT(count == 2); + } + } + + void + testClose() + { + doTestClose(SyncClient{}); + + yield_to([&](yield_context yield) + { + doTestClose(AsyncClient{yield}); + }); + + doTestCloseAsync(); + } + + //-------------------------------------------------------------------------- + + void + testRead() + { + // Read close frames + { + auto const check = + [&](error_code ev, string_view s) + { + test::stream ts{ios_}; + stream ws{ts}; + launchEchoServerAsync(ts.remote()); + ws.handshake("localhost", "/"); + ts.str(s); + static_buffer<1> b; + error_code ec; + ws.read(b, ec); + BEAST_EXPECTS(ec == ev, ec.message()); + }; + + // payload length 1 + check(error::failed, + "\x88\x01\x01"); + + // invalid close code 1005 + check(error::failed, + "\x88\x02\x03\xed"); + + // invalid utf8 + check(error::failed, + "\x88\x06\xfc\x15\x0f\xd7\x73\x43"); + + // good utf8 + check(error::closed, + "\x88\x06\xfc\x15utf8"); + } + } + + //-------------------------------------------------------------------------- + template void doTestHandshake(Client const& c, Launch const& launch) @@ -1589,43 +1861,6 @@ public: } } - void - testClose() - { - auto const check = - [&](error_code ev, string_view s) - { - test::stream ts{ios_}; - stream ws{ts}; - launchEchoServerAsync(ts.remote()); - ws.handshake("localhost", "/"); - ts.str(s); - static_buffer<1> b; - error_code ec; - ws.read(b, ec); - BEAST_EXPECTS(ec == ev, ec.message()); - }; - - // payload length 1 - check(error::failed, - "\x88\x81\xff\xff\xff\xff\x00"); - - // invalid close code 1005 - check(error::failed, - "\x88\x82\xff\xff\xff\xff\xfc\x12"); - - // invalid utf8 - check(error::failed, - "\x88\x86\xff\xff\xff\xff\xfc\x15\x0f\xd7\x73\x43"); - - // VFALCO No idea why this fails - // good utf8 -#if 0 - check({}, - "\x88\x86\xff\xff\xff\xff\xfc\x15utf8"); -#endif - } - void testPausation1() { @@ -2130,6 +2365,8 @@ public: sizeof(websocket::stream) << std::endl; testAccept(); + testClose(); + testRead(); permessage_deflate pmd; pmd.client_enable = false; diff --git a/test/extras/include/boost/beast/test/stream.hpp b/test/extras/include/boost/beast/test/stream.hpp index 29db1d6d..c35700d0 100644 --- a/test/extras/include/boost/beast/test/stream.hpp +++ b/test/extras/include/boost/beast/test/stream.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -139,6 +140,8 @@ class stream_impl::read_op_impl : public stream_impl::read_op state& s_; Buffers b_; Handler h_; + boost::optional< + boost::asio::io_service::work> work_; public: lambda(lambda&&) = default; @@ -148,6 +151,7 @@ class stream_impl::read_op_impl : public stream_impl::read_op : s_(s) , b_(b) , h_(std::move(h)) + , work_(s_.ios) { } @@ -155,6 +159,7 @@ class stream_impl::read_op_impl : public stream_impl::read_op : s_(s) , b_(b) , h_(h) + , work_(s_.ios) { } @@ -162,6 +167,7 @@ class stream_impl::read_op_impl : public stream_impl::read_op post() { s_.ios.post(std::move(*this)); + work_ = boost::none; } void @@ -262,7 +268,6 @@ public: { if(! impl_) return; - BOOST_ASSERT(! in_->op); std::unique_lock lock{out_->m}; if(out_->code == status::ok) {