per-message compression options

fix #226, #227
This commit is contained in:
alandefreitas
2022-07-05 20:19:29 -03:00
committed by Klemens
parent 677a3e0179
commit bd69638e9d
9 changed files with 178 additions and 14 deletions

View File

@@ -1,3 +1,9 @@
Version 338:
* Added per message compression options.
--------------------------------------------------------------------------------
Version 337: Version 337:
* Added timeout option to websocket * Added timeout option to websocket

View File

@@ -302,6 +302,11 @@ struct impl_base<true>
return pmd_ != nullptr; return pmd_ != nullptr;
} }
bool should_compress(std::size_t n_bytes) const
{
return n_bytes >= pmd_opts_.msg_size_threshold;
}
std::size_t std::size_t
read_size_hint_pmd( read_size_hint_pmd(
std::size_t initial_size, std::size_t initial_size,
@@ -447,6 +452,11 @@ struct impl_base<false>
return false; return false;
} }
bool should_compress(std::size_t n_bytes) const
{
return false;
}
std::size_t std::size_t
read_size_hint_pmd( read_size_hint_pmd(
std::size_t initial_size, std::size_t initial_size,

View File

@@ -304,6 +304,22 @@ text() const
return impl_->wr_opcode == detail::opcode::text; return impl_->wr_opcode == detail::opcode::text;
} }
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
compress(bool value)
{
impl_->wr_compress_opt = value;
}
template<class NextLayer, bool deflateSupported>
bool
stream<NextLayer, deflateSupported>::
compress() const
{
return impl_->wr_compress_opt;
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// _Fail the WebSocket Connection_ // _Fail the WebSocket Connection_

View File

@@ -221,11 +221,13 @@ struct stream<NextLayer, deflateSupported>::impl_type
// Called just before sending // Called just before sending
// the first frame of each message // the first frame of each message
void void
begin_msg() begin_msg(std::size_t n_bytes)
{ {
wr_frag = wr_frag_opt; wr_frag = wr_frag_opt;
wr_compress = wr_compress =
this->pmd_enabled() && wr_compress_opt; this->pmd_enabled() &&
wr_compress_opt &&
this->should_compress(n_bytes);
// Maintain the write buffer // Maintain the write buffer
if( this->pmd_enabled() || if( this->pmd_enabled() ||

View File

@@ -86,7 +86,7 @@ public:
// Set up the outgoing frame header // Set up the outgoing frame header
if(! impl.wr_cont) if(! impl.wr_cont)
{ {
impl.begin_msg(); impl.begin_msg(beast::buffer_bytes(bs));
fh_.rsv1 = impl.wr_compress; fh_.rsv1 = impl.wr_compress;
} }
else else
@@ -114,7 +114,7 @@ public:
else else
{ {
BOOST_ASSERT(impl.wr_buf_size != 0); BOOST_ASSERT(impl.wr_buf_size != 0);
remain_ = buffer_bytes(cb_); remain_ = beast::buffer_bytes(cb_);
if(remain_ > impl.wr_buf_size) if(remain_ > impl.wr_buf_size)
how_ = do_nomask_frag; how_ = do_nomask_frag;
else else
@@ -130,7 +130,7 @@ public:
else else
{ {
BOOST_ASSERT(impl.wr_buf_size != 0); BOOST_ASSERT(impl.wr_buf_size != 0);
remain_ = buffer_bytes(cb_); remain_ = beast::buffer_bytes(cb_);
if(remain_ > impl.wr_buf_size) if(remain_ > impl.wr_buf_size)
how_ = do_mask_frag; how_ = do_mask_frag;
else else
@@ -207,7 +207,7 @@ operator()(
{ {
// send a single frame // send a single frame
fh_.fin = fin_; fh_.fin = fin_;
fh_.len = buffer_bytes(cb_); fh_.len = beast::buffer_bytes(cb_);
impl.wr_fb.clear(); impl.wr_fb.clear();
detail::write<flat_static_buffer_base>( detail::write<flat_static_buffer_base>(
impl.wr_fb, fh_); impl.wr_fb, fh_);
@@ -462,13 +462,13 @@ operator()(
more_ = impl.deflate(b, cb_, fin_, in_, ec); more_ = impl.deflate(b, cb_, fin_, in_, ec);
if(impl.check_stop_now(ec)) if(impl.check_stop_now(ec))
goto upcall; goto upcall;
n = buffer_bytes(b); n = beast::buffer_bytes(b);
if(n == 0) if(n == 0)
{ {
// The input was consumed, but there is // The input was consumed, but there is
// no output due to compression latency. // no output due to compression latency.
BOOST_ASSERT(! fin_); BOOST_ASSERT(! fin_);
BOOST_ASSERT(buffer_bytes(cb_) == 0); BOOST_ASSERT(beast::buffer_bytes(cb_) == 0);
goto upcall; goto upcall;
} }
if(fh_.mask) if(fh_.mask)
@@ -622,7 +622,7 @@ write_some(bool fin,
detail::frame_header fh; detail::frame_header fh;
if(! impl.wr_cont) if(! impl.wr_cont)
{ {
impl.begin_msg(); impl.begin_msg(beast::buffer_bytes(buffers));
fh.rsv1 = impl.wr_compress; fh.rsv1 = impl.wr_compress;
} }
else else
@@ -634,7 +634,7 @@ write_some(bool fin,
fh.op = impl.wr_cont ? fh.op = impl.wr_cont ?
detail::opcode::cont : impl.wr_opcode; detail::opcode::cont : impl.wr_opcode;
fh.mask = impl.role == role_type::client; fh.mask = impl.role == role_type::client;
auto remain = buffer_bytes(buffers); auto remain = beast::buffer_bytes(buffers);
if(impl.wr_compress) if(impl.wr_compress)
{ {
@@ -648,14 +648,14 @@ write_some(bool fin,
b, cb, fin, bytes_transferred, ec); b, cb, fin, bytes_transferred, ec);
if(impl.check_stop_now(ec)) if(impl.check_stop_now(ec))
return bytes_transferred; return bytes_transferred;
auto const n = buffer_bytes(b); auto const n = beast::buffer_bytes(b);
if(n == 0) if(n == 0)
{ {
// The input was consumed, but there // The input was consumed, but there
// is no output due to compression // is no output due to compression
// latency. // latency.
BOOST_ASSERT(! fin); BOOST_ASSERT(! fin);
BOOST_ASSERT(buffer_bytes(cb) == 0); BOOST_ASSERT(beast::buffer_bytes(cb) == 0);
fh.fin = false; fh.fin = false;
break; break;
} }

View File

@@ -55,6 +55,9 @@ struct permessage_deflate
/// Deflate memory level, 1..9 /// Deflate memory level, 1..9
int memLevel = 4; int memLevel = 4;
/// The minimum size a message should have to be compressed
std::size_t msg_size_threshold = 0;
}; };
} // websocket } // websocket

View File

@@ -597,6 +597,42 @@ public:
bool bool
text() const; text() const;
/** Set the compress message write option.
This controls whether or not outgoing messages should be
compressed. The setting is only applied when
@li The template parameter `deflateSupported` is true
@li Compression is enable. This is controlled with `stream::set_option`
@li Client and server have negotiated permessage-deflate settings
@li The message is larger than `permessage_deflate::msg_size_threshold`
This function permits adjusting per-message compression.
Changing the opcode after a message is started will only take effect
after the current message being sent is complete.
The default setting is to compress messages whenever the conditions
above are true.
@param value `true` if outgoing messages should be compressed
@par Example
Disabling compression for a single message.
@code
ws.compress(false);
ws.write(net::buffer(s), ec);
ws.compress(true);
@endcode
*/
void
compress(bool value);
/// Returns `true` if the compress message write option is set.
bool
compress() const;
/* /*
timer settings timer settings

View File

@@ -12,18 +12,18 @@
#include <boost/beast/core/detail/config.hpp> #include <boost/beast/core/detail/config.hpp>
#ifndef BOOST_BEAST_DOXYGEN
//[code_websocket_1h //[code_websocket_1h
namespace boost { namespace boost {
namespace beast { namespace beast {
namespace websocket { namespace websocket {
#ifndef BOOST_BEAST_DOXYGEN
template< template<
class NextLayer, class NextLayer,
bool deflateSupported = true> bool deflateSupported = true>
class stream; class stream;
#endif
} // websocket } // websocket
} // beast } // beast
@@ -32,3 +32,5 @@ class stream;
//] //]
#endif #endif
#endif

View File

@@ -651,6 +651,92 @@ public:
} }
} }
/*
https://github.com/boostorg/beast/issues/227
intelligent compression.
*/
void
testIssue226()
{
net::io_context ioc;
permessage_deflate pmd;
pmd.client_enable = true;
pmd.server_enable = true;
pmd.msg_size_threshold = 5;
stream<test::stream> ws0{ioc};
stream<test::stream> ws1{ioc};
ws0.next_layer().connect(ws1.next_layer());
ws0.set_option(pmd);
ws1.set_option(pmd);
ws1.async_accept(
[](error_code ec)
{
BEAST_EXPECTS(! ec, ec.message());
});
ws0.async_handshake("test", "/",
[](error_code ec)
{
BEAST_EXPECTS(! ec, ec.message());
});
ioc.run();
ioc.restart();
std::string s(256, '*');
auto const n0 =
ws0.next_layer().nwrite_bytes();
error_code ec;
BEAST_EXPECTS(! ec, ec.message());
ws1.compress(false);
ws1.write(net::buffer(s), ec);
ws1.compress(true);
auto const n1 =
ws0.next_layer().nwrite_bytes();
// Make sure the string was actually compressed
BEAST_EXPECT(n1 > n0 + s.size());
}
/*
https://github.com/boostorg/beast/issues/227
intelligent compression.
*/
void
testIssue227()
{
net::io_context ioc;
permessage_deflate pmd;
pmd.client_enable = true;
pmd.server_enable = true;
pmd.msg_size_threshold = 260;
stream<test::stream> ws0{ioc};
stream<test::stream> ws1{ioc};
ws0.next_layer().connect(ws1.next_layer());
ws0.set_option(pmd);
ws1.set_option(pmd);
ws1.async_accept(
[](error_code ec)
{
BEAST_EXPECTS(! ec, ec.message());
});
ws0.async_handshake("test", "/",
[](error_code ec)
{
BEAST_EXPECTS(! ec, ec.message());
});
ioc.run();
ioc.restart();
std::string s(256, '*');
auto const n0 =
ws0.next_layer().nwrite_bytes();
error_code ec;
BEAST_EXPECTS(! ec, ec.message());
ws1.write(net::buffer(s), ec);
auto const n1 =
ws0.next_layer().nwrite_bytes();
// Make sure the string was actually compressed
BEAST_EXPECT(n1 > n0 + s.size());
}
/* /*
https://github.com/boostorg/beast/issues/300 https://github.com/boostorg/beast/issues/300
@@ -720,6 +806,7 @@ public:
BEAST_EXPECT(n1 < n0 + s.size()); BEAST_EXPECT(n1 < n0 + s.size());
} }
#if BOOST_ASIO_HAS_CO_AWAIT #if BOOST_ASIO_HAS_CO_AWAIT
void testAwaitableCompiles( void testAwaitableCompiles(
stream<test::stream>& s, stream<test::stream>& s,
@@ -744,6 +831,8 @@ public:
testWriteSuspend(); testWriteSuspend();
testAsyncWriteFrame(); testAsyncWriteFrame();
testMoveOnly(); testMoveOnly();
testIssue226();
testIssue227();
testIssue300(); testIssue300();
testIssue1666(); testIssue1666();
#if BOOST_ASIO_HAS_CO_AWAIT #if BOOST_ASIO_HAS_CO_AWAIT