mirror of
https://github.com/boostorg/beast.git
synced 2025-07-29 20:37:31 +02:00
Fix soft-mutex assert in websocket stream:
Fix #1000 This resolves the assert 'ws_.wr_block_ == tok_'.
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
Version 155:
|
||||
|
||||
* Fix memory leak in advanced server examples
|
||||
* Fix soft-mutex assert in websocket stream
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <boost/beast/core/buffers_suffix.hpp>
|
||||
#include <boost/beast/core/error.hpp>
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/type_index.hpp>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
|
||||
@ -25,6 +26,86 @@ namespace beast {
|
||||
namespace websocket {
|
||||
namespace detail {
|
||||
|
||||
// used to order reads and writes
|
||||
class type_mutex
|
||||
{
|
||||
boost::typeindex::type_index ti_ = typeid(type_mutex);
|
||||
|
||||
public:
|
||||
type_mutex() = default;
|
||||
type_mutex(type_mutex const&) = delete;
|
||||
type_mutex& operator=(type_mutex const&) = delete;
|
||||
|
||||
type_mutex(type_mutex&& other) noexcept
|
||||
: ti_(other.ti_)
|
||||
{
|
||||
other.ti_ = boost::typeindex::type_id<void>();
|
||||
}
|
||||
|
||||
type_mutex& operator=(type_mutex&& other) noexcept
|
||||
{
|
||||
ti_ = other.ti_;
|
||||
other.ti_ = boost::typeindex::type_id<void>();
|
||||
return *this;
|
||||
}
|
||||
|
||||
// VFALCO I'm not too happy that this function is needed
|
||||
void reset()
|
||||
{
|
||||
ti_ = typeid(void);
|
||||
}
|
||||
|
||||
bool is_locked() const
|
||||
{
|
||||
return ti_ != boost::typeindex::type_id<void>();
|
||||
}
|
||||
|
||||
template<class T>
|
||||
bool is_locked(T const*) const
|
||||
{
|
||||
return ti_ == boost::typeindex::type_id<T>();
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void lock(T const*)
|
||||
{
|
||||
BOOST_ASSERT(ti_ == boost::typeindex::type_id<void>());
|
||||
ti_ = typeid(T);
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void unlock(T const*)
|
||||
{
|
||||
BOOST_ASSERT(ti_ == boost::typeindex::type_id<T>());
|
||||
ti_ = typeid(void);
|
||||
}
|
||||
|
||||
template<class T>
|
||||
bool try_lock(T const*)
|
||||
{
|
||||
// If this assert goes off it means you are attempting to
|
||||
// simultaneously initiate more than one of same asynchronous
|
||||
// operation, which is not allowed. For example, you must wait
|
||||
// for an async_read to complete before performing another
|
||||
// async_read.
|
||||
//
|
||||
BOOST_ASSERT(ti_ != boost::typeindex::type_id<T>());
|
||||
if(ti_ != boost::typeindex::type_id<void>())
|
||||
return false;
|
||||
ti_ = typeid(T);
|
||||
return true;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
bool try_unlock(T const*)
|
||||
{
|
||||
if(ti_ != boost::typeindex::type_id<T>())
|
||||
return false;
|
||||
ti_ = boost::typeindex::type_id<void>();
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
template<bool deflateSupported>
|
||||
struct stream_base
|
||||
{
|
||||
|
@ -44,7 +44,6 @@ class stream<NextLayer, deflateSupported>::close_op
|
||||
stream<NextLayer, deflateSupported>& ws;
|
||||
detail::frame_buffer fb;
|
||||
error_code ev;
|
||||
token tok;
|
||||
bool cont = false;
|
||||
|
||||
state(
|
||||
@ -52,7 +51,6 @@ class stream<NextLayer, deflateSupported>::close_op
|
||||
stream<NextLayer, deflateSupported>& ws_,
|
||||
close_reason const& cr)
|
||||
: ws(ws_)
|
||||
, tok(ws.tok_.unique())
|
||||
{
|
||||
// Serialize the close frame
|
||||
ws.template write_close<
|
||||
@ -125,11 +123,8 @@ operator()(
|
||||
BOOST_ASIO_CORO_REENTER(*this)
|
||||
{
|
||||
// Maybe suspend
|
||||
if(! d.ws.wr_block_)
|
||||
if(d.ws.wr_block_.try_lock(this))
|
||||
{
|
||||
// Acquire the write block
|
||||
d.ws.wr_block_ = d.tok;
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! d.ws.check_open(ec))
|
||||
goto upcall;
|
||||
@ -137,19 +132,17 @@ operator()(
|
||||
else
|
||||
{
|
||||
// Suspend
|
||||
BOOST_ASSERT(d.ws.wr_block_ != d.tok);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
d.ws.paused_close_.emplace(std::move(*this));
|
||||
|
||||
// Acquire the write block
|
||||
BOOST_ASSERT(! d.ws.wr_block_);
|
||||
d.ws.wr_block_ = d.tok;
|
||||
d.ws.wr_block_.lock(this);
|
||||
|
||||
// Resume
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::post(
|
||||
d.ws.get_executor(), std::move(*this));
|
||||
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
|
||||
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! d.ws.check_open(ec))
|
||||
@ -180,27 +173,20 @@ operator()(
|
||||
}
|
||||
|
||||
// Maybe suspend
|
||||
if(! d.ws.rd_block_)
|
||||
{
|
||||
// Acquire the read block
|
||||
d.ws.rd_block_ = d.tok;
|
||||
}
|
||||
else
|
||||
if(! d.ws.rd_block_.try_lock(this))
|
||||
{
|
||||
// Suspend
|
||||
BOOST_ASSERT(d.ws.rd_block_ != d.tok);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
d.ws.paused_r_close_.emplace(std::move(*this));
|
||||
|
||||
// Acquire the read block
|
||||
BOOST_ASSERT(! d.ws.rd_block_);
|
||||
d.ws.rd_block_ = d.tok;
|
||||
d.ws.rd_block_.lock(this);
|
||||
|
||||
// Resume
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::post(
|
||||
d.ws.get_executor(), std::move(*this));
|
||||
BOOST_ASSERT(d.ws.rd_block_ == d.tok);
|
||||
BOOST_ASSERT(d.ws.rd_block_.is_locked(this));
|
||||
|
||||
// Make sure the stream is open
|
||||
BOOST_ASSERT(d.ws.status_ != status::open);
|
||||
@ -275,12 +261,12 @@ operator()(
|
||||
|
||||
teardown:
|
||||
// Teardown
|
||||
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
|
||||
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
|
||||
using beast::websocket::async_teardown;
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
async_teardown(d.ws.role_,
|
||||
d.ws.stream_, std::move(*this));
|
||||
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
|
||||
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
|
||||
if(ec == boost::asio::error::eof)
|
||||
{
|
||||
// Rationale:
|
||||
@ -296,13 +282,10 @@ operator()(
|
||||
d.ws.close();
|
||||
|
||||
upcall:
|
||||
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
|
||||
d.ws.wr_block_.reset();
|
||||
if(d.ws.rd_block_ == d.tok)
|
||||
{
|
||||
d.ws.rd_block_.reset();
|
||||
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
|
||||
d.ws.wr_block_.unlock(this);
|
||||
if(d.ws.rd_block_.try_unlock(this))
|
||||
d.ws.paused_r_rd_.maybe_invoke();
|
||||
}
|
||||
d.ws.paused_rd_.maybe_invoke() ||
|
||||
d.ws.paused_ping_.maybe_invoke() ||
|
||||
d.ws.paused_wr_.maybe_invoke();
|
||||
|
@ -41,7 +41,6 @@ class stream<NextLayer, deflateSupported>::ping_op
|
||||
{
|
||||
stream<NextLayer, deflateSupported>& ws;
|
||||
detail::frame_buffer fb;
|
||||
token tok;
|
||||
|
||||
state(
|
||||
Handler const&,
|
||||
@ -49,7 +48,6 @@ class stream<NextLayer, deflateSupported>::ping_op
|
||||
detail::opcode op,
|
||||
ping_data const& payload)
|
||||
: ws(ws_)
|
||||
, tok(ws.tok_.unique())
|
||||
{
|
||||
// Serialize the control frame
|
||||
ws.template write_ping<
|
||||
@ -118,11 +116,8 @@ operator()(error_code ec, std::size_t)
|
||||
BOOST_ASIO_CORO_REENTER(*this)
|
||||
{
|
||||
// Maybe suspend
|
||||
if(! d.ws.wr_block_)
|
||||
if(d.ws.wr_block_.try_lock(this))
|
||||
{
|
||||
// Acquire the write block
|
||||
d.ws.wr_block_ = d.tok;
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! d.ws.check_open(ec))
|
||||
{
|
||||
@ -136,19 +131,17 @@ operator()(error_code ec, std::size_t)
|
||||
else
|
||||
{
|
||||
// Suspend
|
||||
BOOST_ASSERT(d.ws.wr_block_ != d.tok);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
d.ws.paused_ping_.emplace(std::move(*this));
|
||||
|
||||
// Acquire the write block
|
||||
BOOST_ASSERT(! d.ws.wr_block_);
|
||||
d.ws.wr_block_ = d.tok;
|
||||
d.ws.wr_block_.lock(this);
|
||||
|
||||
// Resume
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::post(
|
||||
d.ws.get_executor(), std::move(*this));
|
||||
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
|
||||
BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! d.ws.check_open(ec))
|
||||
@ -163,8 +156,7 @@ operator()(error_code ec, std::size_t)
|
||||
goto upcall;
|
||||
|
||||
upcall:
|
||||
BOOST_ASSERT(d.ws.wr_block_ == d.tok);
|
||||
d.ws.wr_block_.reset();
|
||||
d.ws.wr_block_.unlock(this);
|
||||
d.ws.paused_close_.maybe_invoke() ||
|
||||
d.ws.paused_rd_.maybe_invoke() ||
|
||||
d.ws.paused_wr_.maybe_invoke();
|
||||
|
@ -85,7 +85,6 @@ class stream<NextLayer, deflateSupported>::read_some_op
|
||||
buffers_suffix<MutableBufferSequence> cb_;
|
||||
std::size_t bytes_written_ = 0;
|
||||
error_code result_;
|
||||
token tok_;
|
||||
close_code code_;
|
||||
bool did_read_ = false;
|
||||
bool cont_ = false;
|
||||
@ -103,7 +102,6 @@ public:
|
||||
, ws_(ws)
|
||||
, bs_(bs)
|
||||
, cb_(bs)
|
||||
, tok_(ws_.tok_.unique())
|
||||
, code_(close_code::none)
|
||||
{
|
||||
}
|
||||
@ -165,11 +163,8 @@ operator()(
|
||||
{
|
||||
// Maybe suspend
|
||||
do_maybe_suspend:
|
||||
if(! ws_.rd_block_)
|
||||
if(ws_.rd_block_.try_lock(this))
|
||||
{
|
||||
// Acquire the read block
|
||||
ws_.rd_block_ = tok_;
|
||||
|
||||
// Make sure the stream is not closed
|
||||
if( ws_.status_ == status::closed ||
|
||||
ws_.status_ == status::failed)
|
||||
@ -182,19 +177,17 @@ operator()(
|
||||
{
|
||||
do_suspend:
|
||||
// Suspend
|
||||
BOOST_ASSERT(ws_.rd_block_ != tok_);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
ws_.paused_r_rd_.emplace(std::move(*this));
|
||||
|
||||
// Acquire the read block
|
||||
BOOST_ASSERT(! ws_.rd_block_);
|
||||
ws_.rd_block_ = tok_;
|
||||
ws_.rd_block_.lock(this);
|
||||
|
||||
// Resume
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::post(
|
||||
ws_.get_executor(), std::move(*this));
|
||||
BOOST_ASSERT(ws_.rd_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.rd_block_.is_locked(this));
|
||||
|
||||
// The only way to get read blocked is if
|
||||
// a `close_op` wrote a close frame
|
||||
@ -209,7 +202,7 @@ operator()(
|
||||
// then finish the read with operation_aborted.
|
||||
|
||||
loop:
|
||||
BOOST_ASSERT(ws_.rd_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.rd_block_.is_locked(this));
|
||||
// See if we need to read a frame header. This
|
||||
// condition is structured to give the decompressor
|
||||
// a chance to emit the final empty deflate block
|
||||
@ -230,29 +223,28 @@ operator()(
|
||||
code_ = close_code::protocol_error;
|
||||
goto close;
|
||||
}
|
||||
BOOST_ASSERT(ws_.rd_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.rd_block_.is_locked(this));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
ws_.stream_.async_read_some(
|
||||
ws_.rd_buf_.prepare(read_size(
|
||||
ws_.rd_buf_, ws_.rd_buf_.max_size())),
|
||||
std::move(*this));
|
||||
BOOST_ASSERT(ws_.rd_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.rd_block_.is_locked(this));
|
||||
if(! ws_.check_ok(ec))
|
||||
goto upcall;
|
||||
ws_.rd_buf_.commit(bytes_transferred);
|
||||
|
||||
// Allow a close operation
|
||||
// to acquire the read block
|
||||
BOOST_ASSERT(ws_.rd_block_ == tok_);
|
||||
ws_.rd_block_.reset();
|
||||
ws_.rd_block_.unlock(this);
|
||||
if( ws_.paused_r_close_.maybe_invoke())
|
||||
{
|
||||
// Suspend
|
||||
BOOST_ASSERT(ws_.rd_block_);
|
||||
BOOST_ASSERT(ws_.rd_block_.is_locked());
|
||||
goto do_suspend;
|
||||
}
|
||||
// Acquire read block
|
||||
ws_.rd_block_ = tok_;
|
||||
ws_.rd_block_.lock(this);
|
||||
}
|
||||
// Immediately apply the mask to the portion
|
||||
// of the buffer holding payload data.
|
||||
@ -302,36 +294,26 @@ operator()(
|
||||
detail::opcode::pong, payload);
|
||||
}
|
||||
|
||||
//BOOST_ASSERT(! ws_.paused_r_close_);
|
||||
|
||||
// Allow a close operation
|
||||
// to acquire the read block
|
||||
BOOST_ASSERT(ws_.rd_block_ == tok_);
|
||||
ws_.rd_block_.reset();
|
||||
ws_.rd_block_.unlock(this);
|
||||
ws_.paused_r_close_.maybe_invoke();
|
||||
|
||||
// Maybe suspend
|
||||
if(! ws_.wr_block_)
|
||||
{
|
||||
// Acquire the write block
|
||||
ws_.wr_block_ = tok_;
|
||||
}
|
||||
else
|
||||
if(! ws_.wr_block_.try_lock(this))
|
||||
{
|
||||
// Suspend
|
||||
BOOST_ASSERT(ws_.wr_block_ != tok_);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
ws_.paused_rd_.emplace(std::move(*this));
|
||||
|
||||
// Acquire the write block
|
||||
BOOST_ASSERT(! ws_.wr_block_);
|
||||
ws_.wr_block_ = tok_;
|
||||
ws_.wr_block_.lock(this);
|
||||
|
||||
// Resume
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::post(
|
||||
ws_.get_executor(), std::move(*this));
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! ws_.check_open(ec))
|
||||
@ -339,14 +321,14 @@ operator()(
|
||||
}
|
||||
|
||||
// Send pong
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_write(ws_.stream_,
|
||||
ws_.rd_fb_.data(), std::move(*this));
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
if(! ws_.check_ok(ec))
|
||||
goto upcall;
|
||||
ws_.wr_block_.reset();
|
||||
ws_.wr_block_.unlock(this);
|
||||
ws_.paused_close_.maybe_invoke() ||
|
||||
ws_.paused_ping_.maybe_invoke() ||
|
||||
ws_.paused_wr_.maybe_invoke();
|
||||
@ -629,30 +611,25 @@ operator()(
|
||||
goto upcall;
|
||||
|
||||
close:
|
||||
if(! ws_.wr_block_)
|
||||
if(ws_.wr_block_.try_lock(this))
|
||||
{
|
||||
// Acquire the write block
|
||||
ws_.wr_block_ = tok_;
|
||||
|
||||
// Make sure the stream is open
|
||||
BOOST_ASSERT(ws_.status_ == status::open);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Suspend
|
||||
BOOST_ASSERT(ws_.wr_block_ != tok_);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
ws_.paused_rd_.emplace(std::move(*this));
|
||||
|
||||
// Acquire the write block
|
||||
BOOST_ASSERT(! ws_.wr_block_);
|
||||
ws_.wr_block_ = tok_;
|
||||
ws_.wr_block_.lock(this);
|
||||
|
||||
// Resume
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::post(
|
||||
ws_.get_executor(), std::move(*this));
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! ws_.check_open(ec))
|
||||
@ -673,23 +650,23 @@ operator()(
|
||||
ws_.rd_fb_, code_);
|
||||
|
||||
// Send close frame
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_write(
|
||||
ws_.stream_, ws_.rd_fb_.data(),
|
||||
std::move(*this));
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
if(! ws_.check_ok(ec))
|
||||
goto upcall;
|
||||
}
|
||||
|
||||
// Teardown
|
||||
using beast::websocket::async_teardown;
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
async_teardown(ws_.role_,
|
||||
ws_.stream_, std::move(*this));
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
if(ec == boost::asio::error::eof)
|
||||
{
|
||||
// Rationale:
|
||||
@ -705,16 +682,12 @@ operator()(
|
||||
ws_.close();
|
||||
|
||||
upcall:
|
||||
if(ws_.rd_block_ == tok_)
|
||||
ws_.rd_block_.reset();
|
||||
ws_.rd_block_.try_unlock(this);
|
||||
ws_.paused_r_close_.maybe_invoke();
|
||||
if(ws_.wr_block_ == tok_)
|
||||
{
|
||||
ws_.wr_block_.reset();
|
||||
if(ws_.wr_block_.try_unlock(this))
|
||||
ws_.paused_close_.maybe_invoke() ||
|
||||
ws_.paused_ping_.maybe_invoke() ||
|
||||
ws_.paused_wr_.maybe_invoke();
|
||||
}
|
||||
if(! cont_)
|
||||
return boost::asio::post(
|
||||
ws_.stream_.get_executor(),
|
||||
|
@ -45,7 +45,6 @@ template<class... Args>
|
||||
stream<NextLayer, deflateSupported>::
|
||||
stream(Args&&... args)
|
||||
: stream_(std::forward<Args>(args)...)
|
||||
, tok_(1)
|
||||
{
|
||||
BOOST_ASSERT(rd_buf_.max_size() >=
|
||||
max_control_frame_size);
|
||||
@ -125,6 +124,9 @@ open(role_type role)
|
||||
rd_fh_.fin = false;
|
||||
rd_close_ = false;
|
||||
wr_close_ = false;
|
||||
// These should not be necessary, because all completion
|
||||
// handlers must be allowed to execute otherwise the
|
||||
// stream exhibits undefined behavior.
|
||||
wr_block_.reset();
|
||||
rd_block_.reset();
|
||||
cr_.code = close_code::none;
|
||||
@ -196,6 +198,9 @@ reset()
|
||||
rd_close_ = false;
|
||||
wr_close_ = false;
|
||||
wr_cont_ = false;
|
||||
// These should not be necessary, because all completion
|
||||
// handlers must be allowed to execute otherwise the
|
||||
// stream exhibits undefined behavior.
|
||||
wr_block_.reset();
|
||||
rd_block_.reset();
|
||||
cr_.code = close_code::none;
|
||||
|
@ -146,7 +146,6 @@ class stream<NextLayer, deflateSupported>::write_some_op
|
||||
std::size_t bytes_transferred_ = 0;
|
||||
std::size_t remain_;
|
||||
std::size_t in_;
|
||||
token tok_;
|
||||
int how_;
|
||||
bool fin_;
|
||||
bool more_;
|
||||
@ -165,7 +164,6 @@ public:
|
||||
: h_(std::forward<DeducedHandler>(h))
|
||||
, ws_(ws)
|
||||
, cb_(bs)
|
||||
, tok_(ws_.tok_.unique())
|
||||
, fin_(fin)
|
||||
{
|
||||
}
|
||||
@ -293,11 +291,8 @@ operator()(
|
||||
}
|
||||
|
||||
// Maybe suspend
|
||||
if(! ws_.wr_block_)
|
||||
if(ws_.wr_block_.try_lock(this))
|
||||
{
|
||||
// Acquire the write block
|
||||
ws_.wr_block_ = tok_;
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! ws_.check_open(ec))
|
||||
goto upcall;
|
||||
@ -306,19 +301,17 @@ operator()(
|
||||
{
|
||||
do_suspend:
|
||||
// Suspend
|
||||
BOOST_ASSERT(ws_.wr_block_ != tok_);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
ws_.paused_wr_.emplace(std::move(*this));
|
||||
|
||||
// Acquire the write block
|
||||
BOOST_ASSERT(! ws_.wr_block_);
|
||||
ws_.wr_block_ = tok_;
|
||||
ws_.wr_block_.lock(this);
|
||||
|
||||
// Resume
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::post(
|
||||
ws_.get_executor(), std::move(*this));
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked(this));
|
||||
|
||||
// Make sure the stream is open
|
||||
if(! ws_.check_open(ec))
|
||||
@ -377,15 +370,15 @@ operator()(
|
||||
fh_.op = detail::opcode::cont;
|
||||
// Allow outgoing control frames to
|
||||
// be sent in between message frames
|
||||
ws_.wr_block_.reset();
|
||||
ws_.wr_block_.unlock(this);
|
||||
if( ws_.paused_close_.maybe_invoke() ||
|
||||
ws_.paused_rd_.maybe_invoke() ||
|
||||
ws_.paused_ping_.maybe_invoke())
|
||||
{
|
||||
BOOST_ASSERT(ws_.wr_block_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked());
|
||||
goto do_suspend;
|
||||
}
|
||||
ws_.wr_block_ = tok_;
|
||||
ws_.wr_block_.lock(this);
|
||||
}
|
||||
goto upcall;
|
||||
}
|
||||
@ -476,15 +469,15 @@ operator()(
|
||||
fh_.op = detail::opcode::cont;
|
||||
// Allow outgoing control frames to
|
||||
// be sent in between message frames:
|
||||
ws_.wr_block_.reset();
|
||||
ws_.wr_block_.unlock(this);
|
||||
if( ws_.paused_close_.maybe_invoke() ||
|
||||
ws_.paused_rd_.maybe_invoke() ||
|
||||
ws_.paused_ping_.maybe_invoke())
|
||||
{
|
||||
BOOST_ASSERT(ws_.wr_block_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked());
|
||||
goto do_suspend;
|
||||
}
|
||||
ws_.wr_block_ = tok_;
|
||||
ws_.wr_block_.lock(this);
|
||||
}
|
||||
goto upcall;
|
||||
}
|
||||
@ -537,15 +530,15 @@ operator()(
|
||||
fh_.rsv1 = false;
|
||||
// Allow outgoing control frames to
|
||||
// be sent in between message frames:
|
||||
ws_.wr_block_.reset();
|
||||
ws_.wr_block_.unlock(this);
|
||||
if( ws_.paused_close_.maybe_invoke() ||
|
||||
ws_.paused_rd_.maybe_invoke() ||
|
||||
ws_.paused_ping_.maybe_invoke())
|
||||
{
|
||||
BOOST_ASSERT(ws_.wr_block_);
|
||||
BOOST_ASSERT(ws_.wr_block_.is_locked());
|
||||
goto do_suspend;
|
||||
}
|
||||
ws_.wr_block_ = tok_;
|
||||
ws_.wr_block_.lock(this);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -559,8 +552,7 @@ operator()(
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
upcall:
|
||||
BOOST_ASSERT(ws_.wr_block_ == tok_);
|
||||
ws_.wr_block_.reset();
|
||||
ws_.wr_block_.unlock(this);
|
||||
ws_.paused_close_.maybe_invoke() ||
|
||||
ws_.paused_rd_.maybe_invoke() ||
|
||||
ws_.paused_ping_.maybe_invoke();
|
||||
|
@ -147,21 +147,6 @@ class stream
|
||||
using control_cb_type =
|
||||
std::function<void(frame_type, string_view)>;
|
||||
|
||||
// tokens are used to order reads and writes
|
||||
class token
|
||||
{
|
||||
unsigned char id_ = 0;
|
||||
public:
|
||||
token() = default;
|
||||
token(token const&) = default;
|
||||
explicit token(unsigned char id) : id_(id) {}
|
||||
operator bool() const { return id_ != 0; }
|
||||
bool operator==(token const& t) { return id_ == t.id_; }
|
||||
bool operator!=(token const& t) { return id_ != t.id_; }
|
||||
token unique() { token t{id_++}; if(id_ == 0) ++id_; return t; }
|
||||
void reset() { id_ = 0; }
|
||||
};
|
||||
|
||||
enum class status
|
||||
{
|
||||
open,
|
||||
@ -195,15 +180,14 @@ class stream
|
||||
= true;
|
||||
bool rd_close_ // did we read a close frame?
|
||||
= false;
|
||||
token rd_block_; // op currenly reading
|
||||
detail::type_mutex rd_block_; // op currenly reading
|
||||
|
||||
token tok_; // used to order asynchronous ops
|
||||
role_type role_ // server or client
|
||||
= role_type::client;
|
||||
status status_
|
||||
= status::closed;
|
||||
|
||||
token wr_block_; // op currenly writing
|
||||
detail::type_mutex wr_block_; // op currenly writing
|
||||
bool wr_close_ // did we write a close frame?
|
||||
= false;
|
||||
bool wr_cont_ // next write is a continuation
|
||||
|
@ -187,7 +187,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
BEAST_EXPECT(count == 0);
|
||||
ws.async_close({},
|
||||
[&](error_code ec)
|
||||
@ -220,7 +220,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == 1);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
BEAST_EXPECT(count == 0);
|
||||
ws.async_close({},
|
||||
[&](error_code ec)
|
||||
@ -256,7 +256,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -297,7 +297,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 1);
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -338,7 +338,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 1);
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -433,7 +433,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 3);
|
||||
});
|
||||
BEAST_EXPECT(ws.rd_block_);
|
||||
BEAST_EXPECT(ws.rd_block_.is_locked());
|
||||
ws.async_close({},
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -443,7 +443,7 @@ public:
|
||||
BEAST_EXPECT(++count == 2);
|
||||
});
|
||||
BEAST_EXPECT(ws.is_open());
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
BEAST_EXPECT(count == 0);
|
||||
ioc.run();
|
||||
BEAST_EXPECT(count == 3);
|
||||
|
@ -112,7 +112,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == 12);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
BEAST_EXPECT(count == 0);
|
||||
ws.async_ping({},
|
||||
[&](error_code ec)
|
||||
@ -144,7 +144,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
BEAST_EXPECT(count == 0);
|
||||
ws.async_ping({},
|
||||
[&](error_code ec)
|
||||
@ -180,7 +180,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -222,7 +222,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -263,7 +263,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -303,7 +303,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -342,7 +342,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == 1);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ws.async_ping("",
|
||||
[&](error_code ec)
|
||||
{
|
||||
|
@ -43,7 +43,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 1);
|
||||
});
|
||||
while(! ws.rd_block_)
|
||||
while(! ws.rd_block_.is_locked())
|
||||
ioc.run_one();
|
||||
multi_buffer b;
|
||||
ws.async_read(b,
|
||||
@ -78,7 +78,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 2);
|
||||
});
|
||||
BOOST_ASSERT(ws.rd_block_);
|
||||
BOOST_ASSERT(ws.rd_block_.is_locked());
|
||||
ws.async_close({},
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -115,7 +115,7 @@ public:
|
||||
BEAST_EXPECT(to_string(b.data()) == s);
|
||||
++count;
|
||||
});
|
||||
BEAST_EXPECT(ws.rd_block_);
|
||||
BEAST_EXPECT(ws.rd_block_.is_locked());
|
||||
ws.async_write(buffer(s),
|
||||
[&](error_code ec, std::size_t n)
|
||||
{
|
||||
@ -125,7 +125,7 @@ public:
|
||||
BEAST_EXPECT(n == s.size());
|
||||
++count;
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ioc.run();
|
||||
BEAST_EXPECT(count == 2);
|
||||
});
|
||||
@ -164,7 +164,7 @@ public:
|
||||
BEAST_EXPECT(++count == 3);
|
||||
});
|
||||
});
|
||||
BEAST_EXPECT(ws.rd_block_);
|
||||
BEAST_EXPECT(ws.rd_block_.is_locked());
|
||||
ws.async_close({},
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -173,7 +173,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 2);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ioc.run();
|
||||
BEAST_EXPECT(count == 3);
|
||||
});
|
||||
@ -202,7 +202,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 2);
|
||||
});
|
||||
BEAST_EXPECT(ws.rd_block_);
|
||||
BEAST_EXPECT(ws.rd_block_.is_locked());
|
||||
ws.async_close({},
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -211,7 +211,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(++count == 1);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ioc.run();
|
||||
BEAST_EXPECT(count == 2);
|
||||
});
|
||||
|
@ -287,7 +287,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
BEAST_EXPECT(count == 0);
|
||||
ws.async_write(sbuf("*"),
|
||||
[&](error_code ec, std::size_t n)
|
||||
@ -320,7 +320,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
BEAST_EXPECT(count == 0);
|
||||
ws.async_write(sbuf("*"),
|
||||
[&](error_code ec, std::size_t)
|
||||
@ -356,7 +356,7 @@ public:
|
||||
BOOST_THROW_EXCEPTION(
|
||||
system_error{ec});
|
||||
});
|
||||
while(! ws.wr_block_)
|
||||
while(! ws.wr_block_.is_locked())
|
||||
{
|
||||
ioc.run_one();
|
||||
if(! BEAST_EXPECT(! ioc.stopped()))
|
||||
@ -398,7 +398,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == 16384);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ws.async_ping("",
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -432,7 +432,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == 16384);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ws.async_ping("",
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -466,7 +466,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == 16384);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ws.async_ping("",
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -499,7 +499,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == 16384);
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ws.async_ping("",
|
||||
[&](error_code ec)
|
||||
{
|
||||
@ -537,7 +537,7 @@ public:
|
||||
system_error{ec});
|
||||
BEAST_EXPECT(n == s.size());
|
||||
});
|
||||
BEAST_EXPECT(ws.wr_block_);
|
||||
BEAST_EXPECT(ws.wr_block_.is_locked());
|
||||
ws.async_ping("",
|
||||
[&](error_code ec)
|
||||
{
|
||||
|
Reference in New Issue
Block a user