Allow concurrent websocket async ping and writes:

fix #271

This modifies the websocket stream implementation's composed
operations to allow caller-initiated asynchronous pings and
frame/message writes to take place at the same time.
This commit is contained in:
Vinnie Falco
2017-02-24 10:03:33 -05:00
parent a22e7056d5
commit 0ec9e8c4af
8 changed files with 31 additions and 13 deletions

View File

@ -5,6 +5,7 @@ WebSocket
* Fix race in pings during reads
* Fix race in close frames during reads
* Fix race when write suspends
* Allow concurrent websocket async ping and writes
--------------------------------------------------------------------------------

View File

@ -320,8 +320,8 @@ operations can cause socket writes. However, these writes will not
compete with caller-initiated write operations. For the purposes of
correctness with respect to the stream invariants, caller-initiated
read operations still only count as a read. This means that callers can
have a simultaneous active read and write operation in progress, while
the implementation also automatically handles control frames.
have a simultaneously active read, write, and ping operation in progress,
while the implementation also automatically handles control frames.
[heading Ping and Pong Frames]

View File

@ -98,6 +98,7 @@ public:
{
if(other.base_)
{
// type-pun
base_ = reinterpret_cast<base*>(&buf_[0]);
other.base_->move(buf_);
other.base_ = nullptr;
@ -109,11 +110,12 @@ public:
{
// Engaged invokables must be invoked before
// assignment otherwise the io_service
// invariants are broken w.r.t completions.
// completion invariants are broken.
BOOST_ASSERT(! base_);
if(other.base_)
{
// type-pun
base_ = reinterpret_cast<base*>(&buf_[0]);
other.base_->move(buf_);
other.base_ = nullptr;
@ -147,6 +149,7 @@ invokable::emplace(F&& f)
"buffer too small");
BOOST_ASSERT(! base_);
::new(buf_) holder<F>(std::forward<F>(f));
// type-pun
base_ = reinterpret_cast<base*>(&buf_[0]);
}

View File

@ -67,8 +67,9 @@ protected:
op* wr_block_; // op currenly writing
ping_data* ping_data_; // where to put the payload
invokable rd_op_; // invoked after write completes
invokable wr_op_; // invoked after read completes
invokable rd_op_; // read parking
invokable wr_op_; // write parking
invokable ping_op_; // ping parking
close_reason cr_; // set from received close frame
// State information for the message being received

View File

@ -190,7 +190,8 @@ operator()(error_code ec, bool again)
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.rd_op_.maybe_invoke();
d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke();
d_.invoke(ec);
}

View File

@ -133,7 +133,7 @@ operator()(error_code ec, bool again)
{
// suspend
d.state = 2;
d.ws.wr_op_.template emplace<
d.ws.ping_op_.template emplace<
ping_op>(std::move(*this));
return;
}
@ -188,7 +188,8 @@ operator()(error_code ec, bool again)
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.rd_op_.maybe_invoke();
d.ws.rd_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke();
d_.invoke(ec);
}

View File

@ -673,7 +673,8 @@ operator()(error_code ec,
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.wr_op_.maybe_invoke();
d.ws.ping_op_.maybe_invoke() ||
d.ws.wr_op_.maybe_invoke();
d_.invoke(ec);
}

View File

@ -280,7 +280,10 @@ operator()(error_code ec,
d.fh.op = opcode::cont;
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
if(d.ws.rd_op_.maybe_invoke())
// Allow outgoing control frames to
// be sent in between message frames:
if(d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke())
{
d.state = do_maybe_suspend;
d.ws.get_io_service().post(
@ -382,7 +385,10 @@ operator()(error_code ec,
d.fh.op = opcode::cont;
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
if(d.ws.rd_op_.maybe_invoke())
// Allow outgoing control frames to
// be sent in between message frames:
if(d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke())
{
d.state = do_maybe_suspend;
d.ws.get_io_service().post(
@ -452,7 +458,10 @@ operator()(error_code ec,
d.fh.rsv1 = false;
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
if(d.ws.rd_op_.maybe_invoke())
// Allow outgoing control frames to
// be sent in between message frames:
if(d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke())
{
d.state = do_maybe_suspend;
d.ws.get_io_service().post(
@ -529,7 +538,8 @@ operator()(error_code ec,
upcall:
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
d.ws.rd_op_.maybe_invoke();
d.ws.rd_op_.maybe_invoke() ||
d.ws.ping_op_.maybe_invoke();
d_.invoke(ec);
}