Fix outgoing websocket message compression

fix #1666
This commit is contained in:
Vinnie Falco
2019-07-29 10:35:41 -07:00
parent 2ed1c92e03
commit 9f77867f0a
5 changed files with 105 additions and 31 deletions

View File

@ -1,3 +1,9 @@
Version 265:
* Fix outgoing websocket message compression
--------------------------------------------------------------------------------
Version 264:
* Handle overflow in max size calculation in `basic_dynamic_body`

View File

@ -107,6 +107,7 @@ class stream::read_op : public stream::read_op_base
net::buffer_copy(
b_, sp->b.data(), sp->read_max);
sp->b.consume(bytes_transferred);
sp->nread_bytes += bytes_transferred;
}
else if (buffer_bytes(b_) > 0)
{
@ -231,6 +232,7 @@ struct stream::run_write_op
std::lock_guard<std::mutex> lock(out->m);
n = net::buffer_copy(out->b.prepare(n), buffers);
out->b.commit(n);
out->nwrite_bytes += n;
out->notify_read();
}
BOOST_ASSERT(! ec);
@ -294,6 +296,7 @@ read_some(MutableBufferSequence const& buffers,
auto const n = net::buffer_copy(
buffers, in_->b.data(), in_->read_max);
in_->b.consume(n);
in_->nread_bytes += n;
return n;
}
@ -377,6 +380,7 @@ write_some(
std::lock_guard<std::mutex> lock(out->m);
n = net::buffer_copy(out->b.prepare(n), buffers);
out->b.commit(n);
out->nwrite_bytes += n;
out->notify_read();
}
return n;

View File

@ -141,7 +141,9 @@ class stream
status code = status::ok;
fail_count* fc = nullptr;
std::size_t nread = 0;
std::size_t nread_bytes = 0;
std::size_t nwrite = 0;
std::size_t nwrite_bytes = 0;
std::size_t read_max =
(std::numeric_limits<std::size_t>::max)();
std::size_t write_max =
@ -361,6 +363,13 @@ public:
return in_->nread;
}
/// Return the number of bytes read
std::size_t
nread_bytes() const noexcept
{
return in_->nread_bytes;
}
/// Return the number of writes
std::size_t
nwrite() const noexcept
@ -368,6 +377,13 @@ public:
return in_->nwrite;
}
/// Return the number of bytes written
std::size_t
nwrite_bytes() const noexcept
{
return in_->nwrite_bytes;
}
/** Close the stream.
The other end of the connection will see

View File

@ -97,7 +97,8 @@ struct stream<NextLayer, deflateSupported>::impl_type
bool wr_cont /* next write is a continuation */ = false;
bool wr_frag /* autofrag the current message */ = false;
bool wr_frag_opt /* autofrag option setting */ = true;
bool wr_compress /* compress current message */ = false;
bool wr_compress; /* compress current message */
bool wr_compress_opt /* compress message setting */ = true;
detail::opcode wr_opcode /* message type */ = detail::opcode::text;
std::unique_ptr<
std::uint8_t[]> wr_buf; // write buffer
@ -209,11 +210,14 @@ struct stream<NextLayer, deflateSupported>::impl_type
timer.cancel();
}
// Called before each write frame
// Called just before sending
// the first frame of each message
void
begin_msg()
{
wr_frag = wr_frag_opt;
wr_compress =
this->pmd_enabled() && wr_compress_opt;
// Maintain the write buffer
if( this->pmd_enabled() ||
@ -232,6 +236,8 @@ struct stream<NextLayer, deflateSupported>::impl_type
wr_buf_size = wr_buf_opt;
wr_buf.reset();
}
//
}
//--------------------------------------------------------------------------

View File

@ -609,34 +609,6 @@ public:
}
}
/*
https://github.com/boostorg/beast/issues/300
Write a message as two individual frames
*/
void
testIssue300()
{
for(int i = 0; i < 2; ++i )
{
echo_server es{log, i==1 ?
kind::async : kind::sync};
net::io_context ioc;
stream<test::stream> ws{ioc};
ws.next_layer().connect(es.stream());
error_code ec;
ws.handshake("localhost", "/", ec);
if(! BEAST_EXPECTS(! ec, ec.message()))
return;
ws.write_some(false, sbuf("u"));
ws.write_some(true, sbuf("v"));
multi_buffer b;
ws.read(b, ec);
BEAST_EXPECTS(! ec, ec.message());
}
}
void
testMoveOnly()
{
@ -675,6 +647,75 @@ public:
}
}
/*
https://github.com/boostorg/beast/issues/300
Write a message as two individual frames
*/
void
testIssue300()
{
for(int i = 0; i < 2; ++i )
{
echo_server es{log, i==1 ?
kind::async : kind::sync};
net::io_context ioc;
stream<test::stream> ws{ioc};
ws.next_layer().connect(es.stream());
error_code ec;
ws.handshake("localhost", "/", ec);
if(! BEAST_EXPECTS(! ec, ec.message()))
return;
ws.write_some(false, sbuf("u"));
ws.write_some(true, sbuf("v"));
multi_buffer b;
ws.read(b, ec);
BEAST_EXPECTS(! ec, ec.message());
}
}
/*
https://github.com/boostorg/beast/issues/1666
permessage-deflate not working in version 1.70.
*/
void
testIssue1666()
{
net::io_context ioc;
permessage_deflate pmd;
pmd.client_enable = true;
pmd.server_enable = true;
stream<test::stream> ws0{ioc};
stream<test::stream> ws1{ioc};
ws0.next_layer().connect(ws1.next_layer());
ws0.set_option(pmd);
ws1.set_option(pmd);
ws1.async_accept(
[](error_code ec)
{
BEAST_EXPECTS(! ec, ec.message());
});
ws0.async_handshake("test", "/",
[](error_code ec)
{
BEAST_EXPECTS(! ec, ec.message());
});
ioc.run();
ioc.restart();
std::string s(256, '*');
auto const n0 =
ws0.next_layer().nwrite_bytes();
error_code ec;
BEAST_EXPECTS(! ec, ec.message());
ws1.write(net::buffer(s), ec);
auto const n1 =
ws0.next_layer().nwrite_bytes();
// Make sure the string was actually compressed
BEAST_EXPECT(n1 < n0 + s.size());
}
void
run() override
{
@ -682,8 +723,9 @@ public:
testPausationAbandoning();
testWriteSuspend();
testAsyncWriteFrame();
testIssue300();
testMoveOnly();
testIssue300();
testIssue1666();
}
};