From 0d6986805115057ea2d0e85bd0f1fb091adb4aa7 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 11 May 2016 07:56:36 -0400 Subject: [PATCH] Fix resume of invokables in websocket composed operations: When a suspended composed operation is resumed, the operation now posts to the io_service to get in the correct context. Previously, invokables resumed in the context of a different completion handler. * asio_handler_invoke for any resumed invokable will return `true`. --- TODO.txt | 4 + include/beast/websocket/impl/close_op.ipp | 31 +++--- include/beast/websocket/impl/handshake_op.ipp | 16 +--- .../beast/websocket/impl/read_frame_op.ipp | 94 +++++++++++-------- include/beast/websocket/impl/teardown.ipp | 3 +- .../beast/websocket/impl/write_frame_op.ipp | 26 ++--- 6 files changed, 97 insertions(+), 77 deletions(-) diff --git a/TODO.txt b/TODO.txt index f830acf7..ff6a6c23 100644 --- a/TODO.txt +++ b/TODO.txt @@ -32,6 +32,10 @@ WebSocket: * Don't try to read requests into empty_body * Give callers control over the http request/response used during handshake * Investigate poor autobahn results in Debug builds +* Fall through composed operation switch cases +* Replace stream::error_ with stream::state_, example states: ok, error, abort_io + Need a cancel state so waking up a ping stored in invokable knows to call the + final handler with operation_aborted HTTP: * Define Parser concept in HTTP diff --git a/include/beast/websocket/impl/close_op.ipp b/include/beast/websocket/impl/close_op.ipp index 4573cb41..36af55fc 100644 --- a/include/beast/websocket/impl/close_op.ipp +++ b/include/beast/websocket/impl/close_op.ipp @@ -69,19 +69,11 @@ public: void operator()() { - auto& d = *d_; - d.cont = false; - (*this)(error_code{}, 0, false); - } - - void operator()(error_code const& ec) - { - (*this)(ec, 0); + (*this)(error_code{}, 0, true); } void - operator()(error_code ec, - std::size_t bytes_transferred, bool again = true); + operator()(error_code ec, std::size_t, bool again = true); friend void* asio_handler_allocate( @@ -117,8 +109,8 @@ public: template template void -stream::close_op::operator()( - error_code ec, std::size_t bytes_transferred, bool again) +stream::close_op:: +operator()(error_code ec, std::size_t, bool again) { auto& d = *d_; d.cont = d.cont || again; @@ -144,11 +136,20 @@ stream::close_op::operator()( boost::asio::error::operation_aborted, 0)); return; } - d.state = 2; + d.state = 3; break; // resume case 1: + // VFALCO NOTE Should d.cont be `true` or false here? + // Does this count as a continuation of the original call + // to the asynchronous initiation function (async_close)? + d.state = 2; + d.ws.get_io_service().post(bind_handler( + std::move(*this), ec, 0)); + return; + + case 2: if(d.ws.error_) { // call handler @@ -156,10 +157,10 @@ stream::close_op::operator()( ec = boost::asio::error::operation_aborted; break; } - d.state = 2; + d.state = 3; // VFALCO fall through? break; - case 2: + case 3: // send close d.state = 99; assert(! d.ws.wr_close_); diff --git a/include/beast/websocket/impl/handshake_op.ipp b/include/beast/websocket/impl/handshake_op.ipp index e126c6a0..3afea250 100644 --- a/include/beast/websocket/impl/handshake_op.ipp +++ b/include/beast/websocket/impl/handshake_op.ipp @@ -64,16 +64,11 @@ public: std::forward(h), ws, std::forward(args)...)) { - (*this)(error_code{}, 0, false); + (*this)(error_code{}, false); } - void operator()(error_code const& ec) - { - (*this)(ec, 0); - } - - void operator()(error_code ec, - std::size_t bytes_transferred, bool again = true); + void + operator()(error_code ec, bool again = true); friend void* asio_handler_allocate( @@ -109,9 +104,8 @@ public: template template void -stream::handshake_op< - Handler>::operator()(error_code ec, - std::size_t bytes_transferred, bool again) +stream::handshake_op:: +operator()(error_code ec, bool again) { auto& d = *d_; d.cont = d.cont || again; diff --git a/include/beast/websocket/impl/read_frame_op.ipp b/include/beast/websocket/impl/read_frame_op.ipp index 44c7f39b..aeba1c95 100644 --- a/include/beast/websocket/impl/read_frame_op.ipp +++ b/include/beast/websocket/impl/read_frame_op.ipp @@ -81,9 +81,7 @@ public: void operator()() { - auto& d = *d_; - d.cont = false; - (*this)(error_code{}, 0, false); + (*this)(error_code{}, 0, true); } void operator()(error_code const& ec) @@ -187,7 +185,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) ! d.ws.rd_utf8_check_.finish())) { // invalid utf8 - d.state = 16; + d.state = 18; code = close_code::bad_payload; break; } @@ -215,7 +213,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) if(code != close_code::none) { // protocol error - d.state = 16; + d.state = 18; break; } d.state = 6; @@ -241,7 +239,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) if(code != close_code::none) { // protocol error - d.state = 16; + d.state = 18; break; } if(detail::is_control(d.ws.rd_fh_.op)) @@ -292,7 +290,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) if(code != close_code::none) { // protocol error - d.state = 16; + d.state = 18; break; } d.fb.reset(); @@ -323,7 +321,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) if(code != close_code::none) { // protocol error - d.state = 16; + d.state = 18; break; } d.fb.reset(); @@ -337,7 +335,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) detail::read(d.ws.cr_, d.fb.data(), code); if(code != close_code::none) { - d.state = 16; + d.state = 18; break; } if(! d.ws.wr_close_) @@ -357,7 +355,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) read_frame_op>(std::move(*this)); return; } - d.state = 10; + d.state = 11; break; } // call handler; @@ -368,6 +366,12 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) // resume case 9: + d.state = 10; + d.ws.get_io_service().post(bind_handler( + std::move(*this), ec, bytes_transferred)); + return; + + case 10: if(d.ws.error_) { // call handler @@ -382,12 +386,12 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) ec = error::closed; break; } - d.state = 10; + d.state = 11; break; // send close - case 10: - d.state = 11; + case 11: + d.state = 12; assert(! d.ws.wr_block_); d.ws.wr_block_ = &d; boost::asio::async_write(d.ws.stream_, @@ -395,20 +399,26 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) return;; // teardown - case 11: - d.state = 12; + case 12: + d.state = 13; websocket_helpers::call_async_teardown( d.ws.next_layer(), std::move(*this)); return; - case 12: + case 13: // call handler d.state = 99; ec = error::closed; break; // resume - case 13: + case 14: + d.state = 15; + d.ws.get_io_service().post(bind_handler( + std::move(*this), ec, bytes_transferred)); + return; + + case 15: if(d.ws.error_) { // call handler @@ -422,12 +432,12 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) d.state = 2; break; } - d.state = 14; + d.state = 16; break; - case 14: + case 16: // write ping/pong - d.state = 15; + d.state = 17; assert(! d.ws.wr_block_); d.ws.wr_block_ = &d; boost::asio::async_write(d.ws.stream_, @@ -435,14 +445,14 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) return; // sent ping/pong - case 15: + case 17: d.fb.reset(); d.state = 2; d.ws.wr_block_ = nullptr; break; // fail the connection - case 16: + case 18: if(! d.ws.wr_close_) { d.fb.reset(); @@ -451,28 +461,36 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) if(d.ws.wr_block_) { // suspend - d.state = 17; + d.state = 19; d.ws.rd_op_.template emplace< read_frame_op>(std::move(*this)); return; } - d.state = 18; + d.state = 21; break; } - - // resume - case 17: - if(d.ws.wr_close_) - { - d.state = 19; - break; - } - d.state = 18; + d.state = 22; break; - case 18: + // resume + case 19: + d.state = 20; + d.ws.get_io_service().post(bind_handler( + std::move(*this), ec, bytes_transferred)); + return; + + case 20: + if(d.ws.wr_close_) + { + d.state = 22; + break; + } + d.state = 21; + break; + + case 21: // send close - d.state = 19; + d.state = 22; d.ws.wr_close_ = true; assert(! d.ws.wr_block_); d.ws.wr_block_ = &d; @@ -481,13 +499,13 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again) return; // teardown - case 19: - d.state = 20; + case 22: + d.state = 23; websocket_helpers::call_async_teardown( d.ws.next_layer(), std::move(*this)); return; - case 20: + case 23: // call handler d.state = 99; ec = error::failed; diff --git a/include/beast/websocket/impl/teardown.ipp b/include/beast/websocket/impl/teardown.ipp index 41daef59..fa7133b2 100644 --- a/include/beast/websocket/impl/teardown.ipp +++ b/include/beast/websocket/impl/teardown.ipp @@ -56,8 +56,7 @@ public: } void - operator()( - error_code ec, std::size_t, bool again = true); + operator()(error_code ec, std::size_t, bool again = true); friend void* asio_handler_allocate(std::size_t size, diff --git a/include/beast/websocket/impl/write_frame_op.ipp b/include/beast/websocket/impl/write_frame_op.ipp index 7cc100ad..cf98aab0 100644 --- a/include/beast/websocket/impl/write_frame_op.ipp +++ b/include/beast/websocket/impl/write_frame_op.ipp @@ -57,9 +57,9 @@ class stream::write_frame_op opcode::cont : ws.wr_opcode_; ws.wr_cont_ = ! fin; fh.fin = fin; - fh.rsv1 = 0; - fh.rsv2 = 0; - fh.rsv3 = 0; + fh.rsv1 = false; + fh.rsv2 = false; + fh.rsv3 = false; fh.len = boost::asio::buffer_size(cb); fh.mask = ws.role_ == role_type::client; if(fh.mask) @@ -105,9 +105,7 @@ public: void operator()() { - auto& d = *d_; - d.cont = false; - (*this)(error_code{}, 0, false); + (*this)(error_code{}, 0, true); } void operator()(error_code ec, @@ -179,11 +177,17 @@ operator()( return; } assert(! d.ws.wr_close_); - d.state = 2; + d.state = 3; break; // resume case 1: + d.state = 2; + d.ws.get_io_service().post(bind_handler( + std::move(*this), ec, bytes_transferred)); + return; + + case 2: if(d.ws.error_) { // call handler @@ -191,10 +195,10 @@ operator()( ec = boost::asio::error::operation_aborted; break; } - d.state = 2; + d.state = 3; break; - case 2: + case 3: { if(! d.fh.mask) { @@ -215,7 +219,7 @@ operator()( d.remain -= n; detail::mask_inplace(mb, d.key); // send header and payload - d.state = d.remain > 0 ? 3 : 99; + d.state = d.remain > 0 ? 4 : 99; assert(! d.ws.wr_block_); d.ws.wr_block_ = &d; boost::asio::async_write(d.ws.stream_, @@ -225,7 +229,7 @@ operator()( } // sent masked payload - case 3: + case 4: { auto const n = detail::clamp(d.remain, d.tmp_size);