Refactor stream operations and tests plus coverage

This commit is contained in:
Vinnie Falco
2017-08-31 17:52:09 -07:00
parent 9f089c2a33
commit 51a5a36118
22 changed files with 1290 additions and 354 deletions

View File

@ -3,6 +3,7 @@ Version 111:
WebSocket:
* Fix utf8 check split code point at buffer end
* Refactor stream operations and tests plus coverage
--------------------------------------------------------------------------------

View File

@ -187,6 +187,12 @@ public:
void
save(F&& f);
explicit
operator bool() const
{
return base_ != nullptr;
}
bool
maybe_invoke()
{

View File

@ -57,14 +57,17 @@ parse_bits(string_view s)
return -1;
if(s[0] < '1' || s[0] > '9')
return -1;
int i = 0;
unsigned i = 0;
for(auto c : s)
{
if(c < '0' || c > '9')
return -1;
auto const i0 = i;
i = 10 * i + (c - '0');
if(i < i0)
return -1;
}
return i;
return static_cast<int>(i);
}
// Parse permessage-deflate request fields
@ -354,43 +357,6 @@ pmd_normalize(pmd_offer& offer)
//--------------------------------------------------------------------
// Decompress into a DynamicBuffer
//
template<class InflateStream, class DynamicBuffer>
void
inflate(
InflateStream& zi,
DynamicBuffer& buffer,
boost::asio::const_buffer const& in,
error_code& ec)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
zlib::z_params zs;
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
for(;;)
{
// VFALCO we could be smarter about the size
auto const bs = buffer.prepare(
read_size_or_throw(buffer, 65536));
auto const out = *bs.begin();
zs.avail_out = buffer_size(out);
zs.next_out = buffer_cast<void*>(out);
zi.write(zs, zlib::Flush::sync, ec);
buffer.commit(zs.total_out);
zs.total_out = 0;
if( ec == zlib::error::need_buffers ||
ec == zlib::error::end_of_stream)
{
ec.assign(0, ec.category());
break;
}
if(ec)
return;
}
}
// Compress a buffer sequence
// Returns: `true` if more calls are needed
//

View File

@ -22,8 +22,6 @@
#include <boost/throw_exception.hpp>
#include <memory>
#include <iostream>
namespace boost {
namespace beast {
namespace websocket {
@ -46,7 +44,7 @@ class stream<NextLayer>::close_op
detail::frame_buffer fb;
error_code ev;
token tok;
bool cont;
bool cont = false;
state(
Handler&,
@ -121,7 +119,8 @@ public:
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::close_op<Handler>::
stream<NextLayer>::
close_op<Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred,
@ -140,7 +139,7 @@ operator()(
d.ws.wr_block_ = d.tok;
// Make sure the stream is open
if(d.ws.check_fail(ec))
if(! d.ws.check_open(ec))
goto upcall;
}
else
@ -160,29 +159,33 @@ operator()(
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
// Make sure the stream is open
if(d.ws.check_fail(ec))
if(! d.ws.check_open(ec))
goto upcall;
}
// Can't call close twice
BOOST_ASSERT(! d.ws.wr_close_);
// Change status to closing
BOOST_ASSERT(d.ws.status_ == status::open);
d.ws.status_ = status::closing;
// Send close frame
d.ws.wr_close_ = true;
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
if(d.ws.check_fail(ec))
if(! d.ws.check_ok(ec))
goto upcall;
if(d.ws.rd_close_)
{
// This happens when the read_op gets a close frame
// at the same time we are sending the close frame. The
// read_op will be suspended on the write block.
// at the same time close_op is sending the close frame.
// The read_op will be suspended on the write block.
goto teardown;
}
// Maybe suspend
if(! d.ws.rd_block_)
{
@ -206,7 +209,9 @@ operator()(
BOOST_ASSERT(d.ws.rd_block_ == d.tok);
// Make sure the stream is open
if(d.ws.check_fail(ec))
BOOST_ASSERT(d.ws.status_ != status::open);
BOOST_ASSERT(d.ws.status_ != status::closed);
if( d.ws.status_ == status::failed)
goto upcall;
BOOST_ASSERT(! d.ws.rd_close_);
@ -231,7 +236,7 @@ operator()(
d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
d.ws.rd_buf_.max_size())),
std::move(*this));
if(d.ws.check_fail(ec))
if(! d.ws.check_ok(ec))
goto upcall;
d.ws.rd_buf_.commit(bytes_transferred);
}
@ -271,7 +276,7 @@ operator()(
d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
d.ws.rd_buf_.max_size())),
std::move(*this));
if(d.ws.check_fail(ec))
if(! d.ws.check_ok(ec))
goto upcall;
d.ws.rd_buf_.commit(bytes_transferred);
}
@ -289,7 +294,6 @@ operator()(
async_teardown(d.ws.role_,
d.ws.stream_, std::move(*this));
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
BOOST_ASSERT(d.ws.open_);
if(ec == boost::asio::error::eof)
{
// Rationale:
@ -298,7 +302,11 @@ operator()(
}
if(! ec)
ec = d.ev;
d.ws.open_ = false;
if(ec)
d.ws.status_ = status::failed;
else
d.ws.status_ = status::closed;
d.ws.close();
upcall:
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
@ -346,7 +354,7 @@ close(close_reason const& cr, error_code& ec)
using beast::detail::clamp;
ec.assign(0, ec.category());
// Make sure the stream is open
if(check_fail(ec))
if(! check_open(ec))
return;
// If rd_close_ is set then we already sent a close
BOOST_ASSERT(! rd_close_);
@ -357,8 +365,9 @@ close(close_reason const& cr, error_code& ec)
write_close<flat_static_buffer_base>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
}
if(check_fail(ec))
if(! check_ok(ec))
return;
status_ = status::closing;
// Drain the connection
close_code code{};
if(rd_remain_ > 0)
@ -375,7 +384,7 @@ close(close_reason const& cr, error_code& ec)
stream_.read_some(
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
rd_buf_.commit(bytes_transferred);
}
@ -414,8 +423,7 @@ close(close_reason const& cr, error_code& ec)
stream_.read_some(
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec);
open_ = ! ec;
if(! open_)
if(! check_ok(ec))
return;
rd_buf_.commit(bytes_transferred);
}

View File

