diff --git a/CHANGELOG.md b/CHANGELOG.md index 82a06fc1..2d443831 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ Version 111: WebSocket: * Fix utf8 check split code point at buffer end +* Refactor stream operations and tests plus coverage -------------------------------------------------------------------------------- diff --git a/include/boost/beast/websocket/detail/pausation.hpp b/include/boost/beast/websocket/detail/pausation.hpp index 3fe62ed4..c36ae86c 100644 --- a/include/boost/beast/websocket/detail/pausation.hpp +++ b/include/boost/beast/websocket/detail/pausation.hpp @@ -187,6 +187,12 @@ public: void save(F&& f); + explicit + operator bool() const + { + return base_ != nullptr; + } + bool maybe_invoke() { diff --git a/include/boost/beast/websocket/detail/pmd_extension.hpp b/include/boost/beast/websocket/detail/pmd_extension.hpp index e4bdaeed..cb540d1f 100644 --- a/include/boost/beast/websocket/detail/pmd_extension.hpp +++ b/include/boost/beast/websocket/detail/pmd_extension.hpp @@ -57,14 +57,17 @@ parse_bits(string_view s) return -1; if(s[0] < '1' || s[0] > '9') return -1; - int i = 0; + unsigned i = 0; for(auto c : s) { if(c < '0' || c > '9') return -1; + auto const i0 = i; i = 10 * i + (c - '0'); + if(i < i0) + return -1; } - return i; + return static_cast(i); } // Parse permessage-deflate request fields @@ -354,43 +357,6 @@ pmd_normalize(pmd_offer& offer) //-------------------------------------------------------------------- -// Decompress into a DynamicBuffer -// -template -void -inflate( - InflateStream& zi, - DynamicBuffer& buffer, - boost::asio::const_buffer const& in, - error_code& ec) -{ - using boost::asio::buffer_cast; - using boost::asio::buffer_size; - zlib::z_params zs; - zs.avail_in = buffer_size(in); - zs.next_in = buffer_cast(in); - for(;;) - { - // VFALCO we could be smarter about the size - auto const bs = buffer.prepare( - read_size_or_throw(buffer, 65536)); - auto const out = *bs.begin(); - zs.avail_out = buffer_size(out); - zs.next_out = buffer_cast(out); - zi.write(zs, zlib::Flush::sync, ec); - buffer.commit(zs.total_out); - zs.total_out = 0; - if( ec == zlib::error::need_buffers || - ec == zlib::error::end_of_stream) - { - ec.assign(0, ec.category()); - break; - } - if(ec) - return; - } -} - // Compress a buffer sequence // Returns: `true` if more calls are needed // diff --git a/include/boost/beast/websocket/impl/close.ipp b/include/boost/beast/websocket/impl/close.ipp index 5a1191f1..e098b031 100644 --- a/include/boost/beast/websocket/impl/close.ipp +++ b/include/boost/beast/websocket/impl/close.ipp @@ -22,8 +22,6 @@ #include #include -#include - namespace boost { namespace beast { namespace websocket { @@ -46,7 +44,7 @@ class stream::close_op detail::frame_buffer fb; error_code ev; token tok; - bool cont; + bool cont = false; state( Handler&, @@ -121,7 +119,8 @@ public: template template void -stream::close_op:: +stream:: +close_op:: operator()( error_code ec, std::size_t bytes_transferred, @@ -140,7 +139,7 @@ operator()( d.ws.wr_block_ = d.tok; // Make sure the stream is open - if(d.ws.check_fail(ec)) + if(! d.ws.check_open(ec)) goto upcall; } else @@ -160,29 +159,33 @@ operator()( BOOST_ASSERT(d.ws.wr_block_ == d.tok); // Make sure the stream is open - if(d.ws.check_fail(ec)) + if(! d.ws.check_open(ec)) goto upcall; } // Can't call close twice BOOST_ASSERT(! d.ws.wr_close_); + // Change status to closing + BOOST_ASSERT(d.ws.status_ == status::open); + d.ws.status_ = status::closing; + // Send close frame d.ws.wr_close_ = true; BOOST_ASIO_CORO_YIELD boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); - if(d.ws.check_fail(ec)) + if(! d.ws.check_ok(ec)) goto upcall; if(d.ws.rd_close_) { // This happens when the read_op gets a close frame - // at the same time we are sending the close frame. The - // read_op will be suspended on the write block. + // at the same time close_op is sending the close frame. + // The read_op will be suspended on the write block. goto teardown; } - + // Maybe suspend if(! d.ws.rd_block_) { @@ -206,7 +209,9 @@ operator()( BOOST_ASSERT(d.ws.rd_block_ == d.tok); // Make sure the stream is open - if(d.ws.check_fail(ec)) + BOOST_ASSERT(d.ws.status_ != status::open); + BOOST_ASSERT(d.ws.status_ != status::closed); + if( d.ws.status_ == status::failed) goto upcall; BOOST_ASSERT(! d.ws.rd_close_); @@ -231,7 +236,7 @@ operator()( d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, d.ws.rd_buf_.max_size())), std::move(*this)); - if(d.ws.check_fail(ec)) + if(! d.ws.check_ok(ec)) goto upcall; d.ws.rd_buf_.commit(bytes_transferred); } @@ -271,7 +276,7 @@ operator()( d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, d.ws.rd_buf_.max_size())), std::move(*this)); - if(d.ws.check_fail(ec)) + if(! d.ws.check_ok(ec)) goto upcall; d.ws.rd_buf_.commit(bytes_transferred); } @@ -289,7 +294,6 @@ operator()( async_teardown(d.ws.role_, d.ws.stream_, std::move(*this)); BOOST_ASSERT(d.ws.wr_block_ == d.tok); - BOOST_ASSERT(d.ws.open_); if(ec == boost::asio::error::eof) { // Rationale: @@ -298,7 +302,11 @@ operator()( } if(! ec) ec = d.ev; - d.ws.open_ = false; + if(ec) + d.ws.status_ = status::failed; + else + d.ws.status_ = status::closed; + d.ws.close(); upcall: BOOST_ASSERT(d.ws.wr_block_ == d.tok); @@ -346,7 +354,7 @@ close(close_reason const& cr, error_code& ec) using beast::detail::clamp; ec.assign(0, ec.category()); // Make sure the stream is open - if(check_fail(ec)) + if(! check_open(ec)) return; // If rd_close_ is set then we already sent a close BOOST_ASSERT(! rd_close_); @@ -357,8 +365,9 @@ close(close_reason const& cr, error_code& ec) write_close(fb, cr); boost::asio::write(stream_, fb.data(), ec); } - if(check_fail(ec)) + if(! check_ok(ec)) return; + status_ = status::closing; // Drain the connection close_code code{}; if(rd_remain_ > 0) @@ -375,7 +384,7 @@ close(close_reason const& cr, error_code& ec) stream_.read_some( rd_buf_.prepare(read_size(rd_buf_, rd_buf_.max_size())), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; rd_buf_.commit(bytes_transferred); } @@ -414,8 +423,7 @@ close(close_reason const& cr, error_code& ec) stream_.read_some( rd_buf_.prepare(read_size(rd_buf_, rd_buf_.max_size())), ec); - open_ = ! ec; - if(! open_) + if(! check_ok(ec)) return; rd_buf_.commit(bytes_transferred); } diff --git a/include/boost/beast/websocket/impl/ping.ipp b/include/boost/beast/websocket/impl/ping.ipp index 591ee325..8ff65280 100644 --- a/include/boost/beast/websocket/impl/ping.ipp +++ b/include/boost/beast/websocket/impl/ping.ipp @@ -131,7 +131,7 @@ operator()(error_code ec, std::size_t) d.ws.wr_block_ = d.tok; // Make sure the stream is open - if(d.ws.check_fail(ec)) + if(! d.ws.check_open(ec)) { BOOST_ASIO_CORO_YIELD d.ws.get_io_service().post( @@ -156,7 +156,7 @@ operator()(error_code ec, std::size_t) BOOST_ASSERT(d.ws.wr_block_ == d.tok); // Make sure the stream is open - if(d.ws.check_fail(ec)) + if(! d.ws.check_open(ec)) goto upcall; } @@ -164,7 +164,7 @@ operator()(error_code ec, std::size_t) BOOST_ASIO_CORO_YIELD boost::asio::async_write(d.ws.stream_, d.fb.data(), std::move(*this)); - if(d.ws.check_fail(ec)) + if(! d.ws.check_ok(ec)) goto upcall; upcall: @@ -195,15 +195,14 @@ void stream:: ping(ping_data const& payload, error_code& ec) { - ec.assign(0, ec.category()); // Make sure the stream is open - if(check_fail(ec)) + if(! check_open(ec)) return; detail::frame_buffer fb; write_ping( fb, detail::opcode::ping, payload); boost::asio::write(stream_, fb.data(), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; } @@ -223,15 +222,14 @@ void stream:: pong(ping_data const& payload, error_code& ec) { - ec.assign(0, ec.category()); // Make sure the stream is open - if(check_fail(ec)) + if(! check_open(ec)) return; detail::frame_buffer fb; write_ping( fb, detail::opcode::pong, payload); boost::asio::write(stream_, fb.data(), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; } diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index c17b809f..a9079c1f 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -53,7 +53,7 @@ class stream::read_some_op token tok_; close_code code_; bool did_read_ = false; - bool cont_; + bool cont_ = false; public: read_some_op(read_some_op&&) = default; @@ -138,14 +138,19 @@ operator()( BOOST_ASIO_CORO_REENTER(*this) { // Maybe suspend + do_maybe_suspend: if(! ws_.rd_block_) { // Acquire the read block ws_.rd_block_ = tok_; - // Make sure the stream is open - if(ws_.check_fail(ec)) + // Make sure the stream is not closed + if( ws_.status_ == status::closed || + ws_.status_ == status::failed) + { + ec = boost::asio::error::operation_aborted; goto upcall; + } } else { @@ -164,10 +169,18 @@ operator()( ws_.get_io_service().post(std::move(*this)); BOOST_ASSERT(ws_.rd_block_ == tok_); - // Make sure the stream is open - if(ws_.check_fail(ec)) - goto upcall; + // The only way to get read blocked is if + // a `close_op` wrote a close frame + BOOST_ASSERT(ws_.wr_close_); + BOOST_ASSERT(ws_.status_ != status::open); + ec = boost::asio::error::operation_aborted; + goto upcall; } + + // if status_ == status::closing, we want to suspend + // the read operation until the close completes, + // then finish the read with operation_aborted. + loop: BOOST_ASSERT(ws_.rd_block_ == tok_); // See if we need to read a frame header. This @@ -195,19 +208,21 @@ operator()( ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); BOOST_ASSERT(ws_.rd_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; ws_.rd_buf_.commit(bytes_transferred); - // Allow a close operation to - // drain the connection if necessary. + // Allow a close operation + // to acquire the read block BOOST_ASSERT(ws_.rd_block_ == tok_); ws_.rd_block_.reset(); if( ws_.paused_r_close_.maybe_invoke()) { + // Suspend BOOST_ASSERT(ws_.rd_block_); goto do_suspend; } + // Acquire read block ws_.rd_block_ = tok_; } // Immediately apply the mask to the portion @@ -236,7 +251,7 @@ operator()( detail::read_ping(payload, b); ws_.rd_buf_.consume(len); // Ignore ping when closing - if(ws_.wr_close_) + if(ws_.status_ == status::closing) goto loop; if(ws_.ctrl_cb_) ws_.ctrl_cb_(frame_type::ping, payload); @@ -245,6 +260,15 @@ operator()( flat_static_buffer_base>(ws_.rd_fb_, detail::opcode::pong, payload); } + + //BOOST_ASSERT(! ws_.paused_r_close_); + + // Allow a close operation + // to acquire the read block + BOOST_ASSERT(ws_.rd_block_ == tok_); + ws_.rd_block_.reset(); + ws_.paused_r_close_.maybe_invoke(); + // Maybe suspend if(! ws_.wr_block_) { @@ -268,18 +292,8 @@ operator()( BOOST_ASSERT(ws_.wr_block_ == tok_); // Make sure the stream is open - if(ws_.check_fail(ec)) + if(! ws_.check_open(ec)) goto upcall; - - // Ignore ping when closing - if(ws_.wr_close_) - { - ws_.wr_block_.reset(); - ws_.paused_close_.maybe_invoke() || - ws_.paused_ping_.maybe_invoke() || - ws_.paused_wr_.maybe_invoke(); - goto loop; - } } // Send pong @@ -288,13 +302,13 @@ operator()( boost::asio::async_write(ws_.stream_, ws_.rd_fb_.data(), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; ws_.wr_block_.reset(); ws_.paused_close_.maybe_invoke() || ws_.paused_ping_.maybe_invoke() || ws_.paused_wr_.maybe_invoke(); - goto loop; + goto do_maybe_suspend; } // Handle pong frame if(ws_.rd_fh_.op == detail::opcode::pong) @@ -335,17 +349,19 @@ operator()( if(ws_.ctrl_cb_) ws_.ctrl_cb_(frame_type::close, ws_.cr_.reason); - if(! ws_.wr_close_) + // See if we are already closing + if(ws_.status_ == status::closing) { - // _Start the WebSocket Closing Handshake_ - code_ = cr.code == close_code::none ? - close_code::normal : - static_cast(cr.code); + // _Close the WebSocket Connection_ + BOOST_ASSERT(ws_.wr_close_); + code_ = close_code::none; ev_ = error::closed; goto close; } - // _Close the WebSocket Connection_ - code_ = close_code::none; + // _Start the WebSocket Closing Handshake_ + code_ = cr.code == close_code::none ? + close_code::normal : + static_cast(cr.code); ev_ = error::closed; goto close; } @@ -372,7 +388,7 @@ operator()( ws_.rd_buf_.prepare(read_size( ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; ws_.rd_buf_.commit(bytes_transferred); if(ws_.rd_fh_.mask) @@ -413,7 +429,7 @@ operator()( BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some(buffer_prefix( clamp(ws_.rd_remain_), cb_), std::move(*this)); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffer_prefix( @@ -456,7 +472,7 @@ operator()( ws_.rd_buf_.prepare(read_size( ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; BOOST_ASSERT(bytes_transferred > 0); ws_.rd_buf_.commit(bytes_transferred); @@ -498,8 +514,7 @@ operator()( zs.next_in = empty_block; zs.avail_in = sizeof(empty_block); ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); - if(ws_.check_fail(ec)) - goto upcall; + BOOST_ASSERT(! ec); // VFALCO See: // https://github.com/madler/zlib/issues/280 BOOST_ASSERT(zs.total_out == 0); @@ -521,7 +536,7 @@ operator()( } ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); BOOST_ASSERT(ec != zlib::error::end_of_stream); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( ws_.rd_size_, zs.total_out, ws_.rd_msg_max_)) @@ -560,7 +575,7 @@ operator()( ws_.wr_block_ = tok_; // Make sure the stream is open - BOOST_ASSERT(ws_.open_); + BOOST_ASSERT(ws_.status_ == status::open); } else { @@ -579,10 +594,13 @@ operator()( BOOST_ASSERT(ws_.wr_block_ == tok_); // Make sure the stream is open - if(ws_.check_fail(ec)) + if(! ws_.check_open(ec)) goto upcall; } + // Set the status + ws_.status_ = status::closing; + if(! ws_.wr_close_) { ws_.wr_close_ = true; @@ -600,9 +618,7 @@ operator()( ws_.stream_, ws_.rd_fb_.data(), std::move(*this)); BOOST_ASSERT(ws_.wr_block_ == tok_); - - // Make sure the stream is open - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; } @@ -621,11 +637,15 @@ operator()( } if(! ec) ec = ev_; - ws_.open_ = false; + if(ec && ec != error::closed) + ws_.status_ = status::failed; + else + ws_.status_ = status::closed; + ws_.close(); upcall: - BOOST_ASSERT(ws_.rd_block_ == tok_); - ws_.rd_block_.reset(); + if(ws_.rd_block_ == tok_) + ws_.rd_block_.reset(); ws_.paused_r_close_.maybe_invoke(); if(ws_.wr_block_ == tok_) { @@ -755,7 +775,7 @@ operator()( BOOST_ASIO_CORO_YIELD read_some_op{ std::move(*this), ws_, *mb}( - {}, 0, true); + {}, 0, false); if(ec) break; b_.commit(bytes_transferred); @@ -956,7 +976,7 @@ read_some( std::size_t bytes_written = 0; ec.assign(0, ec.category()); // Make sure the stream is open - if(check_fail(ec)) + if(! check_open(ec)) return 0; loop: // See if we need to read a frame header. This @@ -979,7 +999,7 @@ loop: rd_buf_.prepare(read_size( rd_buf_, rd_buf_.max_size())), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return bytes_written; rd_buf_.commit(bytes_transferred); } @@ -1018,7 +1038,7 @@ loop: write_ping(fb, detail::opcode::pong, payload); boost::asio::write(stream_, fb.data(), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return bytes_written; goto loop; } @@ -1082,7 +1102,7 @@ loop: rd_buf_.commit(stream_.read_some( rd_buf_.prepare(read_size(rd_buf_, rd_buf_.max_size())), ec)); - if(check_fail(ec)) + if(! check_ok(ec)) return bytes_written; if(rd_fh_.mask) detail::mask_inplace( @@ -1125,7 +1145,7 @@ loop: auto const bytes_transferred = stream_.read_some(buffer_prefix( clamp(rd_remain_), buffers), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffer_prefix( @@ -1187,7 +1207,7 @@ loop: rd_buf_.prepare(read_size( rd_buf_, rd_buf_.max_size())), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); rd_buf_.commit(bytes_transferred); @@ -1217,8 +1237,6 @@ loop: zs.avail_in = sizeof(empty_block); pmd_->zi.write(zs, zlib::Flush::sync, ec); BOOST_ASSERT(! ec); - if(check_fail(ec)) - return bytes_written; // VFALCO See: // https://github.com/madler/zlib/issues/280 BOOST_ASSERT(zs.total_out == 0); @@ -1240,7 +1258,7 @@ loop: } pmd_->zi.write(zs, zlib::Flush::sync, ec); BOOST_ASSERT(ec != zlib::error::end_of_stream); - if(check_fail(ec)) + if(! check_ok(ec)) return bytes_written; if(rd_msg_max_ && beast::detail::sum_exceeds( rd_size_, zs.total_out, rd_msg_max_)) diff --git a/include/boost/beast/websocket/impl/rfc6455.ipp b/include/boost/beast/websocket/impl/rfc6455.ipp index f4151e87..59677248 100644 --- a/include/boost/beast/websocket/impl/rfc6455.ipp +++ b/include/boost/beast/websocket/impl/rfc6455.ipp @@ -30,8 +30,6 @@ is_upgrade(http::header:: reset() { - BOOST_ASSERT(! open_); - open_ = false; // VFALCO is this needed? + BOOST_ASSERT(status_ != status::open); rd_remain_ = 0; rd_cont_ = false; rd_done_ = true; @@ -593,8 +592,6 @@ build_response(http::request detail::sec_ws_key_type::max_size_n) return err("Invalid Sec-WebSocket-Key"); @@ -684,6 +681,7 @@ do_fail( error_code& ec) // set to the error, else set to ev { BOOST_ASSERT(ev); + status_ = status::closing; if(code != close_code::none && ! wr_close_) { wr_close_ = true; @@ -691,8 +689,7 @@ do_fail( write_close< flat_static_buffer_base>(fb, code); boost::asio::write(stream_, fb.data(), ec); - open_ = ! ec; - if(! open_) + if(! check_ok(ec)) return; } using beast::websocket::teardown; @@ -703,11 +700,13 @@ do_fail( // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error ec.assign(0, ec.category()); } - open_ = ! ec; - if(! open_) - return; - ec = ev; - open_ = false; + if(! ec) + ec = ev; + if(ec && ec != error::closed) + status_ = status::failed; + else + status_ = status::closed; + close(); } } // websocket diff --git a/include/boost/beast/websocket/impl/write.ipp b/include/boost/beast/websocket/impl/write.ipp index 01307a4e..86ff0075 100644 --- a/include/boost/beast/websocket/impl/write.ipp +++ b/include/boost/beast/websocket/impl/write.ipp @@ -49,7 +49,7 @@ class stream::write_some_op int how_; bool fin_; bool more_; - bool cont_; + bool cont_ = false; public: write_some_op(write_some_op&&) = default; @@ -206,7 +206,7 @@ operator()( ws_.wr_block_ = tok_; // Make sure the stream is open - if(ws_.check_fail(ec)) + if(! ws_.check_open(ec)) goto upcall; } else @@ -227,7 +227,7 @@ operator()( BOOST_ASSERT(ws_.wr_block_ == tok_); // Make sure the stream is open - if(ws_.check_fail(ec)) + if(! ws_.check_open(ec)) goto upcall; } @@ -242,13 +242,11 @@ operator()( ws_.wr_fb_, fh_); ws_.wr_cont_ = ! fin_; // Send frame - BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, buffer_cat(ws_.wr_fb_.data(), cb_), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; goto upcall; } @@ -267,15 +265,13 @@ operator()( ws_.wr_fb_, fh_); ws_.wr_cont_ = ! fin_; // Send frame - BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write( ws_.stream_, buffer_cat( ws_.wr_fb_.data(), buffer_prefix( clamp(fh_.len), cb_)), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; if(remain_ == 0) break; @@ -317,14 +313,12 @@ operator()( remain_ -= n; ws_.wr_cont_ = ! fin_; // Send frame header and partial payload - BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write( ws_.stream_, buffer_cat(ws_.wr_fb_.data(), buffer(ws_.wr_buf_.get(), n)), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; while(remain_ > 0) { @@ -336,13 +330,11 @@ operator()( ws_.wr_buf_.get(), n), key_); remain_ -= n; // Send partial payload - BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, buffer(ws_.wr_buf_.get(), n), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; } goto upcall; @@ -369,14 +361,12 @@ operator()( ws_.wr_fb_, fh_); ws_.wr_cont_ = ! fin_; // Send frame - BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, buffer_cat(ws_.wr_fb_.data(), buffer(ws_.wr_buf_.get(), n)), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; if(remain_ == 0) break; @@ -408,7 +398,7 @@ operator()( ws_.wr_buf_size_); more_ = detail::deflate( ws_.pmd_->zo, b, cb_, fin_, ec); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; n = buffer_size(b); if(n == 0) @@ -434,13 +424,11 @@ operator()( flat_static_buffer_base>(ws_.wr_fb_, fh_); ws_.wr_cont_ = ! fin_; // Send frame - BOOST_ASSERT(ws_.wr_block_ == tok_); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, buffer_cat(ws_.wr_fb_.data(), mutable_buffers_1{b}), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); - if(ws_.check_fail(ec)) + if(! ws_.check_ok(ec)) goto upcall; if(more_) { @@ -460,7 +448,6 @@ operator()( } else { - BOOST_ASSERT(ws_.wr_block_ == tok_); if(fh_.fin && ( (ws_.role_ == role_type::client && ws_.pmd_config_.client_no_context_takeover) || @@ -524,7 +511,7 @@ write_some(bool fin, using boost::asio::buffer_size; ec.assign(0, ec.category()); // Make sure the stream is open - if(check_fail(ec)) + if(! check_open(ec)) return; detail::frame_header fh; if(! wr_cont_) @@ -552,8 +539,7 @@ write_some(bool fin, wr_buf_.get(), wr_buf_size_); auto const more = detail::deflate( pmd_->zo, b, cb, fin, ec); - open_ = ! ec; - if(! open_) + if(! check_ok(ec)) return; auto const n = buffer_size(b); if(n == 0) @@ -581,7 +567,7 @@ write_some(bool fin, wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; if(! more) break; @@ -608,7 +594,7 @@ write_some(bool fin, wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), buffers), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; } else @@ -630,7 +616,7 @@ write_some(bool fin, boost::asio::write(stream_, buffer_cat(fh_buf.data(), buffer_prefix(n, cb)), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; if(remain == 0) break; @@ -662,7 +648,7 @@ write_some(bool fin, wr_cont_ = ! fin; boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; } while(remain > 0) @@ -674,7 +660,7 @@ write_some(bool fin, remain -= n; detail::mask_inplace(b, key); boost::asio::write(stream_, b, ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; } } @@ -702,7 +688,7 @@ write_some(bool fin, flat_static_buffer_base>(fh_buf, fh); boost::asio::write(stream_, buffer_cat(fh_buf.data(), b), ec); - if(check_fail(ec)) + if(! check_ok(ec)) return; if(remain == 0) break; diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index a5d2ee9e..cecd7f54 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -132,6 +132,9 @@ class stream struct op {}; + using control_cb_type = + std::function; + // tokens are used to order reads and writes class token { @@ -157,8 +160,13 @@ class stream zlib::inflate_stream zi; }; - using control_cb_type = - std::function; + enum class status + { + open, + closing, + closed, + failed + }; NextLayer stream_; // the wrapped stream close_reason cr_; // set from received close frame @@ -190,8 +198,8 @@ class stream token tok_; // used to order asynchronous ops role_type role_ // server or client = role_type::client; - bool open_ // `true` if connected - = false; + status status_ + = status::closed; token wr_block_; // op currenly writing bool wr_close_ // did we write a close frame? @@ -366,7 +374,7 @@ public: bool is_open() const { - return open_; + return status_ == status::open; } /** Returns `true` if the latest message data indicates binary. @@ -3394,19 +3402,27 @@ private: void begin_msg(); bool - check_fail(error_code& ec) + check_open(error_code& ec) { - if(! open_) + if(status_ != status::open) { ec = boost::asio::error::operation_aborted; - return true; + return false; } + ec.assign(0, ec.category()); + return true; + } + + bool + check_ok(error_code& ec) + { if(ec) { - open_ = false; - return true; + if(status_ != status::closed) + status_ = status::failed; + return false; } - return false; + return true; } template diff --git a/test/beast/websocket/accept.cpp b/test/beast/websocket/accept.cpp index bd1dbce4..ebf4f2bb 100644 --- a/test/beast/websocket/accept.cpp +++ b/test/beast/websocket/accept.cpp @@ -560,6 +560,35 @@ public: "Sec-WebSocket-Version: 13\r\n" "\r\n" ); + // oversize key + check(error::handshake_failed, + "GET / HTTP/1.1\r\n" + "Host: localhost:80\r\n" + "Upgrade: WebSocket\r\n" + "Connection: upgrade\r\n" + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQdGhlIHNhbXBsZSBub25jZQ==\r\n" + "Sec-WebSocket-Version: 13\r\n" + "\r\n" + ); + // bad version + check(error::handshake_failed, + "GET / HTTP/1.1\r\n" + "Host: localhost:80\r\n" + "Upgrade: WebSocket\r\n" + "Connection: upgrade\r\n" + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + "Sec-WebSocket-Version: 12\r\n" + "\r\n" + ); + // missing version + check(error::handshake_failed, + "GET / HTTP/1.1\r\n" + "Host: localhost:80\r\n" + "Upgrade: WebSocket\r\n" + "Connection: upgrade\r\n" + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + "\r\n" + ); // valid request check({}, "GET / HTTP/1.1\r\n" diff --git a/test/beast/websocket/close.cpp b/test/beast/websocket/close.cpp index 86b61b03..a0af5eaa 100644 --- a/test/beast/websocket/close.cpp +++ b/test/beast/websocket/close.cpp @@ -39,6 +39,14 @@ public: w.close(ws, close_code::going_away); }); + // close with code and reason + doTest(pmd, [&](ws_type& ws) + { + w.close(ws, { + close_code::going_away, + "going away"}); + }); + // already closed { echo_server es{log}; @@ -125,6 +133,17 @@ public: } } + // drain masked close frame + { + echo_server es{log, kind::async_client}; + stream ws{ios_}; + ws.next_layer().connect(es.stream()); + ws.set_option(pmd); + es.async_handshake(); + ws.accept(); + w.close(ws, {}); + } + // close with incomplete read message doTest(pmd, [&](ws_type& ws) { @@ -147,8 +166,10 @@ public: } void - testCloseSuspend() + testSuspend() { + using boost::asio::buffer; + // suspend on ping doFailLoop([&](test::fail_counter& fc) { @@ -270,10 +291,10 @@ public: ws.async_read(b, [&](error_code ec, std::size_t) { - ++count; if(ec != error::failed) BOOST_THROW_EXCEPTION( system_error{ec}); + BEAST_EXPECT(++count == 1); }); while(! ws.wr_block_) { @@ -285,10 +306,10 @@ public: ws.async_close({}, [&](error_code ec) { - ++count; if(ec != boost::asio::error::operation_aborted) BOOST_THROW_EXCEPTION( system_error{ec}); + BEAST_EXPECT(++count == 2); }); BEAST_EXPECT(count == 0); ios.run(); @@ -311,10 +332,10 @@ public: ws.async_read(b, [&](error_code ec, std::size_t) { - ++count; if(ec != error::closed) BOOST_THROW_EXCEPTION( system_error{ec}); + BEAST_EXPECT(++count == 1); }); while(! ws.wr_block_) { @@ -326,16 +347,256 @@ public: ws.async_close({}, [&](error_code ec) { - ++count; if(ec != boost::asio::error::operation_aborted) BOOST_THROW_EXCEPTION( system_error{ec}); + BEAST_EXPECT(++count == 2); }); BEAST_EXPECT(count == 0); ios.run(); BEAST_EXPECT(count == 2); }); + // teardown on received close + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add a close frame to the input + ws.next_layer().append(string_view{ + "\x88\x00", 2}); + std::size_t count = 0; + std::string const s = "Hello, world!"; + ws.async_write(buffer(s), + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 3); + }); + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 3); + }); + + // check for deadlock + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // add a ping frame to the input + ws.next_layer().append(string_view{ + "\x89\x00", 2}); + std::size_t count = 0; + multi_buffer b; + std::string const s = "Hello, world!"; + ws.async_write(buffer(s), + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 3); + }); + BEAST_EXPECT(ws.rd_block_); + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + BEAST_EXPECT(ws.is_open()); + BEAST_EXPECT(ws.wr_block_); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 3); + }); + + // Four-way: close, read, write, ping + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + std::string const s = "Hello, world!"; + multi_buffer b; + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + ++count; + }); + ws.async_write(buffer(s), + [&](error_code ec) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + ++count; + }); + ws.async_ping({}, + [&](error_code ec) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + ++count; + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 4); + }); + + // Four-way: read, write, ping, close + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + std::string const s = "Hello, world!"; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec && ec != boost::asio::error::operation_aborted) + { + BEAST_EXPECTS(ec, ec.message()); + BOOST_THROW_EXCEPTION( + system_error{ec}); + } + if(! ec) + BEAST_EXPECT(to_string(b.data()) == s); + ++count; + }); + ws.async_write(buffer(s), + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + ws.async_ping({}, + [&](error_code ec) + { + if(ec != boost::asio::error::operation_aborted) + { + BEAST_EXPECTS(ec, ec.message()); + BOOST_THROW_EXCEPTION( + system_error{ec}); + } + ++count; + }); + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 4); + }); + + // Four-way: ping, read, write, close + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + std::string const s = "Hello, world!"; + multi_buffer b; + ws.async_ping({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + ++count; + }); + ws.async_write(buffer(s), + [&](error_code ec) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + ++count; + }); + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + BEAST_EXPECT(count == 0); + ios.run(); + BEAST_EXPECT(count == 4); + }); } void @@ -357,7 +618,7 @@ public: run() override { testClose(); - testCloseSuspend(); + testSuspend(); testContHook(); } }; diff --git a/test/beast/websocket/frame.cpp b/test/beast/websocket/frame.cpp index ba808aa5..505d84ed 100644 --- a/test/beast/websocket/frame.cpp +++ b/test/beast/websocket/frame.cpp @@ -7,12 +7,8 @@ // Official repository: https://github.com/boostorg/beast // -#include #include #include -#include -#include -#include namespace boost { namespace beast { @@ -21,7 +17,6 @@ namespace detail { class frame_test : public beast::unit_test::suite - , public test::enable_yield_to { public: void testCloseCodes() @@ -47,7 +42,7 @@ public: test_fh() { op = detail::opcode::text; - fin = false; + fin = true; mask = false; rsv1 = false; rsv2 = false; @@ -57,8 +52,20 @@ public: } }; + void + testWriteFrame() + { + test_fh fh; + fh.rsv2 = true; + fh.rsv3 = true; + fh.len = 65536; + frame_buffer fb; + write(fb, fh); + } + void run() override { + testWriteFrame(); testCloseCodes(); } }; diff --git a/test/beast/websocket/handshake.cpp b/test/beast/websocket/handshake.cpp index f319848f..a585436b 100644 --- a/test/beast/websocket/handshake.cpp +++ b/test/beast/websocket/handshake.cpp @@ -214,10 +214,265 @@ public: ); } + // Compression Extensions for WebSocket + // + // https://tools.ietf.org/html/rfc7692 + // + void + testExtRead() + { + detail::pmd_offer po; + + auto const accept = + [&](string_view s) + { + http::fields f; + f.set(http::field::sec_websocket_extensions, s); + po = detail::pmd_offer(); + detail::pmd_read(po, f); + BEAST_EXPECT(po.accept); + }; + + auto const reject = + [&](string_view s) + { + http::fields f; + f.set(http::field::sec_websocket_extensions, s); + po = detail::pmd_offer(); + detail::pmd_read(po, f); + BEAST_EXPECT(! po.accept); + }; + + // duplicate parameters + reject("permessage-deflate; server_max_window_bits=8; server_max_window_bits=8"); + + // missing value + reject("permessage-deflate; server_max_window_bits"); + reject("permessage-deflate; server_max_window_bits="); + + // invalid value + reject("permessage-deflate; server_max_window_bits=-1"); + reject("permessage-deflate; server_max_window_bits=7"); + reject("permessage-deflate; server_max_window_bits=16"); + reject("permessage-deflate; server_max_window_bits=999999999999999999999999"); + reject("permessage-deflate; server_max_window_bits=9a"); + + // duplicate parameters + reject("permessage-deflate; client_max_window_bits=8; client_max_window_bits=8"); + + // optional value excluded + accept("permessage-deflate; client_max_window_bits"); + BEAST_EXPECT(po.client_max_window_bits == -1); + accept("permessage-deflate; client_max_window_bits="); + BEAST_EXPECT(po.client_max_window_bits == -1); + + // invalid value + reject("permessage-deflate; client_max_window_bits=-1"); + reject("permessage-deflate; client_max_window_bits=7"); + reject("permessage-deflate; client_max_window_bits=16"); + reject("permessage-deflate; client_max_window_bits=999999999999999999999999"); + + // duplicate parameters + reject("permessage-deflate; server_no_context_takeover; server_no_context_takeover"); + + // valueless parameter + accept("permessage-deflate; server_no_context_takeover"); + BEAST_EXPECT(po.server_no_context_takeover); + accept("permessage-deflate; server_no_context_takeover="); + BEAST_EXPECT(po.server_no_context_takeover); + + // disallowed value + reject("permessage-deflate; server_no_context_takeover=-1"); + reject("permessage-deflate; server_no_context_takeover=x"); + reject("permessage-deflate; server_no_context_takeover=\"yz\""); + reject("permessage-deflate; server_no_context_takeover=999999999999999999999999"); + + // duplicate parameters + reject("permessage-deflate; client_no_context_takeover; client_no_context_takeover"); + + // valueless parameter + accept("permessage-deflate; client_no_context_takeover"); + BEAST_EXPECT(po.client_no_context_takeover); + accept("permessage-deflate; client_no_context_takeover="); + BEAST_EXPECT(po.client_no_context_takeover); + + // disallowed value + reject("permessage-deflate; client_no_context_takeover=-1"); + reject("permessage-deflate; client_no_context_takeover=x"); + reject("permessage-deflate; client_no_context_takeover=\"yz\""); + reject("permessage-deflate; client_no_context_takeover=999999999999999999999999"); + + // unknown extension parameter + reject("permessage-deflate; unknown"); + reject("permessage-deflate; unknown="); + reject("permessage-deflate; unknown=1"); + reject("permessage-deflate; unknown=x"); + reject("permessage-deflate; unknown=\"xy\""); + } + + void + testExtWrite() + { + detail::pmd_offer po; + + auto const check = + [&](string_view match) + { + http::fields f; + detail::pmd_write(f, po); + BEAST_EXPECT( + f[http::field::sec_websocket_extensions] + == match); + }; + + po.accept = true; + po.server_max_window_bits = 0; + po.client_max_window_bits = 0; + po.server_no_context_takeover = false; + po.client_no_context_takeover = false; + + check("permessage-deflate"); + + po.server_max_window_bits = 10; + check("permessage-deflate; server_max_window_bits=10"); + + po.server_max_window_bits = -1; + check("permessage-deflate; server_max_window_bits"); + + po.server_max_window_bits = 0; + po.client_max_window_bits = 10; + check("permessage-deflate; client_max_window_bits=10"); + + po.client_max_window_bits = -1; + check("permessage-deflate; client_max_window_bits"); + + po.client_max_window_bits = 0; + po.server_no_context_takeover = true; + check("permessage-deflate; server_no_context_takeover"); + + po.server_no_context_takeover = false; + po.client_no_context_takeover = true; + check("permessage-deflate; client_no_context_takeover"); + } + + void + testExtNegotiate() + { + permessage_deflate pmd; + + auto const reject = + [&]( + string_view offer) + { + detail::pmd_offer po; + { + http::fields f; + f.set(http::field::sec_websocket_extensions, offer); + detail::pmd_read(po, f); + } + http::fields f; + detail::pmd_offer config; + detail::pmd_negotiate(f, config, po, pmd); + BEAST_EXPECT(! config.accept); + }; + + auto const accept = + [&]( + string_view offer, + string_view result) + { + detail::pmd_offer po; + { + http::fields f; + f.set(http::field::sec_websocket_extensions, offer); + detail::pmd_read(po, f); + } + http::fields f; + detail::pmd_offer config; + detail::pmd_negotiate(f, config, po, pmd); + auto const got = + f[http::field::sec_websocket_extensions]; + BEAST_EXPECTS(got == result, got); + { + detail::pmd_offer poc; + detail::pmd_read(poc, f); + detail::pmd_normalize(poc); + BEAST_EXPECT(poc.accept); + } + BEAST_EXPECT(config.server_max_window_bits != 0); + BEAST_EXPECT(config.client_max_window_bits != 0); + }; + + pmd.server_enable = true; + pmd.server_max_window_bits = 15; + pmd.client_max_window_bits = 15; + pmd.server_no_context_takeover = false; + pmd.client_no_context_takeover = false; + + // default + accept( + "permessage-deflate", + "permessage-deflate"); + + // non-default server_max_window_bits + accept( + "permessage-deflate; server_max_window_bits=14", + "permessage-deflate; server_max_window_bits=14"); + + // explicit default server_max_window_bits + accept( + "permessage-deflate; server_max_window_bits=15", + "permessage-deflate"); + + // minimum window size of 8 bits (a zlib bug) + accept( + "permessage-deflate; server_max_window_bits=8", + "permessage-deflate; server_max_window_bits=9"); + + // non-default server_max_window_bits setting + pmd.server_max_window_bits = 10; + accept( + "permessage-deflate", + "permessage-deflate; server_max_window_bits=10"); + + // clamped server_max_window_bits setting #1 + pmd.server_max_window_bits = 10; + accept( + "permessage-deflate; server_max_window_bits=14", + "permessage-deflate; server_max_window_bits=10"); + + // clamped server_max_window_bits setting #2 + pmd.server_max_window_bits=8; + accept( + "permessage-deflate; server_max_window_bits=14", + "permessage-deflate; server_max_window_bits=9"); + + pmd.server_max_window_bits = 15; + + // present with no value + accept( + "permessage-deflate; client_max_window_bits", + "permessage-deflate"); + + // present with no value, non-default setting + pmd.client_max_window_bits = 10; + accept( + "permessage-deflate; client_max_window_bits", + "permessage-deflate; client_max_window_bits=10"); + + // absent, non-default setting + pmd.client_max_window_bits = 10; + reject( + "permessage-deflate"); + } + void run() override { testHandshake(); + testExtRead(); + testExtWrite(); + testExtNegotiate(); } }; diff --git a/test/beast/websocket/ping.cpp b/test/beast/websocket/ping.cpp index fa33178d..f7b0fc9c 100644 --- a/test/beast/websocket/ping.cpp +++ b/test/beast/websocket/ping.cpp @@ -89,40 +89,10 @@ public: { doTestPing(AsyncClient{yield}); }); - - // suspend on write - { - echo_server es{log}; - error_code ec; - boost::asio::io_service ios; - stream ws{ios}; - ws.next_layer().connect(es.stream()); - ws.handshake("localhost", "/", ec); - BEAST_EXPECTS(! ec, ec.message()); - std::size_t count = 0; - ws.async_write(sbuf("*"), - [&](error_code ec) - { - ++count; - BEAST_EXPECTS(! ec, ec.message()); - }); - BEAST_EXPECT(ws.wr_block_); - ws.async_ping("", - [&](error_code ec) - { - ++count; - BEAST_EXPECTS( - ec == boost::asio::error::operation_aborted, - ec.message()); - }); - ws.async_close({}, [&](error_code){}); - ios.run(); - BEAST_EXPECT(count == 2); - } } void - testPingSuspend() + testSuspend() { // suspend on write doFailLoop([&](test::fail_counter& fc) @@ -352,6 +322,45 @@ public: BEAST_EXPECT(count == 2); }); + // don't ping on close + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + error_code ec; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + ws.async_write(sbuf("*"), + [&](error_code ec) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + BEAST_EXPECT(ws.wr_block_); + ws.async_ping("", + [&](error_code ec) + { + ++count; + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + ws.async_close({}, + [&](error_code) + { + ++count; + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + ios.run(); + BEAST_EXPECT(count == 3); + }); + { echo_server es{log, kind::async}; boost::asio::io_service ios; @@ -437,7 +446,7 @@ public: run() override { testPing(); - testPingSuspend(); + testSuspend(); testContHook(); } }; diff --git a/test/beast/websocket/read.cpp b/test/beast/websocket/read.cpp index dded2dfe..8b9fa517 100644 --- a/test/beast/websocket/read.cpp +++ b/test/beast/websocket/read.cpp @@ -12,6 +12,8 @@ #include "test.hpp" +#include + namespace boost { namespace beast { namespace websocket { @@ -104,7 +106,7 @@ public: }); // two part message - // this triggers "fill the read buffer first" + // triggers "fill the read buffer first" doTest(pmd, [&](ws_type& ws) { w.write_raw(ws, sbuf( @@ -202,6 +204,58 @@ public: BEAST_EXPECT(to_string(b.data()) == "Hello, World!"); }); + // masked message, big + doStreamLoop([&](test::stream& ts) + { + echo_server es{log, kind::async_client}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); + ws.set_option(pmd); + es.async_handshake(); + try + { + w.accept(ws); + std::string const s(2000, '*'); + ws.auto_fragment(false); + ws.binary(false); + w.write(ws, buffer(s)); + multi_buffer b; + w.read(ws, b); + BEAST_EXPECT(ws.got_text()); + BEAST_EXPECT(to_string(b.data()) == s); + ws.next_layer().close(); + } + catch(...) + { + ts.close(); + throw; + } + }); + + // close + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log, kind::async}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + // Cause close to be received + es.async_close(); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + if(ec != error::closed) + BOOST_THROW_EXCEPTION( + system_error{ec}); + }); + ios.run(); + BEAST_EXPECT(count == 1); + }); + // already closed doTest(pmd, [&](ws_type& ws) { @@ -249,7 +303,7 @@ public: doReadTest(w, ws, close_code::protocol_error); }); - // receive bad close + // bad close doTest(pmd, [&](ws_type& ws) { put(ws.next_layer().buffer(), cbuf( @@ -257,15 +311,6 @@ public: doFailTest(w, ws, error::failed); }); - // expected cont - doTest(pmd, [&](ws_type& ws) - { - w.write_some(ws, false, boost::asio::null_buffers{}); - w.write_raw(ws, cbuf( - 0x81, 0x80, 0xff, 0xff, 0xff, 0xff)); - doReadTest(w, ws, close_code::protocol_error); - }); - // message size above 2^64 doTest(pmd, [&](ws_type& ws) { @@ -284,14 +329,6 @@ public: doFailTest(w, ws, error::failed); }); - // unexpected cont - doTest(pmd, [&](ws_type& ws) - { - w.write_raw(ws, cbuf( - 0x80, 0x80, 0xff, 0xff, 0xff, 0xff)); - doReadTest(w, ws, close_code::protocol_error); - }); - // bad utf8 doTest(pmd, [&](ws_type& ws) { @@ -313,10 +350,23 @@ public: doTest(pmd, [&](ws_type& ws) { std::string const s = - random_string() + - "Hello, world!" "\xc0"; - w.write(ws, buffer(s)); - doReadTest(w, ws, close_code::bad_payload); + "\x81\x7e\x0f\xa1" + + std::string(4000, '*') + "\xc0"; + ws.next_layer().append(s); + multi_buffer b; + try + { + do + { + b.commit(w.read_some(ws, b.prepare(4000))); + } + while(! ws.is_message_done()); + } + catch(system_error const& se) + { + if(se.code() != error::failed) + throw; + } }); // close frames @@ -441,6 +491,34 @@ public: BEAST_EXPECT(to_string(b.data()) == s); }); + // masked message + doStreamLoop([&](test::stream& ts) + { + echo_server es{log, kind::async_client}; + ws_type ws{ts}; + ws.next_layer().connect(es.stream()); + ws.set_option(pmd); + es.async_handshake(); + try + { + w.accept(ws); + std::string const s = "Hello, world!"; + ws.auto_fragment(false); + ws.binary(false); + w.write(ws, buffer(s)); + multi_buffer b; + w.read(ws, b); + BEAST_EXPECT(ws.got_text()); + BEAST_EXPECT(to_string(b.data()) == s); + ws.next_layer().close(); + } + catch(...) + { + ts.close(); + throw; + } + }); + // empty message doTest(pmd, [&](ws_type& ws) { @@ -571,11 +649,77 @@ public: } void - testReadSuspend() + testSuspend() { using boost::asio::buffer; +#if 1 + // suspend on read block + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + while(! ws.rd_block_) + ios.run_one(); + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + ios.run(); + BEAST_EXPECT(count == 2); + }); +#endif - // suspend on write + // suspend on release read block + doFailLoop([&](test::fail_counter& fc) + { +//log << "fc.count()==" << fc.count() << std::endl; + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + BOOST_ASSERT(ws.rd_block_); + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + ios.run(); + BEAST_EXPECT(count == 2); + }); + +#if 1 + // suspend on write pong doFailLoop([&](test::fail_counter& fc) { echo_server es{log}; @@ -592,25 +736,238 @@ public: ws.async_read(b, [&](error_code ec, std::size_t) { - ++count; if(ec) BOOST_THROW_EXCEPTION( system_error{ec}); BEAST_EXPECT(to_string(b.data()) == s); + ++count; }); BEAST_EXPECT(ws.rd_block_); ws.async_write(buffer(s), [&](error_code ec) { - ++count; if(ec) BOOST_THROW_EXCEPTION( system_error{ec}); + ++count; }); BEAST_EXPECT(ws.wr_block_); ios.run(); BEAST_EXPECT(count == 2); }); + + // Ignore ping when closing + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + // insert fragmented message with + // a ping in between the frames. + ws.next_layer().append(string_view( + "\x01\x01*" + "\x89\x00" + "\x80\x01*", 8)); + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(to_string(b.data()) == "**"); + BEAST_EXPECT(++count == 1); + b.consume(b.size()); + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 3); + }); + }); + BEAST_EXPECT(ws.rd_block_); + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + BEAST_EXPECT(ws.wr_block_); + ios.run(); + BEAST_EXPECT(count == 3); + }); + + // See if we are already closing + doFailLoop([&](test::fail_counter& fc) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios, fc}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + std::size_t count = 0; + // insert fragmented message with + // a close in between the frames. + ws.next_layer().append(string_view( + "\x01\x01*" + "\x88\x00" + "\x80\x01*", 8)); + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + if(ec != boost::asio::error::operation_aborted) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 2); + }); + BEAST_EXPECT(ws.rd_block_); + ws.async_close({}, + [&](error_code ec) + { + if(ec) + BOOST_THROW_EXCEPTION( + system_error{ec}); + BEAST_EXPECT(++count == 1); + }); + BEAST_EXPECT(ws.wr_block_); + ios.run(); + BEAST_EXPECT(count == 2); + }); +#endif + } + + void + testParseFrame() + { + auto const bad = + [&](string_view s) + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + ws.next_layer().append(s); + error_code ec; + multi_buffer b; + ws.read(b, ec); + BEAST_EXPECT(ec); + }; + + // chopped frame header + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + ws.next_layer().append( + "\x81\x7e\x01"); + std::size_t count = 0; + std::string const s(257, '*'); + error_code ec; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + BEAST_EXPECT(to_string(b.data()) == s); + }); + ios.run_one(); + es.stream().write_some( + boost::asio::buffer("\x01" + s)); + ios.run(); + BEAST_EXPECT(count == 1); + } + + // new data frame when continuation expected + bad("\x01\x01*" "\x81\x01*"); + + // reserved bits not cleared + bad("\xb1\x01*"); + bad("\xc1\x01*"); + bad("\xd1\x01*"); + + // continuation without an active message + bad("\x80\x01*"); + + // reserved bits not cleared (cont) + bad("\x01\x01*" "\xb0\x01*"); + bad("\x01\x01*" "\xc0\x01*"); + bad("\x01\x01*" "\xd0\x01*"); + + // reserved opcode + bad("\x83\x01*"); + + // fragmented control message + bad("\x09\x01*"); + + // invalid length for control message + bad("\x89\x7e\x01\x01"); + + // reserved bits not cleared (control) + bad("\xb9\x01*"); + bad("\xc9\x01*"); + bad("\xd9\x01*"); + + // unmasked frame from client + { + echo_server es{log, kind::async_client}; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + es.async_handshake(); + ws.accept(); + ws.next_layer().append( + "\x81\x01*"); + error_code ec; + multi_buffer b; + ws.read(b, ec); + BEAST_EXPECT(ec); + } + + // masked frame from server + bad("\x81\x80\xff\xff\xff\xff"); + + // chopped control frame payload + { + echo_server es{log}; + boost::asio::io_service ios; + stream ws{ios}; + ws.next_layer().connect(es.stream()); + ws.handshake("localhost", "/"); + ws.next_layer().append( + "\x89\x02*"); + std::size_t count = 0; + error_code ec; + multi_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + ++count; + BEAST_EXPECTS(! ec, ec.message()); + BEAST_EXPECT(to_string(b.data()) == "**"); + }); + ios.run_one(); + es.stream().write_some( + boost::asio::buffer( + "*" "\x81\x02**")); + ios.run(); + BEAST_EXPECT(count == 1); + } + + // length not canonical + bad(string_view("\x81\x7e\x00\x7d", 4)); + bad(string_view("\x81\x7f\x00\x00\x00\x00\x00\x00\xff\xff", 10)); } void @@ -631,6 +988,7 @@ public: buf, sizeof(buf)}}; using boost::asio::asio_handler_is_continuation; asio_handler_is_continuation(&op); + pass(); } { struct handler @@ -645,6 +1003,7 @@ public: handler{}, ws, b, 32, true}; using boost::asio::asio_handler_is_continuation; asio_handler_is_continuation(&op); + pass(); } } @@ -652,7 +1011,8 @@ public: run() override { testRead(); - testReadSuspend(); + testSuspend(); + testParseFrame(); testContHook(); } }; diff --git a/test/beast/websocket/rfc6455.cpp b/test/beast/websocket/rfc6455.cpp index 5efd8ceb..4d7546ae 100644 --- a/test/beast/websocket/rfc6455.cpp +++ b/test/beast/websocket/rfc6455.cpp @@ -34,8 +34,6 @@ public: req.insert("Connection", "upgrade"); BEAST_EXPECT(! is_upgrade(req)); req.insert("Upgrade", "websocket"); - BEAST_EXPECT(! is_upgrade(req)); - req.insert("Sec-WebSocket-Version", "13"); BEAST_EXPECT(is_upgrade(req)); } diff --git a/test/beast/websocket/stream.cpp b/test/beast/websocket/stream.cpp index c84b14d7..17132b64 100644 --- a/test/beast/websocket/stream.cpp +++ b/test/beast/websocket/stream.cpp @@ -22,7 +22,7 @@ public: void testOptions() { - stream ws(ios_); + stream ws{ios_}; ws.auto_fragment(true); ws.write_buffer_size(2048); ws.binary(false); @@ -36,48 +36,71 @@ public: { pass(); } + + auto const bad = + [&](permessage_deflate const& pmd) + { + stream ws{ios_}; + try + { + ws.set_option(pmd); + fail("", __FILE__, __LINE__); + } + catch(std::exception const&) + { + pass(); + } + }; + + { + permessage_deflate pmd; + pmd.server_max_window_bits = 16; + bad(pmd); + } + + { + permessage_deflate pmd; + pmd.server_max_window_bits = 8; + bad(pmd); + } + + { + permessage_deflate pmd; + pmd.client_max_window_bits = 16; + bad(pmd); + } + + { + permessage_deflate pmd; + pmd.client_max_window_bits = 8; + bad(pmd); + } + + { + permessage_deflate pmd; + pmd.compLevel = -1; + bad(pmd); + } + + { + permessage_deflate pmd; + pmd.compLevel = 10; + bad(pmd); + } + + { + permessage_deflate pmd; + pmd.memLevel = 0; + bad(pmd); + } + + { + permessage_deflate pmd; + pmd.memLevel = 10; + bad(pmd); + } } - //-------------------------------------------------------------------------- - - template - void - doTestStream(Wrap const& w, - permessage_deflate const& pmd) - { - using boost::asio::buffer; - - // send pong - doTest(pmd, [&](ws_type& ws) - { - w.pong(ws, ""); - }); - - // send auto fragmented message - doTest(pmd, [&](ws_type& ws) - { - ws.auto_fragment(true); - ws.write_buffer_size(8); - w.write(ws, sbuf("Now is the time for all good men")); - multi_buffer b; - w.read(ws, b); - BEAST_EXPECT(to_string(b.data()) == "Now is the time for all good men"); - }); - - // send message with write buffer limit - doTest(pmd, [&](ws_type& ws) - { - std::string s(2000, '*'); - ws.write_buffer_size(1200); - w.write(ws, buffer(s.data(), s.size())); - multi_buffer b; - w.read(ws, b); - BEAST_EXPECT(to_string(b.data()) == s); - }); - } - - //-------------------------------------------------------------------------- - void run() override { @@ -103,42 +126,6 @@ public: sizeof(websocket::stream) << std::endl; testOptions(); - -#if 0 - auto const testStream = - [this](permessage_deflate const& pmd) - { - doTestStream(SyncClient{}, pmd); - - yield_to( - [&](yield_context yield) - { - doTestStream(AsyncClient{yield}, pmd); - }); - }; - - permessage_deflate pmd; - - pmd.client_enable = false; - pmd.server_enable = false; - testStream(pmd); - - pmd.client_enable = true; - pmd.server_enable = true; - pmd.client_max_window_bits = 10; - pmd.client_no_context_takeover = false; - pmd.compLevel = 1; - pmd.memLevel = 1; - testStream(pmd); - - pmd.client_enable = true; - pmd.server_enable = true; - pmd.client_max_window_bits = 10; - pmd.client_no_context_takeover = true; - pmd.compLevel = 1; - pmd.memLevel = 1; - testStream(pmd); -#endif } }; diff --git a/test/beast/websocket/test.hpp b/test/beast/websocket/test.hpp index 726eab90..9e258fdd 100644 --- a/test/beast/websocket/test.hpp +++ b/test/beast/websocket/test.hpp @@ -264,7 +264,7 @@ public: Test const& f, std::size_t limit = 200) { std::size_t n; - for(n = 0; n <= limit; ++n) + for(n = 0; n < limit; ++n) { test::fail_counter fc{n}; try @@ -288,7 +288,7 @@ public: { // This number has to be high for the // test that writes the large buffer. - static std::size_t constexpr limit = 1000; + static std::size_t constexpr limit = 200; doFailLoop( [&](test::fail_counter& fc) @@ -308,12 +308,12 @@ public: { // This number has to be high for the // test that writes the large buffer. - static std::size_t constexpr limit = 1000; + static std::size_t constexpr limit = 200; for(int i = 0; i < 2; ++i) { std::size_t n; - for(n = 0; n <= limit; ++n) + for(n = 0; n < limit; ++n) { test::fail_counter fc{n}; test::stream ts{ios_, fc}; diff --git a/test/beast/websocket/write.cpp b/test/beast/websocket/write.cpp index 43107bac..bd22d41c 100644 --- a/test/beast/websocket/write.cpp +++ b/test/beast/websocket/write.cpp @@ -73,6 +73,20 @@ public: BEAST_EXPECT(b.size() == 0); }); + // fragmented message + doTest(pmd, [&](ws_type& ws) + { + ws.auto_fragment(false); + ws.binary(false); + std::string const s = "Hello, world!"; + w.write_some(ws, false, buffer(s.data(), 5)); + w.write_some(ws, true, buffer(s.data() + 5, s.size() - 5)); + multi_buffer b; + w.read(ws, b); + BEAST_EXPECT(ws.got_text()); + BEAST_EXPECT(to_string(b.data()) == s); + }); + // continuation doTest(pmd, [&](ws_type& ws) { diff --git a/test/extras/include/boost/beast/test/fail_counter.hpp b/test/extras/include/boost/beast/test/fail_counter.hpp index 9734fdda..6850a3ca 100644 --- a/test/extras/include/boost/beast/test/fail_counter.hpp +++ b/test/extras/include/boost/beast/test/fail_counter.hpp @@ -114,6 +114,7 @@ struct fail_error_code : error_code class fail_counter { std::size_t n_; + std::size_t i_ = 0; error_code ec_; public: @@ -131,13 +132,20 @@ public: { } + /// Returns the fail index + std::size_t + count() const + { + return n_; + } + /// Throw an exception on the Nth failure void fail() { - if(n_ > 0) - --n_; - if(! n_) + if(i_ < n_) + ++i_; + if(i_ == n_) BOOST_THROW_EXCEPTION(system_error{ec_}); } @@ -145,9 +153,9 @@ public: bool fail(error_code& ec) { - if(n_ > 0) - --n_; - if(! n_) + if(i_ < n_) + ++i_; + if(i_ == n_) { ec = ec_; return true; diff --git a/test/extras/include/boost/beast/test/stream.hpp b/test/extras/include/boost/beast/test/stream.hpp index 75b20076..c1228228 100644 --- a/test/extras/include/boost/beast/test/stream.hpp +++ b/test/extras/include/boost/beast/test/stream.hpp @@ -78,6 +78,11 @@ class stream std::size_t write_max = (std::numeric_limits::max)(); + ~state() + { + BOOST_ASSERT(! op); + } + explicit state( boost::asio::io_service& ios_, @@ -87,11 +92,6 @@ class stream { } - ~state() - { - BOOST_ASSERT(! op); - } - void on_write() { @@ -119,6 +119,10 @@ public: /// Destructor ~stream() { + { + std::unique_lock lock{in_->m}; + in_->op.reset(); + } auto out = out_.lock(); if(out) { @@ -612,16 +616,17 @@ teardown( stream& s, boost::system::error_code& ec) { - if(s.in_->fc) - { - if(s.in_->fc->fail(ec)) - return; - } + if( s.in_->fc && + s.in_->fc->fail(ec)) + return; + + s.close(); + + if( s.in_->fc && + s.in_->fc->fail(ec)) + ec = boost::asio::error::eof; else - { - s.close(); ec.assign(0, ec.category()); - } } template @@ -633,10 +638,17 @@ async_teardown( TeardownHandler&& handler) { error_code ec; - if(s.in_->fc && s.in_->fc->fail(ec)) + if( s.in_->fc && + s.in_->fc->fail(ec)) return s.get_io_service().post( bind_handler(std::move(handler), ec)); s.close(); + if( s.in_->fc && + s.in_->fc->fail(ec)) + ec = boost::asio::error::eof; + else + ec.assign(0, ec.category()); + s.get_io_service().post( bind_handler(std::move(handler), ec)); }