From a435dde2a307a5abc1438061f27e7a8e3181f782 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 15 Aug 2017 12:49:02 -0700 Subject: [PATCH] Fix done state for WebSocket reads --- CHANGELOG.md | 8 + include/boost/beast/websocket/impl/read.ipp | 366 ++++++++++---------- 2 files changed, 192 insertions(+), 182 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e428603..dd81af63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +Version 107: + +WebSocket + +* Fix done state for WebSocket reads + +-------------------------------------------------------------------------------- + Version 106: * Dynamic buffer input areas are mutable diff --git a/include/boost/beast/websocket/impl/read.ipp b/include/boost/beast/websocket/impl/read.ipp index ce2655b9..071d9fa2 100644 --- a/include/boost/beast/websocket/impl/read.ipp +++ b/include/boost/beast/websocket/impl/read.ipp @@ -346,93 +346,94 @@ operator()( // Empty non-final frame goto loop; } - ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin; - if(ws_.rd_.done) - goto upcall; // Empty final frame + ws_.rd_.done = false; } if(! ws_.pmd_ || ! ws_.pmd_->rd_set) { - if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > - (std::min)(clamp(ws_.rd_.remain), - buffer_size(cb_))) + // Check for empty final frame + if(ws_.rd_.remain > 0 || ! ws_.rd_.fh.fin) { - // Fill the read buffer first, otherwise we - // get fewer bytes at the cost of one I/O. - BOOST_ASIO_CORO_YIELD - ws_.stream_.async_read_some( - ws_.rd_.buf.prepare(read_size( - ws_.rd_.buf, ws_.rd_.buf.max_size())), - std::move(*this)); - dispatched_ = true; - ws_.failed_ = !!ec; - if(ws_.failed_) - goto upcall; - ws_.rd_.buf.commit(bytes_transferred); - if(ws_.rd_.fh.mask) - detail::mask_inplace(buffer_prefix(clamp( - ws_.rd_.remain), ws_.rd_.buf.data()), - ws_.rd_.key); - } - if(ws_.rd_.buf.size() > 0) - { - // Copy from the read buffer. - // The mask was already applied. - bytes_transferred = buffer_copy(cb_, - ws_.rd_.buf.data(), clamp(ws_.rd_.remain)); - auto const mb = buffer_prefix( - bytes_transferred, cb_); - ws_.rd_.remain -= bytes_transferred; - if(ws_.rd_.op == detail::opcode::text) + if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > + (std::min)(clamp(ws_.rd_.remain), + buffer_size(cb_))) { - if(! ws_.rd_.utf8.write(mb) || - (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && - ! ws_.rd_.utf8.finish())) - { - // _Fail the WebSocket Connection_ - code = close_code::bad_payload; - ec = error::failed; - goto close; - } + // Fill the read buffer first, otherwise we + // get fewer bytes at the cost of one I/O. + BOOST_ASIO_CORO_YIELD + ws_.stream_.async_read_some( + ws_.rd_.buf.prepare(read_size( + ws_.rd_.buf, ws_.rd_.buf.max_size())), + std::move(*this)); + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + goto upcall; + ws_.rd_.buf.commit(bytes_transferred); + if(ws_.rd_.fh.mask) + detail::mask_inplace(buffer_prefix(clamp( + ws_.rd_.remain), ws_.rd_.buf.data()), + ws_.rd_.key); } - bytes_written_ += bytes_transferred; - ws_.rd_.size += bytes_transferred; - ws_.rd_.buf.consume(bytes_transferred); - } - else - { - // Read into caller's buffer - BOOST_ASSERT(ws_.rd_.remain > 0); - BOOST_ASSERT(buffer_size(cb_) > 0); - BOOST_ASIO_CORO_YIELD - ws_.stream_.async_read_some(buffer_prefix( - clamp(ws_.rd_.remain), cb_), std::move(*this)); - dispatched_ = true; - ws_.failed_ = !!ec; - if(ws_.failed_) - goto upcall; - BOOST_ASSERT(bytes_transferred > 0); - auto const mb = buffer_prefix( - bytes_transferred, cb_); - ws_.rd_.remain -= bytes_transferred; - if(ws_.rd_.fh.mask) - detail::mask_inplace(mb, ws_.rd_.key); - if(ws_.rd_.op == detail::opcode::text) + if(ws_.rd_.buf.size() > 0) { - if(! ws_.rd_.utf8.write(mb) || - (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && - ! ws_.rd_.utf8.finish())) + // Copy from the read buffer. + // The mask was already applied. + bytes_transferred = buffer_copy(cb_, + ws_.rd_.buf.data(), clamp(ws_.rd_.remain)); + auto const mb = buffer_prefix( + bytes_transferred, cb_); + ws_.rd_.remain -= bytes_transferred; + if(ws_.rd_.op == detail::opcode::text) { - // _Fail the WebSocket Connection_ - code = close_code::bad_payload; - ec = error::failed; - goto close; + if(! ws_.rd_.utf8.write(mb) || + (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + code = close_code::bad_payload; + ec = error::failed; + goto close; + } } + bytes_written_ += bytes_transferred; + ws_.rd_.size += bytes_transferred; + ws_.rd_.buf.consume(bytes_transferred); + } + else + { + // Read into caller's buffer + BOOST_ASSERT(ws_.rd_.remain > 0); + BOOST_ASSERT(buffer_size(cb_) > 0); + BOOST_ASIO_CORO_YIELD + ws_.stream_.async_read_some(buffer_prefix( + clamp(ws_.rd_.remain), cb_), std::move(*this)); + dispatched_ = true; + ws_.failed_ = !!ec; + if(ws_.failed_) + goto upcall; + BOOST_ASSERT(bytes_transferred > 0); + auto const mb = buffer_prefix( + bytes_transferred, cb_); + ws_.rd_.remain -= bytes_transferred; + if(ws_.rd_.fh.mask) + detail::mask_inplace(mb, ws_.rd_.key); + if(ws_.rd_.op == detail::opcode::text) + { + if(! ws_.rd_.utf8.write(mb) || + (ws_.rd_.remain == 0 && ws_.rd_.fh.fin && + ! ws_.rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + code = close_code::bad_payload; + ec = error::failed; + goto close; + } + } + bytes_written_ += bytes_transferred; + ws_.rd_.size += bytes_transferred; } - bytes_written_ += bytes_transferred; - ws_.rd_.size += bytes_transferred; } - ws_.rd_.done = - ws_.rd_.remain == 0 && ws_.rd_.fh.fin; + ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin; goto upcall; } else @@ -1033,15 +1034,16 @@ loop: // Empty non-final frame goto loop; } - rd_.done = rd_.remain == 0 && rd_.fh.fin; + rd_.done = false; } else { ec.assign(0, ec.category()); } - if( ! rd_.done) + if(! pmd_ || ! pmd_->rd_set) { - if(! pmd_ || ! pmd_->rd_set) + // Check for empty final frame + if(rd_.remain > 0 || ! rd_.fh.fin) { if(rd_.buf.size() == 0 && rd_.buf.max_size() > (std::min)(clamp(rd_.remain), @@ -1122,132 +1124,132 @@ loop: bytes_written += bytes_transferred; rd_.size += bytes_transferred; } - rd_.done = rd_.remain == 0 && rd_.fh.fin; } - else + rd_.done = rd_.remain == 0 && rd_.fh.fin; + } + else + { + // Read compressed message frame payload: + // inflate even if rd_.fh.len == 0, otherwise we + // never emit the end-of-stream deflate block. + // + bool did_read = false; + consuming_buffers cb{buffers}; + while(buffer_size(cb) > 0) { - // Read compressed message frame payload: - // inflate even if rd_.fh.len == 0, otherwise we - // never emit the end-of-stream deflate block. - // - bool did_read = false; - consuming_buffers cb{buffers}; - while(buffer_size(cb) > 0) + zlib::z_params zs; { - zlib::z_params zs; + auto const out = buffer_front(cb); + zs.next_out = buffer_cast(out); + zs.avail_out = buffer_size(out); + BOOST_ASSERT(zs.avail_out > 0); + } + if(rd_.remain > 0) + { + if(rd_.buf.size() > 0) { - auto const out = buffer_front(cb); - zs.next_out = buffer_cast(out); - zs.avail_out = buffer_size(out); - BOOST_ASSERT(zs.avail_out > 0); + // use what's there + auto const in = buffer_prefix( + clamp(rd_.remain), buffer_front( + rd_.buf.data())); + zs.avail_in = buffer_size(in); + zs.next_in = buffer_cast(in); } - if(rd_.remain > 0) + else if(! did_read) { - if(rd_.buf.size() > 0) - { - // use what's there - auto const in = buffer_prefix( - clamp(rd_.remain), buffer_front( - rd_.buf.data())); - zs.avail_in = buffer_size(in); - zs.next_in = buffer_cast(in); - } - else if(! did_read) - { - // read new - auto const bytes_transferred = - stream_.read_some( - rd_.buf.prepare(read_size( - rd_.buf, rd_.buf.max_size())), - ec); - failed_ = !!ec; - if(failed_) - return bytes_written; - BOOST_ASSERT(bytes_transferred > 0); - rd_.buf.commit(bytes_transferred); - if(rd_.fh.mask) - detail::mask_inplace( - buffer_prefix(clamp(rd_.remain), - rd_.buf.data()), rd_.key); - auto const in = buffer_prefix( - clamp(rd_.remain), buffer_front( - rd_.buf.data())); - zs.avail_in = buffer_size(in); - zs.next_in = buffer_cast(in); - did_read = true; - } - else - { - break; - } - } - else if(rd_.fh.fin) - { - // append the empty block codes - static std::uint8_t constexpr - empty_block[4] = { - 0x00, 0x00, 0xff, 0xff }; - zs.next_in = empty_block; - zs.avail_in = sizeof(empty_block); - pmd_->zi.write(zs, zlib::Flush::sync, ec); - BOOST_ASSERT(! ec); + // read new + auto const bytes_transferred = + stream_.read_some( + rd_.buf.prepare(read_size( + rd_.buf, rd_.buf.max_size())), + ec); failed_ = !!ec; if(failed_) return bytes_written; - // VFALCO See: - // https://github.com/madler/zlib/issues/280 - BOOST_ASSERT(zs.total_out == 0); - cb.consume(zs.total_out); - rd_.size += zs.total_out; - bytes_written += zs.total_out; - if( - (role_ == role_type::client && - pmd_config_.server_no_context_takeover) || - (role_ == role_type::server && - pmd_config_.client_no_context_takeover)) - pmd_->zi.reset(); - rd_.done = true; - break; + BOOST_ASSERT(bytes_transferred > 0); + rd_.buf.commit(bytes_transferred); + if(rd_.fh.mask) + detail::mask_inplace( + buffer_prefix(clamp(rd_.remain), + rd_.buf.data()), rd_.key); + auto const in = buffer_prefix( + clamp(rd_.remain), buffer_front( + rd_.buf.data())); + zs.avail_in = buffer_size(in); + zs.next_in = buffer_cast(in); + did_read = true; } else { break; } + } + else if(rd_.fh.fin) + { + // append the empty block codes + static std::uint8_t constexpr + empty_block[4] = { + 0x00, 0x00, 0xff, 0xff }; + zs.next_in = empty_block; + zs.avail_in = sizeof(empty_block); pmd_->zi.write(zs, zlib::Flush::sync, ec); - BOOST_ASSERT(ec != zlib::error::end_of_stream); + BOOST_ASSERT(! ec); failed_ = !!ec; if(failed_) return bytes_written; - if(rd_msg_max_ && beast::detail::sum_exceeds( - rd_.size, zs.total_out, rd_msg_max_)) - { - do_fail( - close_code::too_big, - error::failed, - ec); - return bytes_written; - } + // VFALCO See: + // https://github.com/madler/zlib/issues/280 + BOOST_ASSERT(zs.total_out == 0); cb.consume(zs.total_out); rd_.size += zs.total_out; - rd_.remain -= zs.total_in; - rd_.buf.consume(zs.total_in); bytes_written += zs.total_out; + if( + (role_ == role_type::client && + pmd_config_.server_no_context_takeover) || + (role_ == role_type::server && + pmd_config_.client_no_context_takeover)) + pmd_->zi.reset(); + rd_.done = true; + break; } - if(rd_.op == detail::opcode::text) + else { - // check utf8 - if(! rd_.utf8.write( - buffer_prefix(bytes_written, buffers)) || ( - rd_.remain == 0 && rd_.fh.fin && - ! rd_.utf8.finish())) - { - // _Fail the WebSocket Connection_ - do_fail( - close_code::bad_payload, - error::failed, - ec); - return bytes_written; - } + break; + } + pmd_->zi.write(zs, zlib::Flush::sync, ec); + BOOST_ASSERT(ec != zlib::error::end_of_stream); + failed_ = !!ec; + if(failed_) + return bytes_written; + if(rd_msg_max_ && beast::detail::sum_exceeds( + rd_.size, zs.total_out, rd_msg_max_)) + { + do_fail( + close_code::too_big, + error::failed, + ec); + return bytes_written; + } + cb.consume(zs.total_out); + rd_.size += zs.total_out; + rd_.remain -= zs.total_in; + rd_.buf.consume(zs.total_in); + bytes_written += zs.total_out; + } + if(rd_.op == detail::opcode::text) + { + // check utf8 + if(! rd_.utf8.write( + buffer_prefix(bytes_written, buffers)) || ( + rd_.remain == 0 && rd_.fh.fin && + ! rd_.utf8.finish())) + { + // _Fail the WebSocket Connection_ + do_fail( + close_code::bad_payload, + error::failed, + ec); + return bytes_written; } } }