@ -131,7 +131,7 @@ operator()(error_code ec, std::size_t)
d.ws.wr_block_ = d.tok;
// Make sure the stream is open
if(d.ws.check_fail(ec))
if(! d.ws.check_open(ec))
{
BOOST_ASIO_CORO_YIELD
d.ws.get_io_service().post(
@ -156,7 +156,7 @@ operator()(error_code ec, std::size_t)
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
// Make sure the stream is open
if(d.ws.check_fail(ec))
if(! d.ws.check_open(ec))
goto upcall;
}
@ -164,7 +164,7 @@ operator()(error_code ec, std::size_t)
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(d.ws.stream_,
d.fb.data(), std::move(*this));
if(d.ws.check_fail(ec))
if(! d.ws.check_ok(ec))
goto upcall;
upcall:
@ -195,15 +195,14 @@ void
stream<NextLayer>::
ping(ping_data const& payload, error_code& ec)
{
ec.assign(0, ec.category());
// Make sure the stream is open
if(check_fail(ec))
if(! check_open(ec))
return;
detail::frame_buffer fb;
write_ping<flat_static_buffer_base>(
fb, detail::opcode::ping, payload);
boost::asio::write(stream_, fb.data(), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
}
@ -223,15 +222,14 @@ void
stream<NextLayer>::
pong(ping_data const& payload, error_code& ec)
{
ec.assign(0, ec.category());
// Make sure the stream is open
if(check_fail(ec))
if(! check_open(ec))
return;
detail::frame_buffer fb;
write_ping<flat_static_buffer_base>(
fb, detail::opcode::pong, payload);
boost::asio::write(stream_, fb.data(), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
}

View File

@ -53,7 +53,7 @@ class stream<NextLayer>::read_some_op
token tok_;
close_code code_;
bool did_read_ = false;
bool cont_;
bool cont_ = false;
public:
read_some_op(read_some_op&&) = default;
@ -138,14 +138,19 @@ operator()(
BOOST_ASIO_CORO_REENTER(*this)
{
// Maybe suspend
do_maybe_suspend:
if(! ws_.rd_block_)
{
// Acquire the read block
ws_.rd_block_ = tok_;
// Make sure the stream is open
if(ws_.check_fail(ec))
// Make sure the stream is not closed
if( ws_.status_ == status::closed ||
ws_.status_ == status::failed)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
}
}
else
{
@ -164,10 +169,18 @@ operator()(
ws_.get_io_service().post(std::move(*this));
BOOST_ASSERT(ws_.rd_block_ == tok_);
// Make sure the stream is open
if(ws_.check_fail(ec))
goto upcall;
// The only way to get read blocked is if
// a `close_op` wrote a close frame
BOOST_ASSERT(ws_.wr_close_);
BOOST_ASSERT(ws_.status_ != status::open);
ec = boost::asio::error::operation_aborted;
goto upcall;
}
// if status_ == status::closing, we want to suspend
// the read operation until the close completes,
// then finish the read with operation_aborted.
loop:
BOOST_ASSERT(ws_.rd_block_ == tok_);
// See if we need to read a frame header. This
@ -195,19 +208,21 @@ operator()(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
BOOST_ASSERT(ws_.rd_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
ws_.rd_buf_.commit(bytes_transferred);
// Allow a close operation to
// drain the connection if necessary.
// Allow a close operation
// to acquire the read block
BOOST_ASSERT(ws_.rd_block_ == tok_);
ws_.rd_block_.reset();
if( ws_.paused_r_close_.maybe_invoke())
{
// Suspend
BOOST_ASSERT(ws_.rd_block_);
goto do_suspend;
}
// Acquire read block
ws_.rd_block_ = tok_;
}
// Immediately apply the mask to the portion
@ -236,7 +251,7 @@ operator()(
detail::read_ping(payload, b);
ws_.rd_buf_.consume(len);
// Ignore ping when closing
if(ws_.wr_close_)
if(ws_.status_ == status::closing)
goto loop;
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::ping, payload);
@ -245,6 +260,15 @@ operator()(
flat_static_buffer_base>(ws_.rd_fb_,
detail::opcode::pong, payload);
}
//BOOST_ASSERT(! ws_.paused_r_close_);
// Allow a close operation
// to acquire the read block
BOOST_ASSERT(ws_.rd_block_ == tok_);
ws_.rd_block_.reset();
ws_.paused_r_close_.maybe_invoke();
// Maybe suspend
if(! ws_.wr_block_)
{
@ -268,18 +292,8 @@ operator()(
BOOST_ASSERT(ws_.wr_block_ == tok_);
// Make sure the stream is open
if(ws_.check_fail(ec))
if(! ws_.check_open(ec))
goto upcall;
// Ignore ping when closing
if(ws_.wr_close_)
{
ws_.wr_block_.reset();
ws_.paused_close_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
goto loop;
}
}
// Send pong
@ -288,13 +302,13 @@ operator()(
boost::asio::async_write(ws_.stream_,
ws_.rd_fb_.data(), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
ws_.wr_block_.reset();
ws_.paused_close_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
goto loop;
goto do_maybe_suspend;
}
// Handle pong frame
if(ws_.rd_fh_.op == detail::opcode::pong)
@ -335,17 +349,19 @@ operator()(
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::close,
ws_.cr_.reason);
if(! ws_.wr_close_)
// See if we are already closing
if(ws_.status_ == status::closing)
{
// _Start the WebSocket Closing Handshake_
code_ = cr.code == close_code::none ?
close_code::normal :
static_cast<close_code>(cr.code);
// _Close the WebSocket Connection_
BOOST_ASSERT(ws_.wr_close_);
code_ = close_code::none;
ev_ = error::closed;
goto close;
}
// _Close the WebSocket Connection_
code_ = close_code::none;
// _Start the WebSocket Closing Handshake_
code_ = cr.code == close_code::none ?
close_code::normal :
static_cast<close_code>(cr.code);
ev_ = error::closed;
goto close;
}
@ -372,7 +388,7 @@ operator()(
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
ws_.rd_buf_.commit(bytes_transferred);
if(ws_.rd_fh_.mask)
@ -413,7 +429,7 @@ operator()(
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(buffer_prefix(
clamp(ws_.rd_remain_), cb_), std::move(*this));
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
@ -456,7 +472,7 @@ operator()(
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
ws_.rd_buf_.commit(bytes_transferred);
@ -498,8 +514,7 @@ operator()(
zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block);
ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
if(ws_.check_fail(ec))
goto upcall;
BOOST_ASSERT(! ec);
// VFALCO See:
// https://github.com/madler/zlib/issues/280
BOOST_ASSERT(zs.total_out == 0);
@ -521,7 +536,7 @@ operator()(
}
ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(ec != zlib::error::end_of_stream);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
@ -560,7 +575,7 @@ operator()(
ws_.wr_block_ = tok_;
// Make sure the stream is open
BOOST_ASSERT(ws_.open_);
BOOST_ASSERT(ws_.status_ == status::open);
}
else
{
@ -579,10 +594,13 @@ operator()(
BOOST_ASSERT(ws_.wr_block_ == tok_);
// Make sure the stream is open
if(ws_.check_fail(ec))
if(! ws_.check_open(ec))
goto upcall;
}
// Set the status
ws_.status_ = status::closing;
if(! ws_.wr_close_)
{
ws_.wr_close_ = true;
@ -600,9 +618,7 @@ operator()(
ws_.stream_, ws_.rd_fb_.data(),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
// Make sure the stream is open
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
}
@ -621,11 +637,15 @@ operator()(
}
if(! ec)
ec = ev_;
ws_.open_ = false;
if(ec && ec != error::closed)
ws_.status_ = status::failed;
else
ws_.status_ = status::closed;
ws_.close();
upcall:
BOOST_ASSERT(ws_.rd_block_ == tok_);
ws_.rd_block_.reset();
if(ws_.rd_block_ == tok_)
ws_.rd_block_.reset();
ws_.paused_r_close_.maybe_invoke();
if(ws_.wr_block_ == tok_)
{
@ -755,7 +775,7 @@ operator()(
BOOST_ASIO_CORO_YIELD
read_some_op<buffers_type, read_op>{
std::move(*this), ws_, *mb}(
{}, 0, true);
{}, 0, false);
if(ec)
break;
b_.commit(bytes_transferred);
@ -956,7 +976,7 @@ read_some(
std::size_t bytes_written = 0;
ec.assign(0, ec.category());
// Make sure the stream is open
if(check_fail(ec))
if(! check_open(ec))
return 0;
loop:
// See if we need to read a frame header. This
@ -979,7 +999,7 @@ loop:
rd_buf_.prepare(read_size(
rd_buf_, rd_buf_.max_size())),
ec);
if(check_fail(ec))
if(! check_ok(ec))
return bytes_written;
rd_buf_.commit(bytes_transferred);
}
@ -1018,7 +1038,7 @@ loop:
write_ping<flat_static_buffer_base>(fb,
detail::opcode::pong, payload);
boost::asio::write(stream_, fb.data(), ec);
if(check_fail(ec))
if(! check_ok(ec))
return bytes_written;
goto loop;
}
@ -1082,7 +1102,7 @@ loop:
rd_buf_.commit(stream_.read_some(
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec));
if(check_fail(ec))
if(! check_ok(ec))
return bytes_written;
if(rd_fh_.mask)
detail::mask_inplace(
@ -1125,7 +1145,7 @@ loop:
auto const bytes_transferred =
stream_.read_some(buffer_prefix(
clamp(rd_remain_), buffers), ec);
if(check_fail(ec))
if(! check_ok(ec))
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffer_prefix(
@ -1187,7 +1207,7 @@ loop:
rd_buf_.prepare(read_size(
rd_buf_, rd_buf_.max_size())),
ec);
if(check_fail(ec))
if(! check_ok(ec))
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
rd_buf_.commit(bytes_transferred);
@ -1217,8 +1237,6 @@ loop:
zs.avail_in = sizeof(empty_block);
pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(! ec);
if(check_fail(ec))
return bytes_written;
// VFALCO See:
// https://github.com/madler/zlib/issues/280
BOOST_ASSERT(zs.total_out == 0);
@ -1240,7 +1258,7 @@ loop:
}
pmd_->zi.write(zs, zlib::Flush::sync, ec);
BOOST_ASSERT(ec != zlib::error::end_of_stream);
if(check_fail(ec))
if(! check_ok(ec))
return bytes_written;
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_size_, zs.total_out, rd_msg_max_))

View File

@ -30,8 +30,6 @@ is_upgrade(http::header<true,
return false;
if(! http::token_list{req["Upgrade"]}.exists("websocket"))
return false;
if(! req.count(http::field::sec_websocket_version))
return false;
return true;
}

View File

@ -134,7 +134,7 @@ open(role_type role)
{
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
open_ = true;
status_ = status::open;
rd_remain_ = 0;
rd_cont_ = false;
rd_done_ = true;
@ -193,8 +193,7 @@ void
stream<NextLayer>::
reset()
{
BOOST_ASSERT(! open_);
open_ = false; // VFALCO is this needed?
BOOST_ASSERT(status_ != status::open);
rd_remain_ = 0;
rd_cont_ = false;
rd_done_ = true;
@ -593,8 +592,6 @@ build_response(http::request<Body,
return err("Missing Host");
if(! req.count(http::field::sec_websocket_key))
return err("Missing Sec-WebSocket-Key");
if(! http::token_list{req[http::field::upgrade]}.exists("websocket"))
return err("Missing websocket Upgrade token");
auto const key = req[http::field::sec_websocket_key];
if(key.size() > detail::sec_ws_key_type::max_size_n)
return err("Invalid Sec-WebSocket-Key");
@ -684,6 +681,7 @@ do_fail(
error_code& ec) // set to the error, else set to ev
{
BOOST_ASSERT(ev);
status_ = status::closing;
if(code != close_code::none && ! wr_close_)
{
wr_close_ = true;
@ -691,8 +689,7 @@ do_fail(
write_close<
flat_static_buffer_base>(fb, code);
boost::asio::write(stream_, fb.data(), ec);
open_ = ! ec;
if(! open_)
if(! check_ok(ec))
return;
}
using beast::websocket::teardown;
@ -703,11 +700,13 @@ do_fail(
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
open_ = ! ec;
if(! open_)
return;
ec = ev;
open_ = false;
if(! ec)
ec = ev;
if(ec && ec != error::closed)
status_ = status::failed;
else
status_ = status::closed;
close();
}
} // websocket

View File

@ -49,7 +49,7 @@ class stream<NextLayer>::write_some_op
int how_;
bool fin_;
bool more_;
bool cont_;
bool cont_ = false;
public:
write_some_op(write_some_op&&) = default;
@ -206,7 +206,7 @@ operator()(
ws_.wr_block_ = tok_;
// Make sure the stream is open
if(ws_.check_fail(ec))
if(! ws_.check_open(ec))
goto upcall;
}
else
@ -227,7 +227,7 @@ operator()(
BOOST_ASSERT(ws_.wr_block_ == tok_);
// Make sure the stream is open
if(ws_.check_fail(ec))
if(! ws_.check_open(ec))
goto upcall;
}
@ -242,13 +242,11 @@ operator()(
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer_cat(ws_.wr_fb_.data(), cb_),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
goto upcall;
}
@ -267,15 +265,13 @@ operator()(
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(
ws_.stream_, buffer_cat(
ws_.wr_fb_.data(), buffer_prefix(
clamp(fh_.len), cb_)),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
if(remain_ == 0)
break;
@ -317,14 +313,12 @@ operator()(
remain_ -= n;
ws_.wr_cont_ = ! fin_;
// Send frame header and partial payload
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(
ws_.stream_, buffer_cat(ws_.wr_fb_.data(),
buffer(ws_.wr_buf_.get(), n)),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
while(remain_ > 0)
{
@ -336,13 +330,11 @@ operator()(
ws_.wr_buf_.get(), n), key_);
remain_ -= n;
// Send partial payload
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer(ws_.wr_buf_.get(), n),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
}
goto upcall;
@ -369,14 +361,12 @@ operator()(
ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer_cat(ws_.wr_fb_.data(),
buffer(ws_.wr_buf_.get(), n)),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
if(remain_ == 0)
break;
@ -408,7 +398,7 @@ operator()(
ws_.wr_buf_size_);
more_ = detail::deflate(
ws_.pmd_->zo, b, cb_, fin_, ec);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
n = buffer_size(b);
if(n == 0)
@ -434,13 +424,11 @@ operator()(
flat_static_buffer_base>(ws_.wr_fb_, fh_);
ws_.wr_cont_ = ! fin_;
// Send frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
buffer_cat(ws_.wr_fb_.data(),
mutable_buffers_1{b}), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ws_.check_fail(ec))
if(! ws_.check_ok(ec))
goto upcall;
if(more_)
{
@ -460,7 +448,6 @@ operator()(
}
else
{
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(fh_.fin && (
(ws_.role_ == role_type::client &&
ws_.pmd_config_.client_no_context_takeover) ||
@ -524,7 +511,7 @@ write_some(bool fin,
using boost::asio::buffer_size;
ec.assign(0, ec.category());
// Make sure the stream is open
if(check_fail(ec))
if(! check_open(ec))
return;
detail::frame_header fh;
if(! wr_cont_)
@ -552,8 +539,7 @@ write_some(bool fin,
wr_buf_.get(), wr_buf_size_);
auto const more = detail::deflate(
pmd_->zo, b, cb, fin, ec);
open_ = ! ec;
if(! open_)
if(! check_ok(ec))
return;
auto const n = buffer_size(b);
if(n == 0)
@ -581,7 +567,7 @@ write_some(bool fin,
wr_cont_ = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
if(! more)
break;
@ -608,7 +594,7 @@ write_some(bool fin,
wr_cont_ = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), buffers), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
}
else
@ -630,7 +616,7 @@ write_some(bool fin,
boost::asio::write(stream_,
buffer_cat(fh_buf.data(),
buffer_prefix(n, cb)), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
if(remain == 0)
break;
@ -662,7 +648,7 @@ write_some(bool fin,
wr_cont_ = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
}
while(remain > 0)
@ -674,7 +660,7 @@ write_some(bool fin,
remain -= n;
detail::mask_inplace(b, key);
boost::asio::write(stream_, b, ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
}
}
@ -702,7 +688,7 @@ write_some(bool fin,
flat_static_buffer_base>(fh_buf, fh);
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
if(check_fail(ec))
if(! check_ok(ec))
return;
if(remain == 0)
break;

View File

@ -132,6 +132,9 @@ class stream
struct op {};
using control_cb_type =
std::function<void(frame_type, string_view)>;
// tokens are used to order reads and writes
class token
{
@ -157,8 +160,13 @@ class stream
zlib::inflate_stream zi;
};
using control_cb_type =
std::function<void(frame_type, string_view)>;
enum class status
{
open,
closing,
closed,
failed
};
NextLayer stream_; // the wrapped stream
close_reason cr_; // set from received close frame
@ -190,8 +198,8 @@ class stream
token tok_; // used to order asynchronous ops
role_type role_ // server or client
= role_type::client;
bool open_ // `true` if connected
= false;
status status_
= status::closed;
token wr_block_; // op currenly writing
bool wr_close_ // did we write a close frame?
@ -366,7 +374,7 @@ public:
bool
is_open() const
{
return open_;
return status_ == status::open;
}
/** Returns `true` if the latest message data indicates binary.
@ -3394,19 +3402,27 @@ private:
void begin_msg();
bool
check_fail(error_code& ec)
check_open(error_code& ec)
{
if(! open_)
if(status_ != status::open)
{
ec = boost::asio::error::operation_aborted;
return true;
return false;
}
ec.assign(0, ec.category());
return true;
}
bool
check_ok(error_code& ec)
{
if(ec)
{
open_ = false;
return true;
if(status_ != status::closed)
status_ = status::failed;
return false;
}
return false;
return true;
}
template<class DynamicBuffer>

View File

@ -560,6 +560,35 @@ public:
"Sec-WebSocket-Version: 13\r\n"
"\r\n"
);
// oversize key
check(error::handshake_failed,
"GET / HTTP/1.1\r\n"
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQdGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n"
);
// bad version
check(error::handshake_failed,
"GET / HTTP/1.1\r\n"
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 12\r\n"
"\r\n"
);
// missing version
check(error::handshake_failed,
"GET / HTTP/1.1\r\n"
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"\r\n"
);
// valid request
check({},
"GET / HTTP/1.1\r\n"

View File

@ -39,6 +39,14 @@ public:
w.close(ws, close_code::going_away);
});
// close with code and reason
doTest(pmd, [&](ws_type& ws)
{
w.close(ws, {
close_code::going_away,
"going away"});
});
// already closed
{
echo_server es{log};
@ -125,6 +133,17 @@ public:
}
}
// drain masked close frame
{
echo_server es{log, kind::async_client};
stream<test::stream> ws{ios_};
ws.next_layer().connect(es.stream());
ws.set_option(pmd);
es.async_handshake();
ws.accept();
w.close(ws, {});
}
// close with incomplete read message
doTest(pmd, [&](ws_type& ws)
{
@ -147,8 +166,10 @@ public:
}
void
testCloseSuspend()
testSuspend()
{
using boost::asio::buffer;
// suspend on ping
doFailLoop([&](test::fail_counter& fc)
{
@ -270,10 +291,10 @@ public:
ws.async_read(b,
[&](error_code ec, std::size_t)
{
++count;
if(ec != error::failed)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
while(! ws.wr_block_)
{
@ -285,10 +306,10 @@ public:
ws.async_close({},
[&](error_code ec)
{
++count;
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(count == 0);
ios.run();
@ -311,10 +332,10 @@ public:
ws.async_read(b,
[&](error_code ec, std::size_t)
{
++count;
if(ec != error::closed)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
while(! ws.wr_block_)
{
@ -326,16 +347,256 @@ public:
ws.async_close({},
[&](error_code ec)
{
++count;
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(count == 0);
ios.run();
BEAST_EXPECT(count == 2);
});
// teardown on received close
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
// add a close frame to the input
ws.next_layer().append(string_view{
"\x88\x00", 2});
std::size_t count = 0;
std::string const s = "Hello, world!";
ws.async_write(buffer(s),
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 3);
});
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(count == 0);
ios.run();
BEAST_EXPECT(count == 3);
});
// check for deadlock
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
// add a ping frame to the input
ws.next_layer().append(string_view{
"\x89\x00", 2});
std::size_t count = 0;
multi_buffer b;
std::string const s = "Hello, world!";
ws.async_write(buffer(s),
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 3);
});
BEAST_EXPECT(ws.rd_block_);
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(ws.is_open());
BEAST_EXPECT(ws.wr_block_);
BEAST_EXPECT(count == 0);
ios.run();
BEAST_EXPECT(count == 3);
});
// Four-way: close, read, write, ping
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
std::string const s = "Hello, world!";
multi_buffer b;
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
++count;
});
ws.async_write(buffer(s),
[&](error_code ec)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
++count;
});
ws.async_ping({},
[&](error_code ec)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
++count;
});
BEAST_EXPECT(count == 0);
ios.run();
BEAST_EXPECT(count == 4);
});
// Four-way: read, write, ping, close
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
std::string const s = "Hello, world!";
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec && ec != boost::asio::error::operation_aborted)
{
BEAST_EXPECTS(ec, ec.message());
BOOST_THROW_EXCEPTION(
system_error{ec});
}
if(! ec)
BEAST_EXPECT(to_string(b.data()) == s);
++count;
});
ws.async_write(buffer(s),
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
ws.async_ping({},
[&](error_code ec)
{
if(ec != boost::asio::error::operation_aborted)
{
BEAST_EXPECTS(ec, ec.message());
BOOST_THROW_EXCEPTION(
system_error{ec});
}
++count;
});
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(count == 0);
ios.run();
BEAST_EXPECT(count == 4);
});
// Four-way: ping, read, write, close
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
std::string const s = "Hello, world!";
multi_buffer b;
ws.async_ping({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
++count;
});
ws.async_write(buffer(s),
[&](error_code ec)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
++count;
});
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(count == 0);
ios.run();
BEAST_EXPECT(count == 4);
});
}
void
@ -357,7 +618,7 @@ public:
run() override
{
testClose();
testCloseSuspend();
testSuspend();
testContHook();
}
};

View File

@ -7,12 +7,8 @@
// Official repository: https://github.com/boostorg/beast
//
#include <boost/beast/websocket/stream.hpp>
#include <boost/beast/websocket/detail/frame.hpp>
#include <boost/beast/unit_test/suite.hpp>
#include <boost/beast/test/yield_to.hpp>
#include <initializer_list>
#include <climits>
namespace boost {
namespace beast {
@ -21,7 +17,6 @@ namespace detail {
class frame_test
: public beast::unit_test::suite
, public test::enable_yield_to
{
public:
void testCloseCodes()
@ -47,7 +42,7 @@ public:
test_fh()
{
op = detail::opcode::text;
fin = false;
fin = true;
mask = false;
rsv1 = false;
rsv2 = false;
@ -57,8 +52,20 @@ public:
}
};
void
testWriteFrame()
{
test_fh fh;
fh.rsv2 = true;
fh.rsv3 = true;
fh.len = 65536;
frame_buffer fb;
write(fb, fh);
}
void run() override
{
testWriteFrame();
testCloseCodes();
}
};

View File

@ -214,10 +214,265 @@ public:
);
}
// Compression Extensions for WebSocket
//
// https://tools.ietf.org/html/rfc7692
//
void
testExtRead()
{
detail::pmd_offer po;
auto const accept =
[&](string_view s)
{
http::fields f;
f.set(http::field::sec_websocket_extensions, s);
po = detail::pmd_offer();
detail::pmd_read(po, f);
BEAST_EXPECT(po.accept);
};
auto const reject =
[&](string_view s)
{
http::fields f;
f.set(http::field::sec_websocket_extensions, s);
po = detail::pmd_offer();
detail::pmd_read(po, f);
BEAST_EXPECT(! po.accept);
};
// duplicate parameters
reject("permessage-deflate; server_max_window_bits=8; server_max_window_bits=8");
// missing value
reject("permessage-deflate; server_max_window_bits");
reject("permessage-deflate; server_max_window_bits=");
// invalid value
reject("permessage-deflate; server_max_window_bits=-1");
reject("permessage-deflate; server_max_window_bits=7");
reject("permessage-deflate; server_max_window_bits=16");
reject("permessage-deflate; server_max_window_bits=999999999999999999999999");
reject("permessage-deflate; server_max_window_bits=9a");
// duplicate parameters
reject("permessage-deflate; client_max_window_bits=8; client_max_window_bits=8");
// optional value excluded
accept("permessage-deflate; client_max_window_bits");
BEAST_EXPECT(po.client_max_window_bits == -1);
accept("permessage-deflate; client_max_window_bits=");
BEAST_EXPECT(po.client_max_window_bits == -1);
// invalid value
reject("permessage-deflate; client_max_window_bits=-1");
reject("permessage-deflate; client_max_window_bits=7");
reject("permessage-deflate; client_max_window_bits=16");
reject("permessage-deflate; client_max_window_bits=999999999999999999999999");
// duplicate parameters
reject("permessage-deflate; server_no_context_takeover; server_no_context_takeover");
// valueless parameter
accept("permessage-deflate; server_no_context_takeover");
BEAST_EXPECT(po.server_no_context_takeover);
accept("permessage-deflate; server_no_context_takeover=");
BEAST_EXPECT(po.server_no_context_takeover);
// disallowed value
reject("permessage-deflate; server_no_context_takeover=-1");
reject("permessage-deflate; server_no_context_takeover=x");
reject("permessage-deflate; server_no_context_takeover=\"yz\"");
reject("permessage-deflate; server_no_context_takeover=999999999999999999999999");
// duplicate parameters
reject("permessage-deflate; client_no_context_takeover; client_no_context_takeover");
// valueless parameter
accept("permessage-deflate; client_no_context_takeover");
BEAST_EXPECT(po.client_no_context_takeover);
accept("permessage-deflate; client_no_context_takeover=");
BEAST_EXPECT(po.client_no_context_takeover);
// disallowed value
reject("permessage-deflate; client_no_context_takeover=-1");
reject("permessage-deflate; client_no_context_takeover=x");
reject("permessage-deflate; client_no_context_takeover=\"yz\"");
reject("permessage-deflate; client_no_context_takeover=999999999999999999999999");
// unknown extension parameter
reject("permessage-deflate; unknown");
reject("permessage-deflate; unknown=");
reject("permessage-deflate; unknown=1");
reject("permessage-deflate; unknown=x");
reject("permessage-deflate; unknown=\"xy\"");
}
void
testExtWrite()
{
detail::pmd_offer po;
auto const check =
[&](string_view match)
{
http::fields f;
detail::pmd_write(f, po);
BEAST_EXPECT(
f[http::field::sec_websocket_extensions]
== match);
};
po.accept = true;
po.server_max_window_bits = 0;
po.client_max_window_bits = 0;
po.server_no_context_takeover = false;
po.client_no_context_takeover = false;
check("permessage-deflate");
po.server_max_window_bits = 10;
check("permessage-deflate; server_max_window_bits=10");
po.server_max_window_bits = -1;
check("permessage-deflate; server_max_window_bits");
po.server_max_window_bits = 0;
po.client_max_window_bits = 10;
check("permessage-deflate; client_max_window_bits=10");
po.client_max_window_bits = -1;
check("permessage-deflate; client_max_window_bits");
po.client_max_window_bits = 0;
po.server_no_context_takeover = true;
check("permessage-deflate; server_no_context_takeover");
po.server_no_context_takeover = false;
po.client_no_context_takeover = true;
check("permessage-deflate; client_no_context_takeover");
}
void
testExtNegotiate()
{
permessage_deflate pmd;
auto const reject =
[&](
string_view offer)
{
detail::pmd_offer po;
{
http::fields f;
f.set(http::field::sec_websocket_extensions, offer);
detail::pmd_read(po, f);
}
http::fields f;
detail::pmd_offer config;
detail::pmd_negotiate(f, config, po, pmd);
BEAST_EXPECT(! config.accept);
};
auto const accept =
[&](
string_view offer,
string_view result)
{
detail::pmd_offer po;
{
http::fields f;
f.set(http::field::sec_websocket_extensions, offer);
detail::pmd_read(po, f);
}
http::fields f;
detail::pmd_offer config;
detail::pmd_negotiate(f, config, po, pmd);
auto const got =
f[http::field::sec_websocket_extensions];
BEAST_EXPECTS(got == result, got);
{
detail::pmd_offer poc;
detail::pmd_read(poc, f);
detail::pmd_normalize(poc);
BEAST_EXPECT(poc.accept);
}
BEAST_EXPECT(config.server_max_window_bits != 0);
BEAST_EXPECT(config.client_max_window_bits != 0);
};
pmd.server_enable = true;
pmd.server_max_window_bits = 15;
pmd.client_max_window_bits = 15;
pmd.server_no_context_takeover = false;
pmd.client_no_context_takeover = false;
// default
accept(
"permessage-deflate",
"permessage-deflate");
// non-default server_max_window_bits
accept(
"permessage-deflate; server_max_window_bits=14",
"permessage-deflate; server_max_window_bits=14");
// explicit default server_max_window_bits
accept(
"permessage-deflate; server_max_window_bits=15",
"permessage-deflate");
// minimum window size of 8 bits (a zlib bug)
accept(
"permessage-deflate; server_max_window_bits=8",
"permessage-deflate; server_max_window_bits=9");
// non-default server_max_window_bits setting
pmd.server_max_window_bits = 10;
accept(
"permessage-deflate",
"permessage-deflate; server_max_window_bits=10");
// clamped server_max_window_bits setting #1
pmd.server_max_window_bits = 10;
accept(
"permessage-deflate; server_max_window_bits=14",
"permessage-deflate; server_max_window_bits=10");
// clamped server_max_window_bits setting #2
pmd.server_max_window_bits=8;
accept(
"permessage-deflate; server_max_window_bits=14",
"permessage-deflate; server_max_window_bits=9");
pmd.server_max_window_bits = 15;
// present with no value
accept(
"permessage-deflate; client_max_window_bits",
"permessage-deflate");
// present with no value, non-default setting
pmd.client_max_window_bits = 10;
accept(
"permessage-deflate; client_max_window_bits",
"permessage-deflate; client_max_window_bits=10");
// absent, non-default setting
pmd.client_max_window_bits = 10;
reject(
"permessage-deflate");
}
void
run() override
{
testHandshake();
testExtRead();
testExtWrite();
testExtNegotiate();
}
};

View File

@ -89,40 +89,10 @@ public:
{
doTestPing(AsyncClient{yield});
});
// suspend on write
{
echo_server es{log};
error_code ec;
boost::asio::io_service ios;
stream<test::stream> ws{ios};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/", ec);
BEAST_EXPECTS(! ec, ec.message());
std::size_t count = 0;
ws.async_write(sbuf("*"),
[&](error_code ec)
{
++count;
BEAST_EXPECTS(! ec, ec.message());
});
BEAST_EXPECT(ws.wr_block_);
ws.async_ping("",
[&](error_code ec)
{
++count;
BEAST_EXPECTS(
ec == boost::asio::error::operation_aborted,
ec.message());
});
ws.async_close({}, [&](error_code){});
ios.run();
BEAST_EXPECT(count == 2);
}
}
void
testPingSuspend()
testSuspend()
{
// suspend on write
doFailLoop([&](test::fail_counter& fc)
@ -352,6 +322,45 @@ public:
BEAST_EXPECT(count == 2);
});
// don't ping on close
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
error_code ec;
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
ws.async_write(sbuf("*"),
[&](error_code ec)
{
++count;
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
});
BEAST_EXPECT(ws.wr_block_);
ws.async_ping("",
[&](error_code ec)
{
++count;
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
});
ws.async_close({},
[&](error_code)
{
++count;
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
});
ios.run();
BEAST_EXPECT(count == 3);
});
{
echo_server es{log, kind::async};
boost::asio::io_service ios;
@ -437,7 +446,7 @@ public:
run() override
{
testPing();
testPingSuspend();
testSuspend();
testContHook();
}
};

View File

@ -12,6 +12,8 @@
#include "test.hpp"
#include <boost/asio/write.hpp>
namespace boost {
namespace beast {
namespace websocket {
@ -104,7 +106,7 @@ public:
});
// two part message
// this triggers "fill the read buffer first"
// triggers "fill the read buffer first"
doTest(pmd, [&](ws_type& ws)
{
w.write_raw(ws, sbuf(
@ -202,6 +204,58 @@ public:
BEAST_EXPECT(to_string(b.data()) == "Hello, World!");
});
// masked message, big
doStreamLoop([&](test::stream& ts)
{
echo_server es{log, kind::async_client};
ws_type ws{ts};
ws.next_layer().connect(es.stream());
ws.set_option(pmd);
es.async_handshake();
try
{
w.accept(ws);
std::string const s(2000, '*');
ws.auto_fragment(false);
ws.binary(false);
w.write(ws, buffer(s));
multi_buffer b;
w.read(ws, b);
BEAST_EXPECT(ws.got_text());
BEAST_EXPECT(to_string(b.data()) == s);
ws.next_layer().close();
}
catch(...)
{
ts.close();
throw;
}
});
// close
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log, kind::async};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
// Cause close to be received
es.async_close();
std::size_t count = 0;
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
++count;
if(ec != error::closed)
BOOST_THROW_EXCEPTION(
system_error{ec});
});
ios.run();
BEAST_EXPECT(count == 1);
});
// already closed
doTest(pmd, [&](ws_type& ws)
{
@ -249,7 +303,7 @@ public:
doReadTest(w, ws, close_code::protocol_error);
});
// receive bad close
// bad close
doTest(pmd, [&](ws_type& ws)
{
put(ws.next_layer().buffer(), cbuf(
@ -257,15 +311,6 @@ public:
doFailTest(w, ws, error::failed);
});
// expected cont
doTest(pmd, [&](ws_type& ws)
{
w.write_some(ws, false, boost::asio::null_buffers{});
w.write_raw(ws, cbuf(
0x81, 0x80, 0xff, 0xff, 0xff, 0xff));
doReadTest(w, ws, close_code::protocol_error);
});
// message size above 2^64
doTest(pmd, [&](ws_type& ws)
{
@ -284,14 +329,6 @@ public:
doFailTest(w, ws, error::failed);
});
// unexpected cont
doTest(pmd, [&](ws_type& ws)
{
w.write_raw(ws, cbuf(
0x80, 0x80, 0xff, 0xff, 0xff, 0xff));
doReadTest(w, ws, close_code::protocol_error);
});
// bad utf8
doTest(pmd, [&](ws_type& ws)
{
@ -313,10 +350,23 @@ public:
doTest(pmd, [&](ws_type& ws)
{
std::string const s =
random_string() +
"Hello, world!" "\xc0";
w.write(ws, buffer(s));
doReadTest(w, ws, close_code::bad_payload);
"\x81\x7e\x0f\xa1" +
std::string(4000, '*') + "\xc0";
ws.next_layer().append(s);
multi_buffer b;
try
{
do
{
b.commit(w.read_some(ws, b.prepare(4000)));
}
while(! ws.is_message_done());
}
catch(system_error const& se)
{
if(se.code() != error::failed)
throw;
}
});
// close frames
@ -441,6 +491,34 @@ public:
BEAST_EXPECT(to_string(b.data()) == s);
});
// masked message
doStreamLoop([&](test::stream& ts)
{
echo_server es{log, kind::async_client};
ws_type ws{ts};
ws.next_layer().connect(es.stream());
ws.set_option(pmd);
es.async_handshake();
try
{
w.accept(ws);
std::string const s = "Hello, world!";
ws.auto_fragment(false);
ws.binary(false);
w.write(ws, buffer(s));
multi_buffer b;
w.read(ws, b);
BEAST_EXPECT(ws.got_text());
BEAST_EXPECT(to_string(b.data()) == s);
ws.next_layer().close();
}
catch(...)
{
ts.close();
throw;
}
});
// empty message
doTest(pmd, [&](ws_type& ws)
{
@ -571,11 +649,77 @@ public:
}
void
testReadSuspend()
testSuspend()
{
using boost::asio::buffer;
#if 1
// suspend on read block
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
while(! ws.rd_block_)
ios.run_one();
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
ios.run();
BEAST_EXPECT(count == 2);
});
#endif
// suspend on write
// suspend on release read block
doFailLoop([&](test::fail_counter& fc)
{
//log << "fc.count()==" << fc.count() << std::endl;
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BOOST_ASSERT(ws.rd_block_);
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
ios.run();
BEAST_EXPECT(count == 2);
});
#if 1
// suspend on write pong
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
@ -592,25 +736,238 @@ public:
ws.async_read(b,
[&](error_code ec, std::size_t)
{
++count;
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(to_string(b.data()) == s);
++count;
});
BEAST_EXPECT(ws.rd_block_);
ws.async_write(buffer(s),
[&](error_code ec)
{
++count;
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
++count;
});
BEAST_EXPECT(ws.wr_block_);
ios.run();
BEAST_EXPECT(count == 2);
});
// Ignore ping when closing
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
// insert fragmented message with
// a ping in between the frames.
ws.next_layer().append(string_view(
"\x01\x01*"
"\x89\x00"
"\x80\x01*", 8));
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(to_string(b.data()) == "**");
BEAST_EXPECT(++count == 1);
b.consume(b.size());
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 3);
});
});
BEAST_EXPECT(ws.rd_block_);
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(ws.wr_block_);
ios.run();
BEAST_EXPECT(count == 3);
});
// See if we are already closing
doFailLoop([&](test::fail_counter& fc)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios, fc};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
std::size_t count = 0;
// insert fragmented message with
// a close in between the frames.
ws.next_layer().append(string_view(
"\x01\x01*"
"\x88\x00"
"\x80\x01*", 8));
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
if(ec != boost::asio::error::operation_aborted)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 2);
});
BEAST_EXPECT(ws.rd_block_);
ws.async_close({},
[&](error_code ec)
{
if(ec)
BOOST_THROW_EXCEPTION(
system_error{ec});
BEAST_EXPECT(++count == 1);
});
BEAST_EXPECT(ws.wr_block_);
ios.run();
BEAST_EXPECT(count == 2);
});
#endif
}
void
testParseFrame()
{
auto const bad =
[&](string_view s)
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
ws.next_layer().append(s);
error_code ec;
multi_buffer b;
ws.read(b, ec);
BEAST_EXPECT(ec);
};
// chopped frame header
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
ws.next_layer().append(
"\x81\x7e\x01");
std::size_t count = 0;
std::string const s(257, '*');
error_code ec;
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
++count;
BEAST_EXPECTS(! ec, ec.message());
BEAST_EXPECT(to_string(b.data()) == s);
});
ios.run_one();
es.stream().write_some(
boost::asio::buffer("\x01" + s));
ios.run();
BEAST_EXPECT(count == 1);
}
// new data frame when continuation expected
bad("\x01\x01*" "\x81\x01*");
// reserved bits not cleared
bad("\xb1\x01*");
bad("\xc1\x01*");
bad("\xd1\x01*");
// continuation without an active message
bad("\x80\x01*");
// reserved bits not cleared (cont)
bad("\x01\x01*" "\xb0\x01*");
bad("\x01\x01*" "\xc0\x01*");
bad("\x01\x01*" "\xd0\x01*");
// reserved opcode
bad("\x83\x01*");
// fragmented control message
bad("\x09\x01*");
// invalid length for control message
bad("\x89\x7e\x01\x01");
// reserved bits not cleared (control)
bad("\xb9\x01*");
bad("\xc9\x01*");
bad("\xd9\x01*");
// unmasked frame from client
{
echo_server es{log, kind::async_client};
boost::asio::io_service ios;
stream<test::stream> ws{ios};
ws.next_layer().connect(es.stream());
es.async_handshake();
ws.accept();
ws.next_layer().append(
"\x81\x01*");
error_code ec;
multi_buffer b;
ws.read(b, ec);
BEAST_EXPECT(ec);
}
// masked frame from server
bad("\x81\x80\xff\xff\xff\xff");
// chopped control frame payload
{
echo_server es{log};
boost::asio::io_service ios;
stream<test::stream> ws{ios};
ws.next_layer().connect(es.stream());
ws.handshake("localhost", "/");
ws.next_layer().append(
"\x89\x02*");
std::size_t count = 0;
error_code ec;
multi_buffer b;
ws.async_read(b,
[&](error_code ec, std::size_t)
{
++count;
BEAST_EXPECTS(! ec, ec.message());
BEAST_EXPECT(to_string(b.data()) == "**");
});
ios.run_one();
es.stream().write_some(
boost::asio::buffer(
"*" "\x81\x02**"));
ios.run();
BEAST_EXPECT(count == 1);
}
// length not canonical
bad(string_view("\x81\x7e\x00\x7d", 4));
bad(string_view("\x81\x7f\x00\x00\x00\x00\x00\x00\xff\xff", 10));
}
void
@ -631,6 +988,7 @@ public:
buf, sizeof(buf)}};
using boost::asio::asio_handler_is_continuation;
asio_handler_is_continuation(&op);
pass();
}
{
struct handler
@ -645,6 +1003,7 @@ public:
handler{}, ws, b, 32, true};
using boost::asio::asio_handler_is_continuation;
asio_handler_is_continuation(&op);
pass();
}
}
@ -652,7 +1011,8 @@ public:
run() override
{
testRead();
testReadSuspend();
testSuspend();
testParseFrame();
testContHook();
}
};

View File

@ -34,8 +34,6 @@ public:
req.insert("Connection", "upgrade");
BEAST_EXPECT(! is_upgrade(req));
req.insert("Upgrade", "websocket");
BEAST_EXPECT(! is_upgrade(req));
req.insert("Sec-WebSocket-Version", "13");
BEAST_EXPECT(is_upgrade(req));
}

View File

@ -22,7 +22,7 @@ public:
void
testOptions()
{
stream<test::stream> ws(ios_);
stream<test::stream> ws{ios_};
ws.auto_fragment(true);
ws.write_buffer_size(2048);
ws.binary(false);
@ -36,48 +36,71 @@ public:
{
pass();
}
auto const bad =
[&](permessage_deflate const& pmd)
{
stream<test::stream> ws{ios_};
try
{
ws.set_option(pmd);
fail("", __FILE__, __LINE__);
}
catch(std::exception const&)
{
pass();
}
};
{
permessage_deflate pmd;
pmd.server_max_window_bits = 16;
bad(pmd);
}
{
permessage_deflate pmd;
pmd.server_max_window_bits = 8;
bad(pmd);
}
{
permessage_deflate pmd;
pmd.client_max_window_bits = 16;
bad(pmd);
}
{
permessage_deflate pmd;
pmd.client_max_window_bits = 8;
bad(pmd);
}
{
permessage_deflate pmd;
pmd.compLevel = -1;
bad(pmd);
}
{
permessage_deflate pmd;
pmd.compLevel = 10;
bad(pmd);
}
{
permessage_deflate pmd;
pmd.memLevel = 0;
bad(pmd);
}
{
permessage_deflate pmd;
pmd.memLevel = 10;
bad(pmd);
}
}
//--------------------------------------------------------------------------
template<class Wrap>
void
doTestStream(Wrap const& w,
permessage_deflate const& pmd)
{
using boost::asio::buffer;
// send pong
doTest(pmd, [&](ws_type& ws)
{
w.pong(ws, "");
});
// send auto fragmented message
doTest(pmd, [&](ws_type& ws)
{
ws.auto_fragment(true);
ws.write_buffer_size(8);
w.write(ws, sbuf("Now is the time for all good men"));
multi_buffer b;
w.read(ws, b);
BEAST_EXPECT(to_string(b.data()) == "Now is the time for all good men");
});
// send message with write buffer limit
doTest(pmd, [&](ws_type& ws)
{
std::string s(2000, '*');
ws.write_buffer_size(1200);
w.write(ws, buffer(s.data(), s.size()));
multi_buffer b;
w.read(ws, b);
BEAST_EXPECT(to_string(b.data()) == s);
});
}
//--------------------------------------------------------------------------
void
run() override
{
@ -103,42 +126,6 @@ public:
sizeof(websocket::stream<test::stream&>) << std::endl;
testOptions();
#if 0
auto const testStream =
[this](permessage_deflate const& pmd)
{
doTestStream(SyncClient{}, pmd);
yield_to(
[&](yield_context yield)
{
doTestStream(AsyncClient{yield}, pmd);
});
};
permessage_deflate pmd;
pmd.client_enable = false;
pmd.server_enable = false;
testStream(pmd);
pmd.client_enable = true;
pmd.server_enable = true;
pmd.client_max_window_bits = 10;
pmd.client_no_context_takeover = false;
pmd.compLevel = 1;
pmd.memLevel = 1;
testStream(pmd);
pmd.client_enable = true;
pmd.server_enable = true;
pmd.client_max_window_bits = 10;
pmd.client_no_context_takeover = true;
pmd.compLevel = 1;
pmd.memLevel = 1;
testStream(pmd);
#endif
}
};

View File

@ -264,7 +264,7 @@ public:
Test const& f, std::size_t limit = 200)
{
std::size_t n;
for(n = 0; n <= limit; ++n)
for(n = 0; n < limit; ++n)
{
test::fail_counter fc{n};
try
@ -288,7 +288,7 @@ public:
{
// This number has to be high for the
// test that writes the large buffer.
static std::size_t constexpr limit = 1000;
static std::size_t constexpr limit = 200;
doFailLoop(
[&](test::fail_counter& fc)
@ -308,12 +308,12 @@ public:
{
// This number has to be high for the
// test that writes the large buffer.
static std::size_t constexpr limit = 1000;
static std::size_t constexpr limit = 200;
for(int i = 0; i < 2; ++i)
{
std::size_t n;
for(n = 0; n <= limit; ++n)
for(n = 0; n < limit; ++n)
{
test::fail_counter fc{n};
test::stream ts{ios_, fc};

View File

@ -73,6 +73,20 @@ public:
BEAST_EXPECT(b.size() == 0);
});
// fragmented message
doTest(pmd, [&](ws_type& ws)
{
ws.auto_fragment(false);
ws.binary(false);
std::string const s = "Hello, world!";
w.write_some(ws, false, buffer(s.data(), 5));
w.write_some(ws, true, buffer(s.data() + 5, s.size() - 5));
multi_buffer b;
w.read(ws, b);
BEAST_EXPECT(ws.got_text());
BEAST_EXPECT(to_string(b.data()) == s);
});
// continuation
doTest(pmd, [&](ws_type& ws)
{

View File

@ -114,6 +114,7 @@ struct fail_error_code : error_code
class fail_counter
{
std::size_t n_;
std::size_t i_ = 0;
error_code ec_;
public:
@ -131,13 +132,20 @@ public:
{
}
/// Returns the fail index
std::size_t
count() const
{
return n_;
}
/// Throw an exception on the Nth failure
void
fail()
{
if(n_ > 0)
--n_;
if(! n_)
if(i_ < n_)
++i_;
if(i_ == n_)
BOOST_THROW_EXCEPTION(system_error{ec_});
}
@ -145,9 +153,9 @@ public:
bool
fail(error_code& ec)
{
if(n_ > 0)
--n_;
if(! n_)
if(i_ < n_)
++i_;
if(i_ == n_)
{
ec = ec_;
return true;

View File

@ -78,6 +78,11 @@ class stream
std::size_t write_max =
(std::numeric_limits<std::size_t>::max)();
~state()
{
BOOST_ASSERT(! op);
}
explicit
state(
boost::asio::io_service& ios_,
@ -87,11 +92,6 @@ class stream
{
}
~state()
{
BOOST_ASSERT(! op);
}
void
on_write()
{
@ -119,6 +119,10 @@ public:
/// Destructor
~stream()
{
{
std::unique_lock<std::mutex> lock{in_->m};
in_->op.reset();
}
auto out = out_.lock();
if(out)
{
@ -612,16 +616,17 @@ teardown(
stream& s,
boost::system::error_code& ec)
{
if(s.in_->fc)
{
if(s.in_->fc->fail(ec))
return;
}
if( s.in_->fc &&
s.in_->fc->fail(ec))
return;
s.close();
if( s.in_->fc &&
s.in_->fc->fail(ec))
ec = boost::asio::error::eof;
else
{
s.close();
ec.assign(0, ec.category());
}
}
template<class TeardownHandler>
@ -633,10 +638,17 @@ async_teardown(
TeardownHandler&& handler)
{
error_code ec;
if(s.in_->fc && s.in_->fc->fail(ec))
if( s.in_->fc &&
s.in_->fc->fail(ec))
return s.get_io_service().post(
bind_handler(std::move(handler), ec));
s.close();
if( s.in_->fc &&
s.in_->fc->fail(ec))
ec = boost::asio::error::eof;
else
ec.assign(0, ec.category());
s.get_io_service().post(
bind_handler(std::move(handler), ec));
}