From 0ec9e8c4af8a6c51ffe2aa4630839e2083d1de70 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 24 Feb 2017 10:03:33 -0500 Subject: [PATCH] Allow concurrent websocket async ping and writes: fix #271 This modifies the websocket stream implementation's composed operations to allow caller-initiated asynchronous pings and frame/message writes to take place at the same time. --- CHANGELOG.md | 1 + doc/websocket.qbk | 4 ++-- include/beast/websocket/detail/invokable.hpp | 5 ++++- include/beast/websocket/detail/stream_base.hpp | 5 +++-- include/beast/websocket/impl/close.ipp | 3 ++- include/beast/websocket/impl/ping.ipp | 5 +++-- include/beast/websocket/impl/read.ipp | 3 ++- include/beast/websocket/impl/write.ipp | 18 ++++++++++++++---- 8 files changed, 31 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39e7d487..383491ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ WebSocket * Fix race in pings during reads * Fix race in close frames during reads * Fix race when write suspends +* Allow concurrent websocket async ping and writes -------------------------------------------------------------------------------- diff --git a/doc/websocket.qbk b/doc/websocket.qbk index ce989f04..77bcc2bd 100644 --- a/doc/websocket.qbk +++ b/doc/websocket.qbk @@ -320,8 +320,8 @@ operations can cause socket writes. However, these writes will not compete with caller-initiated write operations. For the purposes of correctness with respect to the stream invariants, caller-initiated read operations still only count as a read. This means that callers can -have a simultaneous active read and write operation in progress, while -the implementation also automatically handles control frames. +have a simultaneously active read, write, and ping operation in progress, +while the implementation also automatically handles control frames. [heading Ping and Pong Frames] diff --git a/include/beast/websocket/detail/invokable.hpp b/include/beast/websocket/detail/invokable.hpp index 62e62492..80701083 100644 --- a/include/beast/websocket/detail/invokable.hpp +++ b/include/beast/websocket/detail/invokable.hpp @@ -98,6 +98,7 @@ public: { if(other.base_) { + // type-pun base_ = reinterpret_cast(&buf_[0]); other.base_->move(buf_); other.base_ = nullptr; @@ -109,11 +110,12 @@ public: { // Engaged invokables must be invoked before // assignment otherwise the io_service - // invariants are broken w.r.t completions. + // completion invariants are broken. BOOST_ASSERT(! base_); if(other.base_) { + // type-pun base_ = reinterpret_cast(&buf_[0]); other.base_->move(buf_); other.base_ = nullptr; @@ -147,6 +149,7 @@ invokable::emplace(F&& f) "buffer too small"); BOOST_ASSERT(! base_); ::new(buf_) holder(std::forward(f)); + // type-pun base_ = reinterpret_cast(&buf_[0]); } diff --git a/include/beast/websocket/detail/stream_base.hpp b/include/beast/websocket/detail/stream_base.hpp index 0a1e2679..6cd6731f 100644 --- a/include/beast/websocket/detail/stream_base.hpp +++ b/include/beast/websocket/detail/stream_base.hpp @@ -67,8 +67,9 @@ protected: op* wr_block_; // op currenly writing ping_data* ping_data_; // where to put the payload - invokable rd_op_; // invoked after write completes - invokable wr_op_; // invoked after read completes + invokable rd_op_; // read parking + invokable wr_op_; // write parking + invokable ping_op_; // ping parking close_reason cr_; // set from received close frame // State information for the message being received diff --git a/include/beast/websocket/impl/close.ipp b/include/beast/websocket/impl/close.ipp index 6bf3a452..983126f9 100644 --- a/include/beast/websocket/impl/close.ipp +++ b/include/beast/websocket/impl/close.ipp @@ -190,7 +190,8 @@ operator()(error_code ec, bool again) upcall: if(d.ws.wr_block_ == &d) d.ws.wr_block_ = nullptr; - d.ws.rd_op_.maybe_invoke(); + d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke(); d_.invoke(ec); } diff --git a/include/beast/websocket/impl/ping.ipp b/include/beast/websocket/impl/ping.ipp index 657eb5d2..59071376 100644 --- a/include/beast/websocket/impl/ping.ipp +++ b/include/beast/websocket/impl/ping.ipp @@ -133,7 +133,7 @@ operator()(error_code ec, bool again) { // suspend d.state = 2; - d.ws.wr_op_.template emplace< + d.ws.ping_op_.template emplace< ping_op>(std::move(*this)); return; } @@ -188,7 +188,8 @@ operator()(error_code ec, bool again) upcall: if(d.ws.wr_block_ == &d) d.ws.wr_block_ = nullptr; - d.ws.rd_op_.maybe_invoke(); + d.ws.rd_op_.maybe_invoke() || + d.ws.wr_op_.maybe_invoke(); d_.invoke(ec); } diff --git a/include/beast/websocket/impl/read.ipp b/include/beast/websocket/impl/read.ipp index dfecdbae..8c715de5 100644 --- a/include/beast/websocket/impl/read.ipp +++ b/include/beast/websocket/impl/read.ipp @@ -673,7 +673,8 @@ operator()(error_code ec, upcall: if(d.ws.wr_block_ == &d) d.ws.wr_block_ = nullptr; - d.ws.wr_op_.maybe_invoke(); + d.ws.ping_op_.maybe_invoke() || + d.ws.wr_op_.maybe_invoke(); d_.invoke(ec); } diff --git a/include/beast/websocket/impl/write.ipp b/include/beast/websocket/impl/write.ipp index 5f6a46e2..4b010678 100644 --- a/include/beast/websocket/impl/write.ipp +++ b/include/beast/websocket/impl/write.ipp @@ -280,7 +280,10 @@ operator()(error_code ec, d.fh.op = opcode::cont; if(d.ws.wr_block_ == &d) d.ws.wr_block_ = nullptr; - if(d.ws.rd_op_.maybe_invoke()) + // Allow outgoing control frames to + // be sent in between message frames: + if(d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke()) { d.state = do_maybe_suspend; d.ws.get_io_service().post( @@ -382,7 +385,10 @@ operator()(error_code ec, d.fh.op = opcode::cont; BOOST_ASSERT(d.ws.wr_block_ == &d); d.ws.wr_block_ = nullptr; - if(d.ws.rd_op_.maybe_invoke()) + // Allow outgoing control frames to + // be sent in between message frames: + if(d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke()) { d.state = do_maybe_suspend; d.ws.get_io_service().post( @@ -452,7 +458,10 @@ operator()(error_code ec, d.fh.rsv1 = false; BOOST_ASSERT(d.ws.wr_block_ == &d); d.ws.wr_block_ = nullptr; - if(d.ws.rd_op_.maybe_invoke()) + // Allow outgoing control frames to + // be sent in between message frames: + if(d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke()) { d.state = do_maybe_suspend; d.ws.get_io_service().post( @@ -529,7 +538,8 @@ operator()(error_code ec, upcall: if(d.ws.wr_block_ == &d) d.ws.wr_block_ = nullptr; - d.ws.rd_op_.maybe_invoke(); + d.ws.rd_op_.maybe_invoke() || + d.ws.ping_op_.maybe_invoke(); d_.invoke(ec); }