From a878165e366050983aeb7de16fb2ab57079fb3a8 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 24 Oct 2016 11:18:31 -0400 Subject: [PATCH] Fix write_frame masking and auto-fragment handling --- CHANGELOG.md | 1 + .../beast/websocket/detail/stream_base.hpp | 44 +++++-- include/beast/websocket/impl/write.ipp | 119 ++++++++++-------- 3 files changed, 102 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5488b85d..80390c86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ WebSocket Core * Meet DynamicBuffer requirements for static_streambuf +* Fix write_frame masking and auto-fragment handling Extras diff --git a/include/beast/websocket/detail/stream_base.hpp b/include/beast/websocket/detail/stream_base.hpp index 00c88eee..08579a0f 100644 --- a/include/beast/websocket/detail/stream_base.hpp +++ b/include/beast/websocket/detail/stream_base.hpp @@ -98,14 +98,37 @@ protected: invokable wr_op_; // invoked after read completes close_reason cr_; // set from received close frame + // State information for the message being sent + // struct wr_t { - bool cont; // next frame is continuation frame - bool autofrag; // if this message is auto fragmented - bool compress; // if this message is compressed - std::size_t size; // amount stored in buffer - std::size_t max; // size of write buffer - std::unique_ptr buf;// write buffer storage + // `true` if next frame is a continuation, + // `false` if next frame starts a new message + bool cont; + + // `true` if this message should be auto-fragmented + // This gets set to the auto-fragment option at the beginning + // of sending a message, so that the option can be changed + // mid-send without affecting the current message. + bool autofrag; + + // `true` if this message should be compressed. + // This gets set to the compress option at the beginning of + // of sending a message, so that the option can be changed + // mid-send without affecting the current message. + bool compress; + + // Size of the write buffer. + // This gets set to the write buffer size option at the + // beginning of sending a message, so that the option can be + // changed mid-send without affecting the current message. + std::size_t size; + + // The write buffer. + // The buffer is allocated or reallocated at the beginning of + // sending a message. + // + std::unique_ptr buf; void open() @@ -383,19 +406,18 @@ wr_prepare(bool compress) { wr_.autofrag = wr_autofrag_; wr_.compress = compress; - wr_.size = 0; if(compress || wr_.autofrag || role_ == detail::role_type::client) { - if(! wr_.buf || wr_.max != wr_buf_size_) + if(! wr_.buf || wr_.size != wr_buf_size_) { - wr_.max = wr_buf_size_; - wr_.buf.reset(new std::uint8_t[wr_.max]); + wr_.size = wr_buf_size_; + wr_.buf.reset(new std::uint8_t[wr_.size]); } } else { - wr_.max = wr_buf_size_; + wr_.size = wr_buf_size_; wr_.buf.reset(); } } diff --git a/include/beast/websocket/impl/write.ipp b/include/beast/websocket/impl/write.ipp index ab34abbe..28d2aec3 100644 --- a/include/beast/websocket/impl/write.ipp +++ b/include/beast/websocket/impl/write.ipp @@ -408,73 +408,70 @@ write_frame(bool fin, fh.rsv2 = false; fh.rsv3 = false; fh.mask = role_ == detail::role_type::client; + wr_.cont = ! fin; auto remain = buffer_size(buffers); if(compress) { // TODO } - else if(wr_.autofrag) + else if(! fh.mask && ! wr_.autofrag) { - consuming_buffers cb(buffers); - do + fh.fin = fin; + fh.len = remain; + detail::fh_streambuf fh_buf; + detail::write(fh_buf, fh); + boost::asio::write(stream_, + buffer_cat(fh_buf.data(), buffers), ec); + failed_ = ec != 0; + if(failed_) + return; + return; + } + else if(! fh.mask && wr_.autofrag) + { + BOOST_ASSERT(wr_.size != 0); + consuming_buffers< + ConstBufferSequence> cb(buffers); + for(;;) { - auto const room = wr_.max - wr_.size; - if(! fin && remain < room) - { - buffer_copy( - buffer(wr_.buf.get() + wr_.size, remain), cb); - wr_.size += remain; - return; - } - auto const n = detail::clamp(remain, room); - buffer_copy( - buffer(wr_.buf.get() + wr_.size, n), cb); - auto const mb = buffer(wr_.buf.get(), wr_.size + n); - if(fh.mask) - { - fh.key = maskgen_(); - detail::prepared_key_type key; - detail::prepare_key(key, fh.key); - detail::mask_inplace(mb, key); - } - fh.fin = fin && n == remain; - fh.len = buffer_size(mb); + auto const n = + detail::clamp(remain, wr_.size); + fh.len = n; + remain -= n; + fh.fin = fin ? remain == 0 : false; detail::fh_streambuf fh_buf; detail::write(fh_buf, fh); - // send header and payload boost::asio::write(stream_, - buffer_cat(fh_buf.data(), mb), ec); + buffer_cat(fh_buf.data(), + prepare_buffers(n, cb)), ec); failed_ = ec != 0; if(failed_) return; - remain -= n; - cb.consume(n); - wr_.size = 0; + if(remain == 0) + break; fh.op = opcode::cont; + cb.consume(n); } - while(remain > 0); - wr_.cont = ! fh.fin; return; } - else if(fh.mask) + else if(fh.mask && ! wr_.autofrag) { - consuming_buffers cb(buffers); - fh.fin = fin; - fh.len = remain; fh.key = maskgen_(); - wr_.cont = ! fh.fin; - detail::fh_streambuf fh_buf; - detail::write(fh_buf, fh); detail::prepared_key_type key; detail::prepare_key(key, fh.key); + fh.fin = fin; + fh.len = remain; + detail::fh_streambuf fh_buf; + detail::write(fh_buf, fh); + consuming_buffers< + ConstBufferSequence> cb(buffers); { - auto const n = detail::clamp(remain, wr_.max); + auto const n = detail::clamp(remain, wr_.size); auto const mb = buffer(wr_.buf.get(), n); buffer_copy(mb, cb); cb.consume(n); remain -= n; detail::mask_inplace(mb, key); - // send header and payload boost::asio::write(stream_, buffer_cat(fh_buf.data(), mb), ec); failed_ = ec != 0; @@ -483,13 +480,12 @@ write_frame(bool fin, } while(remain > 0) { - auto const n = detail::clamp(remain, wr_.max); + auto const n = detail::clamp(remain, wr_.size); auto const mb = buffer(wr_.buf.get(), n); buffer_copy(mb, cb); cb.consume(n); remain -= n; detail::mask_inplace(mb, key); - // send payload boost::asio::write(stream_, mb, ec); failed_ = ec != 0; if(failed_) @@ -497,16 +493,37 @@ write_frame(bool fin, } return; } + else if(fh.mask && wr_.autofrag) { - // send header and payload - fh.fin = fin; - fh.len = remain; - wr_.cont = ! fh.fin; - detail::fh_streambuf fh_buf; - detail::write(fh_buf, fh); - boost::asio::write(stream_, - buffer_cat(fh_buf.data(), buffers), ec); - failed_ = ec != 0; + BOOST_ASSERT(wr_.size != 0); + consuming_buffers< + ConstBufferSequence> cb(buffers); + for(;;) + { + fh.key = maskgen_(); + detail::prepared_key_type key; + detail::prepare_key(key, fh.key); + auto const n = + detail::clamp(remain, wr_.size); + auto const mb = buffer(wr_.buf.get(), n); + buffer_copy(mb, cb); + detail::mask_inplace(mb, key); + fh.len = n; + remain -= n; + fh.fin = fin ? remain == 0 : false; + detail::fh_streambuf fh_buf; + detail::write(fh_buf, fh); + boost::asio::write(stream_, + buffer_cat(fh_buf.data(), mb), ec); + failed_ = ec != 0; + if(failed_) + return; + if(remain == 0) + break; + fh.op = opcode::cont; + cb.consume(n); + } + return; } }