Fix done state for WebSocket reads

This commit is contained in:
Vinnie Falco
2017-08-15 12:49:02 -07:00
parent aabd33a677
commit a435dde2a3
2 changed files with 192 additions and 182 deletions

View File

@ -1,3 +1,11 @@
Version 107:
WebSocket
* Fix done state for WebSocket reads
--------------------------------------------------------------------------------
Version 106: Version 106:
* Dynamic buffer input areas are mutable * Dynamic buffer input areas are mutable

View File

@ -346,93 +346,94 @@ operator()(
// Empty non-final frame // Empty non-final frame
goto loop; goto loop;
} }
ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin; ws_.rd_.done = false;
if(ws_.rd_.done)
goto upcall; // Empty final frame
} }
if(! ws_.pmd_ || ! ws_.pmd_->rd_set) if(! ws_.pmd_ || ! ws_.pmd_->rd_set)
{ {
if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() > // Check for empty final frame
(std::min)(clamp(ws_.rd_.remain), if(ws_.rd_.remain > 0 || ! ws_.rd_.fh.fin)
buffer_size(cb_)))
{ {
// Fill the read buffer first, otherwise we if(ws_.rd_.buf.size() == 0 && ws_.rd_.buf.max_size() >
// get fewer bytes at the cost of one I/O. (std::min)(clamp(ws_.rd_.remain),
BOOST_ASIO_CORO_YIELD buffer_size(cb_)))
ws_.stream_.async_read_some(
ws_.rd_.buf.prepare(read_size(
ws_.rd_.buf, ws_.rd_.buf.max_size())),
std::move(*this));
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
goto upcall;
ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask)
detail::mask_inplace(buffer_prefix(clamp(
ws_.rd_.remain), ws_.rd_.buf.data()),
ws_.rd_.key);
}
if(ws_.rd_.buf.size() > 0)
{
// Copy from the read buffer.
// The mask was already applied.
bytes_transferred = buffer_copy(cb_,
ws_.rd_.buf.data(), clamp(ws_.rd_.remain));
auto const mb = buffer_prefix(
bytes_transferred, cb_);
ws_.rd_.remain -= bytes_transferred;
if(ws_.rd_.op == detail::opcode::text)
{ {
if(! ws_.rd_.utf8.write(mb) || // Fill the read buffer first, otherwise we
(ws_.rd_.remain == 0 && ws_.rd_.fh.fin && // get fewer bytes at the cost of one I/O.
! ws_.rd_.utf8.finish())) BOOST_ASIO_CORO_YIELD
{ ws_.stream_.async_read_some(
// _Fail the WebSocket Connection_ ws_.rd_.buf.prepare(read_size(
code = close_code::bad_payload; ws_.rd_.buf, ws_.rd_.buf.max_size())),
ec = error::failed; std::move(*this));
goto close; dispatched_ = true;
} ws_.failed_ = !!ec;
if(ws_.failed_)
goto upcall;
ws_.rd_.buf.commit(bytes_transferred);
if(ws_.rd_.fh.mask)
detail::mask_inplace(buffer_prefix(clamp(
ws_.rd_.remain), ws_.rd_.buf.data()),
ws_.rd_.key);
} }
bytes_written_ += bytes_transferred; if(ws_.rd_.buf.size() > 0)
ws_.rd_.size += bytes_transferred;
ws_.rd_.buf.consume(bytes_transferred);
}
else
{
// Read into caller's buffer
BOOST_ASSERT(ws_.rd_.remain > 0);
BOOST_ASSERT(buffer_size(cb_) > 0);
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(buffer_prefix(
clamp(ws_.rd_.remain), cb_), std::move(*this));
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
bytes_transferred, cb_);
ws_.rd_.remain -= bytes_transferred;
if(ws_.rd_.fh.mask)
detail::mask_inplace(mb, ws_.rd_.key);
if(ws_.rd_.op == detail::opcode::text)
{ {
if(! ws_.rd_.utf8.write(mb) || // Copy from the read buffer.
(ws_.rd_.remain == 0 && ws_.rd_.fh.fin && // The mask was already applied.
! ws_.rd_.utf8.finish())) bytes_transferred = buffer_copy(cb_,
ws_.rd_.buf.data(), clamp(ws_.rd_.remain));
auto const mb = buffer_prefix(
bytes_transferred, cb_);
ws_.rd_.remain -= bytes_transferred;
if(ws_.rd_.op == detail::opcode::text)
{ {
// _Fail the WebSocket Connection_ if(! ws_.rd_.utf8.write(mb) ||
code = close_code::bad_payload; (ws_.rd_.remain == 0 && ws_.rd_.fh.fin &&
ec = error::failed; ! ws_.rd_.utf8.finish()))
goto close; {
// _Fail the WebSocket Connection_
code = close_code::bad_payload;
ec = error::failed;
goto close;
}
} }
bytes_written_ += bytes_transferred;
ws_.rd_.size += bytes_transferred;
ws_.rd_.buf.consume(bytes_transferred);
}
else
{
// Read into caller's buffer
BOOST_ASSERT(ws_.rd_.remain > 0);
BOOST_ASSERT(buffer_size(cb_) > 0);
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(buffer_prefix(
clamp(ws_.rd_.remain), cb_), std::move(*this));
dispatched_ = true;
ws_.failed_ = !!ec;
if(ws_.failed_)
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
bytes_transferred, cb_);
ws_.rd_.remain -= bytes_transferred;
if(ws_.rd_.fh.mask)
detail::mask_inplace(mb, ws_.rd_.key);
if(ws_.rd_.op == detail::opcode::text)
{
if(! ws_.rd_.utf8.write(mb) ||
(ws_.rd_.remain == 0 && ws_.rd_.fh.fin &&
! ws_.rd_.utf8.finish()))
{
// _Fail the WebSocket Connection_
code = close_code::bad_payload;
ec = error::failed;
goto close;
}
}
bytes_written_ += bytes_transferred;
ws_.rd_.size += bytes_transferred;
} }
bytes_written_ += bytes_transferred;
ws_.rd_.size += bytes_transferred;
} }
ws_.rd_.done = ws_.rd_.done = ws_.rd_.remain == 0 && ws_.rd_.fh.fin;
ws_.rd_.remain == 0 && ws_.rd_.fh.fin;
goto upcall; goto upcall;
} }
else else
@ -1033,15 +1034,16 @@ loop:
// Empty non-final frame // Empty non-final frame
goto loop; goto loop;
} }
rd_.done = rd_.remain == 0 && rd_.fh.fin; rd_.done = false;
} }
else else
{ {
ec.assign(0, ec.category()); ec.assign(0, ec.category());
} }
if( ! rd_.done) if(! pmd_ || ! pmd_->rd_set)
{ {
if(! pmd_ || ! pmd_->rd_set) // Check for empty final frame
if(rd_.remain > 0 || ! rd_.fh.fin)
{ {
if(rd_.buf.size() == 0 && rd_.buf.max_size() > if(rd_.buf.size() == 0 && rd_.buf.max_size() >
(std::min)(clamp(rd_.remain), (std::min)(clamp(rd_.remain),
@ -1122,132 +1124,132 @@ loop:
bytes_written += bytes_transferred; bytes_written += bytes_transferred;
rd_.size += bytes_transferred; rd_.size += bytes_transferred;
} }
rd_.done = rd_.remain == 0 && rd_.fh.fin;
} }
else rd_.done = rd_.remain == 0 && rd_.fh.fin;
}
else
{
// Read compressed message frame payload:
// inflate even if rd_.fh.len == 0, otherwise we
// never emit the end-of-stream deflate block.
//
bool did_read = false;
consuming_buffers<MutableBufferSequence> cb{buffers};
while(buffer_size(cb) > 0)
{ {
// Read compressed message frame payload: zlib::z_params zs;
// inflate even if rd_.fh.len == 0, otherwise we
// never emit the end-of-stream deflate block.
//
bool did_read = false;
consuming_buffers<MutableBufferSequence> cb{buffers};
while(buffer_size(cb) > 0)
{ {
zlib::z_params zs; auto const out = buffer_front(cb);
zs.next_out = buffer_cast<void*>(out);
zs.avail_out = buffer_size(out);
BOOST_ASSERT(zs.avail_out > 0);
}
if(rd_.remain > 0)
{
if(rd_.buf.size() > 0)
{ {
auto const out = buffer_front(cb); // use what's there
zs.next_out = buffer_cast<void*>(out); auto const in = buffer_prefix(
zs.avail_out = buffer_size(out); clamp(rd_.remain), buffer_front(
BOOST_ASSERT(zs.avail_out > 0); rd_.buf.data()));
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
} }
if(rd_.remain > 0) else if(! did_read)
{ {
if(rd_.buf.size() > 0) // read new
{ auto const bytes_transferred =
// use what's there stream_.read_some(
auto const in = buffer_prefix( rd_.buf.prepare(read_size(
clamp(rd_.remain), buffer_front( rd_.buf, rd_.buf.max_size())),
rd_.buf.data())); ec);
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
}
else if(! did_read)
{
// read new
auto const bytes_transferred =
stream_.read_some(
rd_.buf.prepare(read_size(
rd_.buf, rd_.buf.max_size())),
ec);
failed_ = !!ec;
if(failed_)
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
rd_.buf.commit(bytes_transferred);
if(rd_.fh.mask)
detail::mask_inplace(
buffer_prefix(clamp(rd_.remain),
rd_.buf.data()), rd_.key);
auto const in = buffer_prefix(
clamp(rd_.remain), buffer_front(
rd_.buf.data()));
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
did_read = true;
}
else
{
break;
}
}
else if(rd_.fh.fin)
{
// append the empty block codes
static std::uint8_t constexpr
empty_block[4] = {
0x00, 0x00, 0xff, 0xff };
zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block);
pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(! ec);
failed_ = !!ec; failed_ = !!ec;
if(failed_) if(failed_)
return bytes_written; return bytes_written;
// VFALCO See: BOOST_ASSERT(bytes_transferred > 0);
// https://github.com/madler/zlib/issues/280 rd_.buf.commit(bytes_transferred);
BOOST_ASSERT(zs.total_out == 0); if(rd_.fh.mask)
cb.consume(zs.total_out); detail::mask_inplace(
rd_.size += zs.total_out; buffer_prefix(clamp(rd_.remain),
bytes_written += zs.total_out; rd_.buf.data()), rd_.key);
if( auto const in = buffer_prefix(
(role_ == role_type::client && clamp(rd_.remain), buffer_front(
pmd_config_.server_no_context_takeover) || rd_.buf.data()));
(role_ == role_type::server && zs.avail_in = buffer_size(in);
pmd_config_.client_no_context_takeover)) zs.next_in = buffer_cast<void const*>(in);
pmd_->zi.reset(); did_read = true;
rd_.done = true;
break;
} }
else else
{ {
break; break;
} }
}
else if(rd_.fh.fin)
{
// append the empty block codes
static std::uint8_t constexpr
empty_block[4] = {
0x00, 0x00, 0xff, 0xff };
zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block);
pmd_->zi.write(zs, zlib::Flush::sync, ec); pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(ec != zlib::error::end_of_stream); BOOST_ASSERT(! ec);
failed_ = !!ec; failed_ = !!ec;
if(failed_) if(failed_)
return bytes_written; return bytes_written;
if(rd_msg_max_ && beast::detail::sum_exceeds( // VFALCO See:
rd_.size, zs.total_out, rd_msg_max_)) // https://github.com/madler/zlib/issues/280
{ BOOST_ASSERT(zs.total_out == 0);
do_fail(
close_code::too_big,
error::failed,
ec);
return bytes_written;
}
cb.consume(zs.total_out); cb.consume(zs.total_out);
rd_.size += zs.total_out; rd_.size += zs.total_out;
rd_.remain -= zs.total_in;
rd_.buf.consume(zs.total_in);
bytes_written += zs.total_out; bytes_written += zs.total_out;
if(
(role_ == role_type::client &&
pmd_config_.server_no_context_takeover) ||
(role_ == role_type::server &&
pmd_config_.client_no_context_takeover))
pmd_->zi.reset();
rd_.done = true;
break;
} }
if(rd_.op == detail::opcode::text) else
{ {
// check utf8 break;
if(! rd_.utf8.write( }
buffer_prefix(bytes_written, buffers)) || ( pmd_->zi.write(zs, zlib::Flush::sync, ec);
rd_.remain == 0 && rd_.fh.fin && BOOST_ASSERT(ec != zlib::error::end_of_stream);
! rd_.utf8.finish())) failed_ = !!ec;
{ if(failed_)
// _Fail the WebSocket Connection_ return bytes_written;
do_fail( if(rd_msg_max_ && beast::detail::sum_exceeds(
close_code::bad_payload, rd_.size, zs.total_out, rd_msg_max_))
error::failed, {
ec); do_fail(
return bytes_written; close_code::too_big,
} error::failed,
ec);
return bytes_written;
}
cb.consume(zs.total_out);
rd_.size += zs.total_out;
rd_.remain -= zs.total_in;
rd_.buf.consume(zs.total_in);
bytes_written += zs.total_out;
}
if(rd_.op == detail::opcode::text)
{
// check utf8
if(! rd_.utf8.write(
buffer_prefix(bytes_written, buffers)) || (
rd_.remain == 0 && rd_.fh.fin &&
! rd_.utf8.finish()))
{
// _Fail the WebSocket Connection_
do_fail(
close_code::bad_payload,
error::failed,
ec);
return bytes_written;
} }
} }
} }