diff --git a/CHANGELOG.md b/CHANGELOG.md index 12892c8f..5c11f1ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Version 110: * Refactor stream open state variable * Refactor websocket stream members +* Refactor websocket stream: fixes and tests -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index ea1cfc7f..5a1191f1 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -22,12 +22,12 @@ #include #include +#include + namespace boost { namespace beast { namespace websocket { -//------------------------------------------------------------------------------ - /* Close the WebSocket Connection This composed operation sends the close frame if it hasn't already @@ -46,6 +46,7 @@ class stream::close_op detail::frame_buffer fb; error_code ev; token tok; + bool cont; state( Handler&, @@ -78,7 +79,8 @@ public: void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0); + std::size_t bytes_transferred = 0, + bool cont = true); friend void* asio_handler_allocate( @@ -102,7 +104,7 @@ public: bool asio_handler_is_continuation(close_op* op) { using boost::asio::asio_handler_is_continuation; - return asio_handler_is_continuation( + return op->d_->cont || asio_handler_is_continuation( std::addressof(op->d_.handler())); } @@ -120,11 +122,15 @@ template template void stream::close_op:: -operator()(error_code ec, std::size_t bytes_transferred) +operator()( + error_code ec, + std::size_t bytes_transferred, + bool cont) { using beast::detail::clamp; auto& d = *d_; close_code code{}; + d.cont = cont; BOOST_ASIO_CORO_REENTER(*this) { // Maybe suspend @@ -134,14 +140,8 @@ operator()(error_code ec, std::size_t bytes_transferred) d.ws.wr_block_ = d.tok; // Make sure the stream is open - if(! d.ws.open_) - { - BOOST_ASIO_CORO_YIELD - d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); + if(d.ws.check_fail(ec)) goto upcall; - } } else { @@ -160,24 +160,20 @@ operator()(error_code ec, std::size_t bytes_transferred) BOOST_ASSERT(d.ws.wr_block_ == d.tok); // Make sure the stream is open - if(! d.ws.open_) - { - ec = boost::asio::error::operation_aborted; + if(d.ws.check_fail(ec)) goto upcall; - } } - // Send close frame + // Can't call close twice BOOST_ASSERT(! d.ws.wr_close_); + + // Send close frame d.ws.wr_close_ = true; BOOST_ASIO_CORO_YIELD boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); - if(ec) - { - d.ws.open_ = false; + if(d.ws.check_fail(ec)) goto upcall; - } if(d.ws.rd_close_) { @@ -187,6 +183,7 @@ operator()(error_code ec, std::size_t bytes_transferred) goto teardown; } + // Maybe suspend if(! d.ws.rd_block_) { // Acquire the read block @@ -194,11 +191,6 @@ operator()(error_code ec, std::size_t bytes_transferred) } else { - // The read_op is currently running so it will see - // the close frame and call teardown. We will suspend - // to cause async_read to return error::closed, before - // we return error::success. - // Suspend BOOST_ASSERT(d.ws.rd_block_ != d.tok); BOOST_ASIO_CORO_YIELD @@ -213,12 +205,11 @@ operator()(error_code ec, std::size_t bytes_transferred) d.ws.get_io_service().post(std::move(*this)); BOOST_ASSERT(d.ws.rd_block_ == d.tok); - // Handle the stream closing while suspended - if(! d.ws.open_) - { - ec = boost::asio::error::operation_aborted; + // Make sure the stream is open + if(d.ws.check_fail(ec)) goto upcall; - } + + BOOST_ASSERT(! d.ws.rd_close_); } // Drain @@ -240,11 +231,8 @@ operator()(error_code ec, std::size_t bytes_transferred) d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, d.ws.rd_buf_.max_size())), std::move(*this)); - if(ec) - { - d.ws.open_ = false; + if(d.ws.check_fail(ec)) goto upcall; - } d.ws.rd_buf_.commit(bytes_transferred); } if(detail::is_control(d.ws.rd_fh_.op)) @@ -283,11 +271,8 @@ operator()(error_code ec, std::size_t bytes_transferred) d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, d.ws.rd_buf_.max_size())), std::move(*this)); - if(ec) - { - d.ws.open_ = false; + if(d.ws.check_fail(ec)) goto upcall; - } d.ws.rd_buf_.commit(bytes_transferred); } BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_); @@ -304,6 +289,7 @@ operator()(error_code ec, std::size_t bytes_transferred) async_teardown(d.ws.role_, d.ws.stream_, std::move(*this)); BOOST_ASSERT(d.ws.wr_block_ == d.tok); + BOOST_ASSERT(d.ws.open_); if(ec == boost::asio::error::eof) { // Rationale: @@ -317,15 +303,20 @@ operator()(error_code ec, std::size_t bytes_transferred) upcall: BOOST_ASSERT(d.ws.wr_block_ == d.tok); d.ws.wr_block_.reset(); - if(d.ws.rd_block_) + if(d.ws.rd_block_ == d.tok) { - BOOST_ASSERT(d.ws.rd_block_ = d.tok); d.ws.rd_block_.reset(); d.ws.paused_r_rd_.maybe_invoke(); } d.ws.paused_rd_.maybe_invoke() || d.ws.paused_ping_.maybe_invoke() || d.ws.paused_wr_.maybe_invoke(); + if(! d.cont) + { + auto& ws = d.ws; + return ws.stream_.get_io_service().post( + bind_handler(d_.release_handler(), ec)); + } d_.invoke(ec); } } @@ -353,12 +344,10 @@ close(close_reason const& cr, error_code& ec) static_assert(is_sync_stream::value, "SyncStream requirements not met"); using beast::detail::clamp; + ec.assign(0, ec.category()); // Make sure the stream is open - if(! open_) - { - ec = boost::asio::error::operation_aborted; + if(check_fail(ec)) return; - } // If rd_close_ is set then we already sent a close BOOST_ASSERT(! rd_close_); BOOST_ASSERT(! wr_close_); @@ -368,8 +357,7 @@ close(close_reason const& cr, error_code& ec) write_close(fb, cr); boost::asio::write(stream_, fb.data(), ec); } - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; // Drain the connection close_code code{}; @@ -387,8 +375,7 @@ close(close_reason const& cr, error_code& ec) stream_.read_some( rd_buf_.prepare(read_size(rd_buf_, rd_buf_.max_size())), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; rd_buf_.commit(bytes_transferred); } @@ -406,9 +393,11 @@ close(close_reason const& cr, error_code& ec) detail::mask_inplace(mb, rd_key_); detail::read_close(cr_, mb, code); if(code != close_code::none) + { // Protocol error return do_fail(close_code::none, error::failed, ec); + } rd_buf_.consume(clamp(rd_fh_.len)); break; } @@ -454,7 +443,8 @@ async_close(close_reason const& cr, CloseHandler&& handler) void(error_code)> init{handler}; close_op>{ - init.completion_handler, *this, cr}({}); + init.completion_handler, *this, cr}( + {}, 0, false); return init.result.get(); } diff --git a/include/boost/beast/websocket/impl/fail.ipp b/include/boost/beast/websocket/impl/fail.ipp deleted file mode 100644 index e0507075..00000000 --- a/include/boost/beast/websocket/impl/fail.ipp +++ /dev/null @@ -1,271 +0,0 @@ -// -// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/boostorg/beast -// - -#ifndef BOOST_BEAST_WEBSOCKET_IMPL_FAIL_IPP -#define BOOST_BEAST_WEBSOCKET_IMPL_FAIL_IPP - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace boost { -namespace beast { -namespace websocket { - -/* - This composed operation optionally sends a close frame, - then performs the teardown operation. -*/ -template -template -class stream::fail_op - : public boost::asio::coroutine -{ - struct state - { - stream& ws; - detail::frame_buffer fb; - std::uint16_t code; - error_code ev; - token tok; - - state( - Handler&, - stream& ws_, - std::uint16_t code_, - error_code ev_) - : ws(ws_) - , code(code_) - , ev(ev_) - , tok(ws.tok_.unique()) - { - } - }; - - handler_ptr d_; - -public: - fail_op(fail_op&&) = default; - fail_op(fail_op const&) = default; - - template - fail_op( - DeducedHandler&& h, - stream& ws, - std::uint16_t code, - error_code ev) - : d_(std::forward(h), - ws, code, ev) - { - } - - void operator()( - error_code ec = {}, - std::size_t bytes_transferred = 0); - - friend - void* asio_handler_allocate( - std::size_t size, fail_op* op) - { - using boost::asio::asio_handler_allocate; - return asio_handler_allocate( - size, std::addressof(op->d_.handler())); - } - - friend - void asio_handler_deallocate( - void* p, std::size_t size, fail_op* op) - { - using boost::asio::asio_handler_deallocate; - asio_handler_deallocate( - p, size, std::addressof(op->d_.handler())); - } - - friend - bool asio_handler_is_continuation(fail_op* op) - { - using boost::asio::asio_handler_is_continuation; - return asio_handler_is_continuation( - std::addressof(op->d_.handler())); - } - - template - friend - void asio_handler_invoke(Function&& f, fail_op* op) - { - using boost::asio::asio_handler_invoke; - asio_handler_invoke(f, - std::addressof(op->d_.handler())); - } -}; - -template -template -void -stream:: -fail_op:: -operator()(error_code ec, std::size_t) -{ - auto& d = *d_; - BOOST_ASIO_CORO_REENTER(*this) - { - // Maybe suspend - if(d.code != close_code::none && ! d.ws.wr_close_) - { - if(! d.ws.wr_block_) - { - // Acquire the write block - d.ws.wr_block_ = d.tok; - - // Make sure the stream is open - if(! d.ws.open_) - { - BOOST_ASIO_CORO_YIELD - d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); - goto upcall; - } - } - else - { - // Suspend - BOOST_ASSERT(d.ws.wr_block_ != d.tok); - BOOST_ASIO_CORO_YIELD - d.ws.paused_rd_.emplace(std::move(*this)); // VFALCO emplace to paused_rd_ - - // Acquire the write block - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = d.tok; - - // Resume - BOOST_ASIO_CORO_YIELD - d.ws.get_io_service().post(std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - - // Make sure the stream is open - if(! d.ws.open_) - { - ec = boost::asio::error::operation_aborted; - goto upcall; - } - } - - // Serialize close frame - d.ws.template write_close< - flat_static_buffer_base>( - d.fb, d.code); - // Send close frame - d.ws.wr_close_ = true; - BOOST_ASIO_CORO_YIELD - boost::asio::async_write( - d.ws.stream_, d.fb.data(), - std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.open_ = ! ec; - if(! d.ws.open_) - goto upcall; - } - // Teardown - //BOOST_ASSERT(d.ws.wr_block_ == d.tok); - using beast::websocket::async_teardown; - BOOST_ASIO_CORO_YIELD - async_teardown(d.ws.role_, - d.ws.stream_, std::move(*this)); - //BOOST_ASSERT(d.ws.wr_block_ == d.tok); - if(ec == boost::asio::error::eof) - { - // Rationale: - // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error - ec.assign(0, ec.category()); - } - if(! ec) - ec = d.ev; - d.ws.open_ = false; - upcall: - if(d.ws.wr_block_ == d.tok) - d.ws.wr_block_.reset(); - d_.invoke(ec); - } -} - -//------------------------------------------------------------------------------ - -/* _Fail the WebSocket Connection_ -*/ -template -void -stream:: -do_fail( - std::uint16_t code, // if set, send a close frame first - error_code ev, // error code to use upon success - error_code& ec) // set to the error, else set to ev -{ - BOOST_ASSERT(ev); - if(code != close_code::none && ! wr_close_) - { - wr_close_ = true; - detail::frame_buffer fb; - write_close< - flat_static_buffer_base>(fb, code); - boost::asio::write(stream_, fb.data(), ec); - open_ = ! ec; - if(! open_) - return; - } - using beast::websocket::teardown; - teardown(role_, stream_, ec); - if(ec == boost::asio::error::eof) - { - // Rationale: - // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error - ec.assign(0, ec.category()); - } - open_ = ! ec; - if(! open_) - return; - ec = ev; - open_ = false; -} - -/* _Fail the WebSocket Connection_ -*/ -template -template -void -stream:: -do_async_fail( - std::uint16_t code, // if set, send a close frame first - error_code ev, // error code to use upon success - Handler&& handler) -{ - fail_op::type>{ - std::forward(handler), - *this, - code, - ev}(); -} - -} // websocket -} // beast -} // boost - -#endif diff --git a/include/boost/beast/websocket/impl/ping.ipp b/include/boost/beast/websocket/impl/ping.ipp index 70eca861..591ee325 100644 --- a/include/boost/beast/websocket/impl/ping.ipp +++ b/include/boost/beast/websocket/impl/ping.ipp @@ -131,12 +131,11 @@ operator()(error_code ec, std::size_t) d.ws.wr_block_ = d.tok; // Make sure the stream is open - if(! d.ws.open_) + if(d.ws.check_fail(ec)) { BOOST_ASIO_CORO_YIELD d.ws.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); + bind_handler(std::move(*this), ec)); goto upcall; } } @@ -157,19 +156,16 @@ operator()(error_code ec, std::size_t) BOOST_ASSERT(d.ws.wr_block_ == d.tok); // Make sure the stream is open - if(! d.ws.open_) - { - ec = boost::asio::error::operation_aborted; + if(d.ws.check_fail(ec)) goto upcall; - } } // Send ping frame BOOST_ASIO_CORO_YIELD boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); - if(ec) - d.ws.open_ = false; + if(d.ws.check_fail(ec)) + goto upcall; upcall: BOOST_ASSERT(d.ws.wr_block_ == d.tok); @@ -199,16 +195,16 @@ void stream:: ping(ping_data const& payload, error_code& ec) { + ec.assign(0, ec.category()); // Make sure the stream is open - if(! open_) - { - ec = boost::asio::error::operation_aborted; + if(check_fail(ec)) return; - } detail::frame_buffer fb; write_ping( fb, detail::opcode::ping, payload); boost::asio::write(stream_, fb.data(), ec); + if(check_fail(ec)) + return; } template @@ -227,16 +223,16 @@ void stream:: pong(ping_data const& payload, error_code& ec) { + ec.assign(0, ec.category()); // Make sure the stream is open - if(! open_) - { - ec = boost::asio::error::operation_aborted; + if(check_fail(ec)) return; - } detail::frame_buffer fb; write_ping( fb, detail::opcode::pong, payload); boost::asio::write(stream_, fb.data(), ec); + if(check_fail(ec)) + return; } template diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index 113a14b8..c17b809f 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -51,8 +51,9 @@ class stream::read_some_op std::size_t bytes_written_ = 0; error_code ev_; token tok_; - bool dispatched_ = false; + close_code code_; bool did_read_ = false; + bool cont_; public: read_some_op(read_some_op&&) = default; @@ -67,6 +68,7 @@ public: , ws_(ws) , cb_(bs) , tok_(ws_.tok_.unique()) + , code_(close_code::none) { } @@ -78,7 +80,8 @@ public: void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0); + std::size_t bytes_transferred = 0, + bool cont = true); friend void* asio_handler_allocate( @@ -102,9 +105,8 @@ public: bool asio_handler_is_continuation(read_some_op* op) { using boost::asio::asio_handler_is_continuation; - return op->dispatched_ || - asio_handler_is_continuation( - std::addressof(op->h_)); + return op->cont_ || asio_handler_is_continuation( + std::addressof(op->h_)); } template @@ -124,13 +126,15 @@ stream:: read_some_op:: operator()( error_code ec, - std::size_t bytes_transferred) + std::size_t bytes_transferred, + bool cont) { using beast::detail::clamp; using boost::asio::buffer; using boost::asio::buffer_cast; using boost::asio::buffer_size; close_code code{}; + cont_ = cont; BOOST_ASIO_CORO_REENTER(*this) { // Maybe suspend @@ -140,17 +144,12 @@ operator()( ws_.rd_block_ = tok_; // Make sure the stream is open - if(! ws_.open_) - { - BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); + if(ws_.check_fail(ec)) goto upcall; - } } else { + do_suspend: // Suspend BOOST_ASSERT(ws_.rd_block_ != tok_); BOOST_ASIO_CORO_YIELD @@ -164,16 +163,13 @@ operator()( BOOST_ASIO_CORO_YIELD ws_.get_io_service().post(std::move(*this)); BOOST_ASSERT(ws_.rd_block_ == tok_); - dispatched_ = true; - // Handle the stream closing while suspended - if(! ws_.open_) - { - ec = boost::asio::error::operation_aborted; + // Make sure the stream is open + if(ws_.check_fail(ec)) goto upcall; - } } loop: + BOOST_ASSERT(ws_.rd_block_ == tok_); // See if we need to read a frame header. This // condition is structured to give the decompressor // a chance to emit the final empty deflate block @@ -188,19 +184,31 @@ operator()( if(code != close_code::none) { // _Fail the WebSocket Connection_ - ec = error::failed; + code_ = code; + ev_ = error::failed; goto close; } + BOOST_ASSERT(ws_.rd_block_ == tok_); BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some( ws_.rd_buf_.prepare(read_size( ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); - dispatched_ = true; - ws_.open_ = ! ec; - if(! ws_.open_) + BOOST_ASSERT(ws_.rd_block_ == tok_); + if(ws_.check_fail(ec)) goto upcall; ws_.rd_buf_.commit(bytes_transferred); + + // Allow a close operation to + // drain the connection if necessary. + BOOST_ASSERT(ws_.rd_block_ == tok_); + ws_.rd_block_.reset(); + if( ws_.paused_r_close_.maybe_invoke()) + { + BOOST_ASSERT(ws_.rd_block_); + goto do_suspend; + } + ws_.rd_block_ = tok_; } // Immediately apply the mask to the portion // of the buffer holding payload data. @@ -258,20 +266,18 @@ operator()( BOOST_ASIO_CORO_YIELD ws_.get_io_service().post(std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - dispatched_ = true; // Make sure the stream is open - if(! ws_.open_) - { - ws_.wr_block_.reset(); - ec = boost::asio::error::operation_aborted; + if(ws_.check_fail(ec)) goto upcall; - } // Ignore ping when closing if(ws_.wr_close_) { ws_.wr_block_.reset(); + ws_.paused_close_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke() || + ws_.paused_wr_.maybe_invoke(); goto loop; } } @@ -282,11 +288,12 @@ operator()( boost::asio::async_write(ws_.stream_, ws_.rd_fb_.data(), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - dispatched_ = true; - ws_.wr_block_.reset(); - ws_.open_ = ! ec; - if(! ws_.open_) + if(ws_.check_fail(ec)) goto upcall; + ws_.wr_block_.reset(); + ws_.paused_close_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke() || + ws_.paused_wr_.maybe_invoke(); goto loop; } // Handle pong frame @@ -319,7 +326,8 @@ operator()( if(code != close_code::none) { // _Fail the WebSocket Connection_ - ec = error::failed; + code_ = code; + ev_ = error::failed; goto close; } ws_.cr_ = cr; @@ -330,15 +338,15 @@ operator()( if(! ws_.wr_close_) { // _Start the WebSocket Closing Handshake_ - code = cr.code == close_code::none ? + code_ = cr.code == close_code::none ? close_code::normal : static_cast(cr.code); - ec = error::closed; + ev_ = error::closed; goto close; } // _Close the WebSocket Connection_ - code = close_code::none; - ec = error::closed; + code_ = close_code::none; + ev_ = error::closed; goto close; } } @@ -364,9 +372,7 @@ operator()( ws_.rd_buf_.prepare(read_size( ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); - dispatched_ = true; - ws_.open_ = ! ec; - if(! ws_.open_) + if(ws_.check_fail(ec)) goto upcall; ws_.rd_buf_.commit(bytes_transferred); if(ws_.rd_fh_.mask) @@ -390,8 +396,8 @@ operator()( ! ws_.rd_utf8_.finish())) { // _Fail the WebSocket Connection_ - code = close_code::bad_payload; - ec = error::failed; + code_ = close_code::bad_payload; + ev_ = error::failed; goto close; } } @@ -407,9 +413,7 @@ operator()( BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some(buffer_prefix( clamp(ws_.rd_remain_), cb_), std::move(*this)); - dispatched_ = true; - ws_.open_ = ! ec; - if(! ws_.open_) + if(ws_.check_fail(ec)) goto upcall; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffer_prefix( @@ -424,8 +428,8 @@ operator()( ! ws_.rd_utf8_.finish())) { // _Fail the WebSocket Connection_ - code = close_code::bad_payload; - ec = error::failed; + code_ = close_code::bad_payload; + ev_ = error::failed; goto close; } } @@ -434,7 +438,6 @@ operator()( } } ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin; - goto upcall; } else { @@ -453,8 +456,7 @@ operator()( ws_.rd_buf_.prepare(read_size( ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); - ws_.open_ = ! ec; - if(! ws_.open_) + if(ws_.check_fail(ec)) goto upcall; BOOST_ASSERT(bytes_transferred > 0); ws_.rd_buf_.commit(bytes_transferred); @@ -496,10 +498,8 @@ operator()( zs.next_in = empty_block; zs.avail_in = sizeof(empty_block); ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); - BOOST_ASSERT(! ec); - ws_.open_ = ! ec; - if(! ws_.open_) - break; + if(ws_.check_fail(ec)) + goto upcall; // VFALCO See: // https://github.com/madler/zlib/issues/280 BOOST_ASSERT(zs.total_out == 0); @@ -521,15 +521,14 @@ operator()( } ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); BOOST_ASSERT(ec != zlib::error::end_of_stream); - ws_.open_ = ! ec; - if(! ws_.open_) - break; + if(ws_.check_fail(ec)) + goto upcall; if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( ws_.rd_size_, zs.total_out, ws_.rd_msg_max_)) { // _Fail the WebSocket Connection_ - code = close_code::too_big; - ec = error::failed; + code_ = close_code::too_big; + ev_ = error::failed; goto close; } cb_.consume(zs.total_out); @@ -546,38 +545,100 @@ operator()( ws_.rd_done_ && ! ws_.rd_utf8_.finish())) { // _Fail the WebSocket Connection_ - code = close_code::bad_payload; - ec = error::failed; + code_ = close_code::bad_payload; + ev_ = error::failed; goto close; } } - goto upcall; } + goto upcall; close: - // Maybe send close frame, then teardown + if(! ws_.wr_block_) + { + // Acquire the write block + ws_.wr_block_ = tok_; + + // Make sure the stream is open + BOOST_ASSERT(ws_.open_); + } + else + { + // Suspend + BOOST_ASSERT(ws_.wr_block_ != tok_); + BOOST_ASIO_CORO_YIELD + ws_.paused_rd_.save(std::move(*this)); + + // Acquire the write block + BOOST_ASSERT(! ws_.wr_block_); + ws_.wr_block_ = tok_; + + // Resume + BOOST_ASIO_CORO_YIELD + ws_.get_io_service().post(std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + + // Make sure the stream is open + if(ws_.check_fail(ec)) + goto upcall; + } + + if(! ws_.wr_close_) + { + ws_.wr_close_ = true; + + // Serialize close frame + ws_.rd_fb_.reset(); + ws_.template write_close< + flat_static_buffer_base>( + ws_.rd_fb_, code_); + + // Send close frame + BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASIO_CORO_YIELD + boost::asio::async_write( + ws_.stream_, ws_.rd_fb_.data(), + std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + + // Make sure the stream is open + if(ws_.check_fail(ec)) + goto upcall; + } + + // Teardown + using beast::websocket::async_teardown; + BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD - ws_.do_async_fail(code, ec, std::move(*this)); - //BOOST_ASSERT(! ws_.wr_block_); - goto upcall; + async_teardown(ws_.role_, + ws_.stream_, std::move(*this)); + BOOST_ASSERT(ws_.wr_block_ == tok_); + if(ec == boost::asio::error::eof) + { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec.assign(0, ec.category()); + } + if(! ec) + ec = ev_; + ws_.open_ = false; upcall: BOOST_ASSERT(ws_.rd_block_ == tok_); ws_.rd_block_.reset(); ws_.paused_r_close_.maybe_invoke(); - ws_.paused_close_.maybe_invoke() || - ws_.paused_ping_.maybe_invoke() || - ws_.paused_wr_.maybe_invoke(); - if(! dispatched_) + if(ws_.wr_block_ == tok_) { - ws_.stream_.get_io_service().post( + ws_.wr_block_.reset(); + ws_.paused_close_.maybe_invoke() || + ws_.paused_ping_.maybe_invoke() || + ws_.paused_wr_.maybe_invoke(); + } + if(! cont_) + return ws_.stream_.get_io_service().post( bind_handler(std::move(h_), ec, bytes_written_)); - } - else - { - h_(ec, bytes_written_); - } + h_(ec, bytes_written_); } } @@ -693,7 +754,8 @@ operator()( } BOOST_ASIO_CORO_YIELD read_some_op{ - std::move(*this), ws_, *mb}(); + std::move(*this), ws_, *mb}( + {}, 0, true); if(ec) break; b_.commit(bytes_transferred); @@ -892,12 +954,10 @@ read_some( using boost::asio::buffer_size; close_code code{}; std::size_t bytes_written = 0; + ec.assign(0, ec.category()); // Make sure the stream is open - if(! open_) - { - ec = boost::asio::error::operation_aborted; + if(check_fail(ec)) return 0; - } loop: // See if we need to read a frame header. This // condition is structured to give the decompressor @@ -919,8 +979,7 @@ loop: rd_buf_.prepare(read_size( rd_buf_, rd_buf_.max_size())), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return bytes_written; rd_buf_.commit(bytes_transferred); } @@ -959,8 +1018,7 @@ loop: write_ping(fb, detail::opcode::pong, payload); boost::asio::write(stream_, fb.data(), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return bytes_written; goto loop; } @@ -1024,8 +1082,7 @@ loop: rd_buf_.commit(stream_.read_some( rd_buf_.prepare(read_size(rd_buf_, rd_buf_.max_size())), ec)); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return bytes_written; if(rd_fh_.mask) detail::mask_inplace( @@ -1068,8 +1125,7 @@ loop: auto const bytes_transferred = stream_.read_some(buffer_prefix( clamp(rd_remain_), buffers), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffer_prefix( @@ -1131,8 +1187,7 @@ loop: rd_buf_.prepare(read_size( rd_buf_, rd_buf_.max_size())), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); rd_buf_.commit(bytes_transferred); @@ -1162,8 +1217,7 @@ loop: zs.avail_in = sizeof(empty_block); pmd_->zi.write(zs, zlib::Flush::sync, ec); BOOST_ASSERT(! ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return bytes_written; // VFALCO See: // https://github.com/madler/zlib/issues/280 @@ -1186,8 +1240,7 @@ loop: } pmd_->zi.write(zs, zlib::Flush::sync, ec); BOOST_ASSERT(ec != zlib::error::end_of_stream); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return bytes_written; if(rd_msg_max_ && beast::detail::sum_exceeds( rd_size_, zs.total_out, rd_msg_max_)) @@ -1237,7 +1290,7 @@ async_read_some( read_some_op>{ init.completion_handler,*this, buffers}( - {}, 0); + {}, 0, false); return init.result.get(); } diff --git a/include/boost/beast/websocket/impl/stream.ipp b/include/boost/beast/websocket/impl/stream.ipp index b311d0b2..2527845a 100644 --- a/include/boost/beast/websocket/impl/stream.ipp +++ b/include/boost/beast/websocket/impl/stream.ipp @@ -440,9 +440,15 @@ write_close(DynamicBuffer& db, close_reason const& cr) fh.rsv3 = false; fh.len = cr.code == close_code::none ? 0 : 2 + cr.reason.size(); - fh.mask = role_ == role_type::client; - if(fh.mask) + if(role_ == role_type::client) + { + fh.mask = true; fh.key = wr_gen_(); + } + else + { + fh.mask = false; + } detail::write(db, fh); if(cr.code != close_code::none) { @@ -668,7 +674,41 @@ on_response(response_type const& res, open(role_type::client); } -//------------------------------------------------------------------------------ +// _Fail the WebSocket Connection_ +template +void +stream:: +do_fail( + std::uint16_t code, // if set, send a close frame first + error_code ev, // error code to use upon success + error_code& ec) // set to the error, else set to ev +{ + BOOST_ASSERT(ev); + if(code != close_code::none && ! wr_close_) + { + wr_close_ = true; + detail::frame_buffer fb; + write_close< + flat_static_buffer_base>(fb, code); + boost::asio::write(stream_, fb.data(), ec); + open_ = ! ec; + if(! open_) + return; + } + using beast::websocket::teardown; + teardown(role_, stream_, ec); + if(ec == boost::asio::error::eof) + { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec.assign(0, ec.category()); + } + open_ = ! ec; + if(! open_) + return; + ec = ev; + open_ = false; +} } // websocket } // beast diff --git a/include/boost/beast/websocket/impl/write.ipp b/include/boost/beast/websocket/impl/write.ipp index 52cf919e..01307a4e 100644 --- a/include/boost/beast/websocket/impl/write.ipp +++ b/include/boost/beast/websocket/impl/write.ipp @@ -49,6 +49,7 @@ class stream::write_some_op int how_; bool fin_; bool more_; + bool cont_; public: write_some_op(write_some_op&&) = default; @@ -74,17 +75,10 @@ public: return h_; } - void operator()( - error_code ec, - std::size_t bytes_transferred, - bool) - { - (*this)(ec, bytes_transferred); - } - void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0); + std::size_t bytes_transferred = 0, + bool cont = true); friend void* asio_handler_allocate( @@ -108,7 +102,7 @@ public: bool asio_handler_is_continuation(write_some_op* op) { using boost::asio::asio_handler_is_continuation; - return asio_handler_is_continuation( + return op->cont_ || asio_handler_is_continuation( std::addressof(op->h_)); } @@ -127,8 +121,10 @@ template void stream:: write_some_op:: -operator()(error_code ec, - std::size_t bytes_transferred) +operator()( + error_code ec, + std::size_t bytes_transferred, + bool cont) { using beast::detail::clamp; using boost::asio::buffer; @@ -145,7 +141,7 @@ operator()(error_code ec, }; std::size_t n; boost::asio::mutable_buffer b; - + cont_ = cont; BOOST_ASIO_CORO_REENTER(*this) { // Set up the outgoing frame header @@ -203,7 +199,6 @@ operator()(error_code ec, } } - do_maybe_suspend: // Maybe suspend if(! ws_.wr_block_) { @@ -211,17 +206,12 @@ operator()(error_code ec, ws_.wr_block_ = tok_; // Make sure the stream is open - if(! ws_.open_) - { - BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( - bind_handler(std::move(*this), - boost::asio::error::operation_aborted)); + if(ws_.check_fail(ec)) goto upcall; - } } else { + do_suspend: // Suspend BOOST_ASSERT(ws_.wr_block_ != tok_); BOOST_ASIO_CORO_YIELD @@ -237,11 +227,8 @@ operator()(error_code ec, BOOST_ASSERT(ws_.wr_block_ == tok_); // Make sure the stream is open - if(! ws_.open_) - { - ec = boost::asio::error::operation_aborted; + if(ws_.check_fail(ec)) goto upcall; - } } //------------------------------------------------------------------ @@ -261,8 +248,8 @@ operator()(error_code ec, buffer_cat(ws_.wr_fb_.data(), cb_), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ec) - ws_.open_ = false; + if(ws_.check_fail(ec)) + goto upcall; goto upcall; } @@ -288,13 +275,10 @@ operator()(error_code ec, clamp(fh_.len), cb_)), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ec) - { - ws_.open_ = false; + if(ws_.check_fail(ec)) goto upcall; - } if(remain_ == 0) - goto upcall; + break; cb_.consume( bytes_transferred - ws_.wr_fb_.size()); fh_.op = detail::opcode::cont; @@ -305,13 +289,12 @@ operator()(error_code ec, ws_.paused_rd_.maybe_invoke() || ws_.paused_ping_.maybe_invoke()) { - BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( - std::move(*this)); - goto do_maybe_suspend; + BOOST_ASSERT(ws_.wr_block_); + goto do_suspend; } ws_.wr_block_ = tok_; } + goto upcall; } //------------------------------------------------------------------ @@ -341,11 +324,8 @@ operator()(error_code ec, buffer(ws_.wr_buf_.get(), n)), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ec) - { - ws_.open_ = false; + if(ws_.check_fail(ec)) goto upcall; - } while(remain_ > 0) { cb_.consume(ws_.wr_buf_size_); @@ -362,11 +342,8 @@ operator()(error_code ec, buffer(ws_.wr_buf_.get(), n), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ec) - { - ws_.open_ = false; + if(ws_.check_fail(ec)) goto upcall; - } } goto upcall; } @@ -399,13 +376,10 @@ operator()(error_code ec, buffer(ws_.wr_buf_.get(), n)), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ec) - { - ws_.open_ = false; + if(ws_.check_fail(ec)) goto upcall; - } if(remain_ == 0) - goto upcall; + break; cb_.consume( bytes_transferred - ws_.wr_fb_.size()); fh_.op = detail::opcode::cont; @@ -416,13 +390,12 @@ operator()(error_code ec, ws_.paused_rd_.maybe_invoke() || ws_.paused_ping_.maybe_invoke()) { - BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( - std::move(*this)); - goto do_maybe_suspend; + BOOST_ASSERT(ws_.wr_block_); + goto do_suspend; } ws_.wr_block_ = tok_; } + goto upcall; } //------------------------------------------------------------------ @@ -435,15 +408,8 @@ operator()(error_code ec, ws_.wr_buf_size_); more_ = detail::deflate( ws_.pmd_->zo, b, cb_, fin_, ec); - ws_.open_ = ! ec; - if(! ws_.open_) - { - // Always dispatching is easiest - BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( - bind_handler(std::move(*this), ec)); + if(ws_.check_fail(ec)) goto upcall; - } n = buffer_size(b); if(n == 0) { @@ -452,14 +418,6 @@ operator()(error_code ec, // latency. BOOST_ASSERT(! fin_); BOOST_ASSERT(buffer_size(cb_) == 0); - - // We can skip the dispatch if the - // asynchronous initiation function is - // not on call stack but its hard to - // figure out so be safe and dispatch. - BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( - std::move(*this)); goto upcall; } if(fh_.mask) @@ -482,11 +440,8 @@ operator()(error_code ec, buffer_cat(ws_.wr_fb_.data(), mutable_buffers_1{b}), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ec) - { - ws_.open_ = false; + if(ws_.check_fail(ec)) goto upcall; - } if(more_) { fh_.op = detail::opcode::cont; @@ -498,10 +453,8 @@ operator()(error_code ec, ws_.paused_rd_.maybe_invoke() || ws_.paused_ping_.maybe_invoke()) { - BOOST_ASIO_CORO_YIELD - ws_.get_io_service().post( - std::move(*this)); - goto do_maybe_suspend; + BOOST_ASSERT(ws_.wr_block_); + goto do_suspend; } ws_.wr_block_ = tok_; } @@ -527,6 +480,9 @@ operator()(error_code ec, ws_.paused_close_.maybe_invoke() || ws_.paused_rd_.maybe_invoke() || ws_.paused_ping_.maybe_invoke(); + if(! cont_) + return ws_.stream_.get_io_service().post( + bind_handler(h_, ec)); h_(ec); } } @@ -566,12 +522,10 @@ write_some(bool fin, using boost::asio::buffer; using boost::asio::buffer_copy; using boost::asio::buffer_size; + ec.assign(0, ec.category()); // Make sure the stream is open - if(! open_) - { - ec = boost::asio::error::operation_aborted; + if(check_fail(ec)) return; - } detail::frame_header fh; if(! wr_cont_) { @@ -627,8 +581,7 @@ write_some(bool fin, wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; if(! more) break; @@ -641,9 +594,8 @@ write_some(bool fin, (role_ == role_type::server && pmd_config_.server_no_context_takeover))) pmd_->zo.reset(); - return; } - if(! fh.mask) + else if(! fh.mask) { if(! wr_frag_) { @@ -656,8 +608,7 @@ write_some(bool fin, wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), buffers), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; } else @@ -679,8 +630,7 @@ write_some(bool fin, boost::asio::write(stream_, buffer_cat(fh_buf.data(), buffer_prefix(n, cb)), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; if(remain == 0) break; @@ -688,9 +638,8 @@ write_some(bool fin, cb.consume(n); } } - return; } - if(! wr_frag_) + else if(! wr_frag_) { // mask, no autofrag fh.fin = fin; @@ -713,8 +662,7 @@ write_some(bool fin, wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; } while(remain > 0) @@ -726,12 +674,11 @@ write_some(bool fin, remain -= n; detail::mask_inplace(b, key); boost::asio::write(stream_, b, ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; } - return; } + else { // mask, autofrag BOOST_ASSERT(wr_buf_size_ != 0); @@ -755,15 +702,13 @@ write_some(bool fin, flat_static_buffer_base>(fh_buf, fh); boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); - open_ = ! ec; - if(! open_) + if(check_fail(ec)) return; if(remain == 0) break; fh.op = detail::opcode::cont; cb.consume(n); } - return; } } @@ -784,7 +729,7 @@ async_write_some(bool fin, void(error_code)> init{handler}; write_some_op>{init.completion_handler, - *this, fin, bs}(); + *this, fin, bs}({}, 0, false); return init.result.get(); } @@ -838,7 +783,7 @@ async_write( void(error_code)> init{handler}; write_some_op>{init.completion_handler, - *this, true, bs}(); + *this, true, bs}({}, 0, false); return init.result.get(); } diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index 58ddfecf..a5d2ee9e 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -150,7 +151,7 @@ class stream struct pmd_t { // `true` if current read message is compressed - bool rd_set; + bool rd_set = false; zlib::deflate_stream zo; zlib::inflate_stream zi; @@ -165,37 +166,50 @@ class stream std::size_t rd_msg_max_ // max message size = 16 * 1024 * 1024; - std::uint64_t rd_size_; // total size of current message so far - std::uint64_t rd_remain_; // message frame bytes left in current frame + std::uint64_t rd_size_ // total size of current message so far + = 0; + std::uint64_t rd_remain_ // message frame bytes left in current frame + = 0; detail::frame_header rd_fh_; // current frame header - detail::prepared_key rd_key_; // current stateful mask key + detail::prepared_key rd_key_ // current stateful mask key + = 0; detail::frame_buffer rd_fb_; // to write control frames (during reads) detail::utf8_checker rd_utf8_; // to validate utf8 static_buffer< +tcp_frame_size> rd_buf_; // buffer for reads - detail::opcode rd_op_; // current message binary or text - bool rd_cont_; // `true` if the next frame is a continuation - bool rd_done_; // set when a message is done - bool rd_close_; // did we read a close frame? + detail::opcode rd_op_ // current message binary or text + = detail::opcode::text; + bool rd_cont_ // `true` if the next frame is a continuation + = false; + bool rd_done_ // set when a message is done + = true; + bool rd_close_ // did we read a close frame? + = false; token rd_block_; // op currenly reading token tok_; // used to order asynchronous ops - role_type role_; // server or client + role_type role_ // server or client + = role_type::client; bool open_ // `true` if connected = false; token wr_block_; // op currenly writing - bool wr_close_; // did we write a close frame? - bool wr_cont_; // next write is a continuation - bool wr_frag_; // autofrag the current message + bool wr_close_ // did we write a close frame? + = false; + bool wr_cont_ // next write is a continuation + = false; + bool wr_frag_ // autofrag the current message + = false; bool wr_frag_opt_ // autofrag option setting = true; - bool wr_compress_; // compress current message + bool wr_compress_ // compress current message + = false; detail::opcode wr_opcode_ // message type = detail::opcode::text; std::unique_ptr< std::uint8_t[]> wr_buf_; // write buffer - std::size_t wr_buf_size_; // write buffer size (current message) + std::size_t wr_buf_size_ // write buffer size (current message) + = 0; std::size_t wr_buf_opt_ // write buffer size option setting = 4096; detail::fh_buffer wr_fb_; // header buffer used for writes @@ -344,6 +358,17 @@ public: // //-------------------------------------------------------------------------- + /** Returns `true` if the stream is open. + + The stream is open after a successful handshake, and when + no error has occurred. + */ + bool + is_open() const + { + return open_; + } + /** Returns `true` if the latest message data indicates binary. This function informs the caller of whether the last @@ -3368,6 +3393,22 @@ private: void reset(); void begin_msg(); + bool + check_fail(error_code& ec) + { + if(! open_) + { + ec = boost::asio::error::operation_aborted; + return true; + } + if(ec) + { + open_ = false; + return true; + } + return false; + } + template bool parse_fh(detail::frame_header& fh, @@ -3424,13 +3465,6 @@ private: std::uint16_t code, error_code ev, error_code& ec); - - template - void - do_async_fail( - std::uint16_t code, - error_code ev, - Handler&& handler); }; } // websocket @@ -3439,7 +3473,6 @@ private: #include #include -#include #include #include #include diff --git a/test/beast/websocket/CMakeLists.txt b/test/beast/websocket/CMakeLists.txt index 2ddb2ae5..726d2afb 100644 --- a/test/beast/websocket/CMakeLists.txt +++ b/test/beast/websocket/CMakeLists.txt @@ -20,7 +20,9 @@ add_executable (tests-beast-websocket accept.cpp close.cpp error.cpp + frame.cpp handshake.cpp + mask.cpp option.cpp ping.cpp read.cpp @@ -28,8 +30,6 @@ add_executable (tests-beast-websocket role.cpp stream.cpp teardown.cpp - frame.cpp - mask.cpp utf8_checker.cpp write.cpp ) diff --git a/test/beast/websocket/Jamfile b/test/beast/websocket/Jamfile index 5fa2d65f..c684983d 100644 --- a/test/beast/websocket/Jamfile +++ b/test/beast/websocket/Jamfile @@ -11,16 +11,16 @@ local SOURCES = accept.cpp close.cpp error.cpp + frame.cpp handshake.cpp + mask.cpp option.cpp ping.cpp read.cpp rfc6455.cpp + role.cpp stream.cpp teardown.cpp - frame.cpp - mask.cpp - role.cpp utf8_checker.cpp write.cpp ; diff --git a/test/beast/websocket/accept.cpp b/test/beast/websocket/accept.cpp index 187b4a16..bd1dbce4 100644 --- a/test/beast/websocket/accept.cpp +++ b/test/beast/websocket/accept.cpp @@ -51,7 +51,7 @@ public: }(); // request in stream - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -95,7 +95,7 @@ public: } // request in stream, decorator - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -141,7 +141,7 @@ public: } // request in buffers - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -183,7 +183,7 @@ public: } // request in buffers, decorator - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -228,7 +228,7 @@ public: } // request in buffers and stream - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -273,7 +273,7 @@ public: } // request in buffers and stream, decorator - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -320,7 +320,7 @@ public: } // request in message - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -337,7 +337,7 @@ public: }); // request in message, decorator - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -357,7 +357,7 @@ public: }); // request in message, close frame in stream - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); @@ -386,7 +386,7 @@ public: }); // failed handshake (missing Sec-WebSocket-Key) - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { stream ws{ts}; auto tr = connect(ws.next_layer()); diff --git a/test/beast/websocket/close.cpp b/test/beast/websocket/close.cpp index 7f523a10..86b61b03 100644 --- a/test/beast/websocket/close.cpp +++ b/test/beast/websocket/close.cpp @@ -144,109 +144,213 @@ public: { doTestClose(AsyncClient{yield}); }); - - // suspend on write - { - echo_server es{log}; - error_code ec; - boost::asio::io_service ios; - stream ws{ios}; - ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/", ec); - BEAST_EXPECTS(! ec, ec.message()); - std::size_t count = 0; - ws.async_ping("", - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - }); - BEAST_EXPECT(ws.wr_block_); - ws.async_close({}, - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - }); - ios.run(); - BEAST_EXPECT(count == 2); - } - - // suspend on read - { - echo_server es{log}; - error_code ec; - boost::asio::io_service ios; - stream ws{ios}; - ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/", ec); - BEAST_EXPECTS(! ec, ec.message()); - flat_buffer b; - std::size_t count = 0; - ws.async_read(b, - [&](error_code ec, std::size_t) - { - ++count; - BEAST_EXPECTS( - ec == error::closed, ec.message()); - }); - BEAST_EXPECT(ws.rd_block_); - ws.async_close({}, - [&](error_code ec) - { - ++count; - BEAST_EXPECTS( - ec == boost::asio::error::operation_aborted, - ec.message()); - }); - BEAST_EXPECT(ws.wr_close_); - ios.run(); - BEAST_EXPECT(count == 2); - } } void testCloseSuspend() { - echo_server es{log, kind::async}; - boost::asio::io_service ios; - stream ws{ios}; - ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/"); - - // Cause close to be received - es.async_close(); - - multi_buffer b; - std::size_t count = 0; - ws.async_read(b, - [&](error_code ec, std::size_t) - { - ++count; - BEAST_EXPECTS(ec == error::closed, - ec.message()); - }); - while(! ws.wr_block_) - ios.run_one(); - // try to close - ws.async_close("payload", - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(ec == boost::asio:: - error::operation_aborted, - ec.message()); - }); - static std::size_t constexpr limit = 100; - std::size_t n; - for(n = 0; n < limit; ++n) + // suspend on ping + doFailLoop([&](test::fail_counter& fc) { - if(count >= 2) - break; - ios.run_one(); - } - BEAST_EXPECT(n < limit); - ios.run(); + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + ws.async_ping("", + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + BEAST_EXPECT(count == 0); + ws.async_close({}, + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on write + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + ws.async_write(sbuf("*"), + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + BEAST_EXPECT(count == 0); + ws.async_close({}, + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read ping + message + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add a ping and message to the input + ws.next_layer().append(string_view{ + "\x89\x00" "\x81\x01*", 5}); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_close({}, + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read bad message + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add an invalid frame to the input + ws.next_layer().append(string_view{ + "\x09\x00", 2}); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec != error::failed) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_close({}, + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read close #1 + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add a close frame to the input + ws.next_layer().append(string_view{ + "\x88\x00", 2}); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec != error::closed) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_close({}, + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + } + + void + testContHook() + { + struct handler + { + void operator()(error_code) {} + }; + + stream ws{ios_}; + stream::close_op op{ + handler{}, ws, {}}; + using boost::asio::asio_handler_is_continuation; + asio_handler_is_continuation(&op); } void @@ -254,6 +358,7 @@ public: { testClose(); testCloseSuspend(); + testContHook(); } }; diff --git a/test/beast/websocket/handshake.cpp b/test/beast/websocket/handshake.cpp index 28b303f1..f319848f 100644 --- a/test/beast/websocket/handshake.cpp +++ b/test/beast/websocket/handshake.cpp @@ -44,7 +44,7 @@ public: }; // handshake - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { echo_server es{log}; ws_type ws{ts}; @@ -62,7 +62,7 @@ public: }); // handshake, response - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { echo_server es{log}; ws_type ws{ts}; @@ -82,7 +82,7 @@ public: }); // handshake, decorator - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { echo_server es{log}; ws_type ws{ts}; @@ -103,7 +103,7 @@ public: }); // handshake, response, decorator - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { echo_server es{log}; ws_type ws{ts}; diff --git a/test/beast/websocket/ping.cpp b/test/beast/websocket/ping.cpp index 109f13b4..fa33178d 100644 --- a/test/beast/websocket/ping.cpp +++ b/test/beast/websocket/ping.cpp @@ -124,68 +124,313 @@ public: void testPingSuspend() { - echo_server es{log, kind::async}; - boost::asio::io_service ios; - stream ws{ios}; - ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/"); - - // Cause close to be received - es.async_close(); - - multi_buffer b; - std::size_t count = 0; - // Read a close frame. - // Sends a close frame, blocking writes. - ws.async_read(b, - [&](error_code ec, std::size_t) - { - // Read should complete with error::closed - ++count; - BEAST_EXPECTS(ec == error::closed, - ec.message()); - // Pings after a close are aborted - ws.async_ping("", - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(ec == boost::asio:: - error::operation_aborted, - ec.message()); - }); - }); - if(! BEAST_EXPECT(run_until(ios, 100, - [&]{ return ws.wr_close_; }))) - return; - // Try to ping - ws.async_ping("payload", - [&](error_code ec) - { - // Pings after a close are aborted - ++count; - BEAST_EXPECTS(ec == boost::asio:: - error::operation_aborted, - ec.message()); - // Subsequent calls to close are aborted - ws.async_close({}, - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(ec == boost::asio:: - error::operation_aborted, - ec.message()); - }); - }); - static std::size_t constexpr limit = 100; - std::size_t n; - for(n = 0; n < limit; ++n) + // suspend on write + doFailLoop([&](test::fail_counter& fc) { - if(count >= 4) - break; - ios.run_one(); + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + ws.async_write(sbuf("Hello, world"), + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + BEAST_EXPECT(count == 0); + ws.async_ping({}, + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on close + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + ws.async_close({}, + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + BEAST_EXPECT(count == 0); + ws.async_ping({}, + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read ping + message + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add a ping and message to the input + ws.next_layer().append(string_view{ + "\x89\x00" "\x81\x01*", 5}); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_ping({}, + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read bad message + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add an invalid frame to the input + ws.next_layer().append(string_view{ + "\x09\x00", 2}); + + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec != error::failed) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_ping({}, + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read close #1 + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add a close frame to the input + ws.next_layer().append(string_view{ + "\x88\x00", 2}); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec != error::closed) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_ping({}, + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read close #2 + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log, kind::async}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // Cause close to be received + es.async_close(); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec != error::closed) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_ping({}, + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + { + echo_server es{log, kind::async}; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + + // Cause close to be received + es.async_close(); + + multi_buffer b; + std::size_t count = 0; + // Read a close frame. + // Sends a close frame, blocking writes. + ws.async_read(b, + [&](error_code ec, std::size_t) + { + // Read should complete with error::closed + ++count; + BEAST_EXPECTS(ec == error::closed, + ec.message()); + // Pings after a close are aborted + ws.async_ping("", + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(ec == boost::asio:: + error::operation_aborted, + ec.message()); + }); + }); + if(! BEAST_EXPECT(run_until(ios, 100, + [&]{ return ws.wr_close_; }))) + return; + // Try to ping + ws.async_ping("payload", + [&](error_code ec) + { + // Pings after a close are aborted + ++count; + BEAST_EXPECTS(ec == boost::asio:: + error::operation_aborted, + ec.message()); + // Subsequent calls to close are aborted + ws.async_close({}, + [&](error_code ec) + { + ++count; + BEAST_EXPECTS(ec == boost::asio:: + error::operation_aborted, + ec.message()); + }); + }); + static std::size_t constexpr limit = 100; + std::size_t n; + for(n = 0; n < limit; ++n) + { + if(count >= 4) + break; + ios.run_one(); + } + BEAST_EXPECT(n < limit); + ios.run(); } - BEAST_EXPECT(n < limit); - ios.run(); + } + + void + testContHook() + { + struct handler + { + void operator()(error_code) {} + }; + + stream ws{ios_}; + stream::ping_op op{ + handler{}, ws, detail::opcode::ping, {}}; + using boost::asio::asio_handler_is_continuation; + asio_handler_is_continuation(&op); } void @@ -193,6 +438,7 @@ public: { testPing(); testPingSuspend(); + testContHook(); } }; diff --git a/test/beast/websocket/read.cpp b/test/beast/websocket/read.cpp index ea34d2e8..dded2dfe 100644 --- a/test/beast/websocket/read.cpp +++ b/test/beast/websocket/read.cpp @@ -568,38 +568,83 @@ public: check(error::closed, "\x88\x06\xfc\x15utf8"); } + } + + void + testReadSuspend() + { + using boost::asio::buffer; // suspend on write + doFailLoop([&](test::fail_counter& fc) { echo_server es{log}; - error_code ec; boost::asio::io_service ios; - stream ws{ios}; + stream ws{ios, fc}; ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/", ec); - BEAST_EXPECTS(! ec, ec.message()); + ws.handshake("localhost", "/"); // insert a ping ws.next_layer().append(string_view( "\x89\x00", 2)); std::size_t count = 0; - multi_buffer b; std::string const s = "Hello, world"; + multi_buffer b; ws.async_read(b, [&](error_code ec, std::size_t) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); BEAST_EXPECT(to_string(b.data()) == s); }); + BEAST_EXPECT(ws.rd_block_); ws.async_write(buffer(s), [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); BEAST_EXPECT(ws.wr_block_); ios.run(); BEAST_EXPECT(count == 2); + }); + } + + void + testContHook() + { + { + struct handler + { + void operator()(error_code, std::size_t) {} + }; + + char buf[32]; + stream ws{ios_}; + stream::read_some_op< + boost::asio::mutable_buffers_1, + handler> op{handler{}, ws, + boost::asio::mutable_buffers_1{ + buf, sizeof(buf)}}; + using boost::asio::asio_handler_is_continuation; + asio_handler_is_continuation(&op); + } + { + struct handler + { + void operator()(error_code, std::size_t) {} + }; + + multi_buffer b; + stream ws{ios_}; + stream::read_op< + multi_buffer, handler> op{ + handler{}, ws, b, 32, true}; + using boost::asio::asio_handler_is_continuation; + asio_handler_is_continuation(&op); } } @@ -607,6 +652,8 @@ public: run() override { testRead(); + testReadSuspend(); + testContHook(); } }; diff --git a/test/beast/websocket/test.hpp b/test/beast/websocket/test.hpp index 6926d312..726eab90 100644 --- a/test/beast/websocket/test.hpp +++ b/test/beast/websocket/test.hpp @@ -58,6 +58,7 @@ public: test::stream ts_; std::thread t_; websocket::stream ws_; + bool close_ = false; public: explicit @@ -120,11 +121,18 @@ public: ios_.post( [&] { - ws_.async_close({}, - std::bind( - &echo_server::on_close, - this, - std::placeholders::_1)); + if(ws_.is_open()) + { + ws_.async_close({}, + std::bind( + &echo_server::on_close, + this, + std::placeholders::_1)); + } + else + { + close_ = true; + } }); } @@ -173,6 +181,7 @@ public: { if(ec) return fail(ec); + do_read(); } @@ -181,6 +190,16 @@ public: { if(ec) return fail(ec); + + if(close_) + { + return ws_.async_close({}, + std::bind( + &echo_server::on_close, + this, + std::placeholders::_1)); + } + do_read(); } @@ -241,21 +260,16 @@ public: template void - doTestLoop(Test const& f) + doFailLoop( + Test const& f, std::size_t limit = 200) { - // This number has to be high for the - // test that writes the large buffer. - static std::size_t constexpr limit = 1000; - std::size_t n; for(n = 0; n <= limit; ++n) { test::fail_counter fc{n}; - test::stream ts{ios_, fc}; try { - f(ts); - ts.close(); + f(fc); break; } catch(system_error const& se) @@ -264,16 +278,28 @@ public: se.code() == test::error::fail_error, se.code().message()); } - catch(std::exception const& e) - { - fail(e.what(), __FILE__, __LINE__); - } - ts.close(); - continue; } BEAST_EXPECT(n < limit); } + template + void + doStreamLoop(Test const& f) + { + // This number has to be high for the + // test that writes the large buffer. + static std::size_t constexpr limit = 1000; + + doFailLoop( + [&](test::fail_counter& fc) + { + test::stream ts{ios_, fc}; + f(ts); + ts.close(); + } + , limit); + } + template void doTest( @@ -422,6 +448,14 @@ public: return false; } + template + bool + run_until( + boost::asio::io_service& ios, Pred&& pred) + { + return run_until(ios, 100, pred); + } + inline std::string const& random_string() diff --git a/test/beast/websocket/write.cpp b/test/beast/websocket/write.cpp index 194fd6a3..43107bac 100644 --- a/test/beast/websocket/write.cpp +++ b/test/beast/websocket/write.cpp @@ -123,7 +123,7 @@ public: }); // nomask - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { echo_server es{log, kind::async_client}; ws_type ws{ts}; @@ -149,7 +149,7 @@ public: }); // nomask, autofrag - doTestLoop([&](test::stream& ts) + doStreamLoop([&](test::stream& ts) { echo_server es{log, kind::async_client}; ws_type ws{ts}; @@ -230,93 +230,160 @@ public: { doTestWrite(AsyncClient{yield}); }); + } - // suspend on write + void + testWriteSuspend() + { + using boost::asio::buffer; + + // suspend on ping + doFailLoop([&](test::fail_counter& fc) { echo_server es{log}; - error_code ec; boost::asio::io_service ios; - stream ws{ios}; + stream ws{ios, fc}; ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/", ec); - BEAST_EXPECTS(! ec, ec.message()); + ws.handshake("localhost", "/"); std::size_t count = 0; ws.async_ping("", [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); BEAST_EXPECT(ws.wr_block_); + BEAST_EXPECT(count == 0); ws.async_write(sbuf("*"), [&](error_code ec) { ++count; - BEAST_EXPECTS( - ec == boost::asio::error::operation_aborted, - ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); - ws.async_close({}, [&](error_code){}); + BEAST_EXPECT(count == 0); ios.run(); BEAST_EXPECT(count == 2); - } + }); - // suspend on write, nomask, frag + // suspend on close + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + ws.async_close({}, + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + BEAST_EXPECT(count == 0); + ws.async_write(sbuf("*"), + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on read ping + message + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add a ping and message to the input + ws.next_layer().append(string_view{ + "\x89\x00" "\x81\x01*", 5}); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + while(! ws.wr_block_) + { + ios.run_one(); + if(! BEAST_EXPECT(! ios.stopped())) + break; + } + BEAST_EXPECT(count == 0); + ws.async_write(sbuf("*"), + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 2); + }); + + // suspend on ping: nomask, nofrag + doFailLoop([&](test::fail_counter& fc) { echo_server es{log, kind::async_client}; - error_code ec; boost::asio::io_service ios; - stream ws{ios}; + stream ws{ios, fc}; ws.next_layer().connect(es.stream()); es.async_handshake(); - ws.accept(ec); - BEAST_EXPECTS(! ec, ec.message()); + ws.accept(); std::size_t count = 0; std::string const s(16384, '*'); - ws.auto_fragment(true); + ws.auto_fragment(false); ws.async_write(buffer(s), [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); BEAST_EXPECT(ws.wr_block_); ws.async_ping("", [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); ios.run(); - ios.reset(); BEAST_EXPECT(count == 2); - flat_buffer b; - ws.async_read(b, - [&](error_code ec, std::size_t) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - BEAST_EXPECT(to_string(b.data()) == s); - ws.async_close({}, - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - }); - }); - ios.run(); - BEAST_EXPECT(count == 4); - } + }); - // suspend on write, mask, frag + // suspend on ping: nomask, frag + doFailLoop([&](test::fail_counter& fc) { - echo_server es{log, kind::async}; - error_code ec; + echo_server es{log, kind::async_client}; boost::asio::io_service ios; - stream ws{ios}; + stream ws{ios, fc}; ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/", ec); - BEAST_EXPECTS(! ec, ec.message()); + es.async_handshake(); + ws.accept(); std::size_t count = 0; std::string const s(16384, '*'); ws.auto_fragment(true); @@ -324,42 +391,93 @@ public: [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); BEAST_EXPECT(ws.wr_block_); ws.async_ping("", [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); ios.run(); - ios.reset(); BEAST_EXPECT(count == 2); - flat_buffer b; - ws.async_read(b, - [&](error_code ec, std::size_t) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - BEAST_EXPECT(to_string(b.data()) == s); - ws.async_close({}, - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - }); - }); - ios.run(); - BEAST_EXPECT(count == 4); - } + }); - // suspend on write, deflate + // suspend on ping: mask, nofrag + doFailLoop([&](test::fail_counter& fc) { - echo_server es{log, kind::async}; + echo_server es{log}; error_code ec; boost::asio::io_service ios; - stream ws{ios}; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + std::string const s(16384, '*'); + ws.auto_fragment(false); + ws.async_write(buffer(s), + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_ping("", + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + ios.run(); + }); + + // suspend on ping: mask, frag + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + error_code ec; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + std::string const s(16384, '*'); + ws.auto_fragment(true); + ws.async_write(buffer(s), + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_ping("", + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + ios.run(); + }); + + // suspend on ping: deflate + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log, kind::async}; + boost::asio::io_service ios; + stream ws{ios, fc}; { permessage_deflate pmd; pmd.client_enable = true; @@ -367,8 +485,7 @@ public: ws.set_option(pmd); } ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/", ec); - BEAST_EXPECTS(! ec, ec.message()); + ws.handshake("localhost", "/"); std::size_t count = 0; auto const& s = random_string(); ws.binary(true); @@ -376,130 +493,21 @@ public: [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); BEAST_EXPECT(ws.wr_block_); ws.async_ping("", [&](error_code ec) { ++count; - BEAST_EXPECTS(! ec, ec.message()); + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); }); ios.run(); - ios.reset(); - BEAST_EXPECT(count == 2); - flat_buffer b; - ws.async_read(b, - [&](error_code ec, std::size_t) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - BEAST_EXPECT(to_string(b.data()) == s); - ws.async_close({}, - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - }); - }); - ios.run(); - BEAST_EXPECT(count == 4); - } - } - - /* - https://github.com/boostorg/beast/issues/300 - - Write a message as two individual frames - */ - void - testIssue300() - { - for(int i = 0; i < 2; ++i ) - { - echo_server es{log, i==1 ? - kind::async : kind::sync}; - boost::asio::io_service ios; - stream ws{ios}; - ws.next_layer().connect(es.stream()); - - error_code ec; - ws.handshake("localhost", "/", ec); - if(! BEAST_EXPECTS(! ec, ec.message())) - return; - ws.write_some(false, sbuf("u")); - ws.write_some(true, sbuf("v")); - multi_buffer b; - ws.read(b, ec); - BEAST_EXPECTS(! ec, ec.message()); - } - } - - void - testWriteSuspend() - { - for(int i = 0; i < 2; ++i ) - { - echo_server es{log, i==1 ? - kind::async : kind::sync}; - boost::asio::io_service ios; - stream ws{ios}; - ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/"); - - // Make remote send a text message with bad utf8. - ws.binary(true); - put(ws.next_layer().buffer(), cbuf( - 0x03, 0xea, 0xf0, 0x28, 0x8c, 0xbc)); - multi_buffer b; - std::size_t count = 0; - // Read text message with bad utf8. - // Causes a close to be sent, blocking writes. - ws.async_read(b, - [&](error_code ec, std::size_t) - { - // Read should fail with protocol error - ++count; - BEAST_EXPECTS( - ec == error::failed, ec.message()); - // Reads after failure are aborted - ws.async_read(b, - [&](error_code ec, std::size_t) - { - ++count; - BEAST_EXPECTS(ec == boost::asio:: - error::operation_aborted, - ec.message()); - }); - }); - // Run until the read_op writes a close frame. - while(! ws.wr_block_) - ios.run_one(); - // Write a text message, leaving - // the write_op suspended as a pausation. - ws.async_write(sbuf("Hello"), - [&](error_code ec) - { - ++count; - // Send is canceled because close received. - BEAST_EXPECTS(ec == boost::asio:: - error::operation_aborted, - ec.message()); - // Writes after close are aborted. - ws.async_write(sbuf("World"), - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(ec == boost::asio:: - error::operation_aborted, - ec.message()); - }); - }); - // Run until all completions are delivered. - while(! ios.stopped()) - ios.run_one(); - BEAST_EXPECT(count == 4); - } + }); } void @@ -536,13 +544,61 @@ public: } } + /* + https://github.com/boostorg/beast/issues/300 + + Write a message as two individual frames + */ + void + testIssue300() + { + for(int i = 0; i < 2; ++i ) + { + echo_server es{log, i==1 ? + kind::async : kind::sync}; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + + error_code ec; + ws.handshake("localhost", "/", ec); + if(! BEAST_EXPECTS(! ec, ec.message())) + return; + ws.write_some(false, sbuf("u")); + ws.write_some(true, sbuf("v")); + multi_buffer b; + ws.read(b, ec); + BEAST_EXPECTS(! ec, ec.message()); + } + } + + void + testContHook() + { + struct handler + { + void operator()(error_code) {} + }; + + char buf[32]; + stream ws{ios_}; + stream::write_some_op< + boost::asio::const_buffers_1, + handler> op{handler{}, ws, true, + boost::asio::const_buffers_1{ + buf, sizeof(buf)}}; + using boost::asio::asio_handler_is_continuation; + asio_handler_is_continuation(&op); + } + void run() override { testWrite(); testWriteSuspend(); - testIssue300(); testAsyncWriteFrame(); + testIssue300(); + testContHook(); } };