Fix write_frame masking and auto-fragment handling

This commit is contained in:
Vinnie Falco
2016-10-24 11:18:31 -04:00
parent dfb2d05be3
commit a878165e36
3 changed files with 102 additions and 62 deletions

View File

@@ -14,6 +14,7 @@ WebSocket
Core Core
* Meet DynamicBuffer requirements for static_streambuf * Meet DynamicBuffer requirements for static_streambuf
* Fix write_frame masking and auto-fragment handling
Extras Extras

View File

@@ -98,14 +98,37 @@ protected:
invokable wr_op_; // invoked after read completes invokable wr_op_; // invoked after read completes
close_reason cr_; // set from received close frame close_reason cr_; // set from received close frame
// State information for the message being sent
//
struct wr_t struct wr_t
{ {
bool cont; // next frame is continuation frame // `true` if next frame is a continuation,
bool autofrag; // if this message is auto fragmented // `false` if next frame starts a new message
bool compress; // if this message is compressed bool cont;
std::size_t size; // amount stored in buffer
std::size_t max; // size of write buffer // `true` if this message should be auto-fragmented
std::unique_ptr<std::uint8_t[]> buf;// write buffer storage // This gets set to the auto-fragment option at the beginning
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool autofrag;
// `true` if this message should be compressed.
// This gets set to the compress option at the beginning of
// of sending a message, so that the option can be changed
// mid-send without affecting the current message.
bool compress;
// Size of the write buffer.
// This gets set to the write buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t size;
// The write buffer.
// The buffer is allocated or reallocated at the beginning of
// sending a message.
//
std::unique_ptr<std::uint8_t[]> buf;
void void
open() open()
@@ -383,19 +406,18 @@ wr_prepare(bool compress)
{ {
wr_.autofrag = wr_autofrag_; wr_.autofrag = wr_autofrag_;
wr_.compress = compress; wr_.compress = compress;
wr_.size = 0;
if(compress || wr_.autofrag || if(compress || wr_.autofrag ||
role_ == detail::role_type::client) role_ == detail::role_type::client)
{ {
if(! wr_.buf || wr_.max != wr_buf_size_) if(! wr_.buf || wr_.size != wr_buf_size_)
{ {
wr_.max = wr_buf_size_; wr_.size = wr_buf_size_;
wr_.buf.reset(new std::uint8_t[wr_.max]); wr_.buf.reset(new std::uint8_t[wr_.size]);
} }
} }
else else
{ {
wr_.max = wr_buf_size_; wr_.size = wr_buf_size_;
wr_.buf.reset(); wr_.buf.reset();
} }
} }

View File

@@ -408,73 +408,70 @@ write_frame(bool fin,
fh.rsv2 = false; fh.rsv2 = false;
fh.rsv3 = false; fh.rsv3 = false;
fh.mask = role_ == detail::role_type::client; fh.mask = role_ == detail::role_type::client;
wr_.cont = ! fin;
auto remain = buffer_size(buffers); auto remain = buffer_size(buffers);
if(compress) if(compress)
{ {
// TODO // TODO
} }
else if(wr_.autofrag) else if(! fh.mask && ! wr_.autofrag)
{ {
consuming_buffers<ConstBufferSequence> cb(buffers); fh.fin = fin;
do fh.len = remain;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), buffers), ec);
failed_ = ec != 0;
if(failed_)
return;
return;
}
else if(! fh.mask && wr_.autofrag)
{
BOOST_ASSERT(wr_.size != 0);
consuming_buffers<
ConstBufferSequence> cb(buffers);
for(;;)
{ {
auto const room = wr_.max - wr_.size; auto const n =
if(! fin && remain < room) detail::clamp(remain, wr_.size);
{ fh.len = n;
buffer_copy( remain -= n;
buffer(wr_.buf.get() + wr_.size, remain), cb); fh.fin = fin ? remain == 0 : false;
wr_.size += remain;
return;
}
auto const n = detail::clamp(remain, room);
buffer_copy(
buffer(wr_.buf.get() + wr_.size, n), cb);
auto const mb = buffer(wr_.buf.get(), wr_.size + n);
if(fh.mask)
{
fh.key = maskgen_();
detail::prepared_key_type key;
detail::prepare_key(key, fh.key);
detail::mask_inplace(mb, key);
}
fh.fin = fin && n == remain;
fh.len = buffer_size(mb);
detail::fh_streambuf fh_buf; detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh); detail::write<static_streambuf>(fh_buf, fh);
// send header and payload
boost::asio::write(stream_, boost::asio::write(stream_,
buffer_cat(fh_buf.data(), mb), ec); buffer_cat(fh_buf.data(),
prepare_buffers(n, cb)), ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
return; return;
remain -= n; if(remain == 0)
cb.consume(n); break;
wr_.size = 0;
fh.op = opcode::cont; fh.op = opcode::cont;
cb.consume(n);
} }
while(remain > 0);
wr_.cont = ! fh.fin;
return; return;
} }
else if(fh.mask) else if(fh.mask && ! wr_.autofrag)
{ {
consuming_buffers<ConstBufferSequence> cb(buffers);
fh.fin = fin;
fh.len = remain;
fh.key = maskgen_(); fh.key = maskgen_();
wr_.cont = ! fh.fin;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
detail::prepared_key_type key; detail::prepared_key_type key;
detail::prepare_key(key, fh.key); detail::prepare_key(key, fh.key);
fh.fin = fin;
fh.len = remain;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
consuming_buffers<
ConstBufferSequence> cb(buffers);
{ {
auto const n = detail::clamp(remain, wr_.max); auto const n = detail::clamp(remain, wr_.size);
auto const mb = buffer(wr_.buf.get(), n); auto const mb = buffer(wr_.buf.get(), n);
buffer_copy(mb, cb); buffer_copy(mb, cb);
cb.consume(n); cb.consume(n);
remain -= n; remain -= n;
detail::mask_inplace(mb, key); detail::mask_inplace(mb, key);
// send header and payload
boost::asio::write(stream_, boost::asio::write(stream_,
buffer_cat(fh_buf.data(), mb), ec); buffer_cat(fh_buf.data(), mb), ec);
failed_ = ec != 0; failed_ = ec != 0;
@@ -483,13 +480,12 @@ write_frame(bool fin,
} }
while(remain > 0) while(remain > 0)
{ {
auto const n = detail::clamp(remain, wr_.max); auto const n = detail::clamp(remain, wr_.size);
auto const mb = buffer(wr_.buf.get(), n); auto const mb = buffer(wr_.buf.get(), n);
buffer_copy(mb, cb); buffer_copy(mb, cb);
cb.consume(n); cb.consume(n);
remain -= n; remain -= n;
detail::mask_inplace(mb, key); detail::mask_inplace(mb, key);
// send payload
boost::asio::write(stream_, mb, ec); boost::asio::write(stream_, mb, ec);
failed_ = ec != 0; failed_ = ec != 0;
if(failed_) if(failed_)
@@ -497,16 +493,37 @@ write_frame(bool fin,
} }
return; return;
} }
else if(fh.mask && wr_.autofrag)
{ {
// send header and payload BOOST_ASSERT(wr_.size != 0);
fh.fin = fin; consuming_buffers<
fh.len = remain; ConstBufferSequence> cb(buffers);
wr_.cont = ! fh.fin; for(;;)
detail::fh_streambuf fh_buf; {
detail::write<static_streambuf>(fh_buf, fh); fh.key = maskgen_();
boost::asio::write(stream_, detail::prepared_key_type key;
buffer_cat(fh_buf.data(), buffers), ec); detail::prepare_key(key, fh.key);
failed_ = ec != 0; auto const n =
detail::clamp(remain, wr_.size);
auto const mb = buffer(wr_.buf.get(), n);
buffer_copy(mb, cb);
detail::mask_inplace(mb, key);
fh.len = n;
remain -= n;
fh.fin = fin ? remain == 0 : false;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), mb), ec);
failed_ = ec != 0;
if(failed_)
return;
if(remain == 0)
break;
fh.op = opcode::cont;
cb.consume(n);
}
return;
} }
} }