From dfd08bf6aeb12e5718156b878e6363c00749ab8d Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 20 Jan 2019 12:25:30 -0800 Subject: [PATCH] Refactor websocket::stream operations --- CHANGELOG.md | 6 + .../beast/_experimental/test/impl/stream.hpp | 8 +- include/boost/beast/core/async_op_base.hpp | 2 +- .../core/{ => detail}/bind_continuation.hpp | 43 +- .../beast/websocket/detail/stream_base.hpp | 9 +- include/boost/beast/websocket/impl/accept.hpp | 2 - include/boost/beast/websocket/impl/close.hpp | 363 ++++++------- .../boost/beast/websocket/impl/handshake.hpp | 59 +-- include/boost/beast/websocket/impl/ping.hpp | 106 ++-- include/boost/beast/websocket/impl/read.hpp | 417 +++++++-------- include/boost/beast/websocket/impl/stream.hpp | 2 +- .../beast/websocket/impl/stream_impl.hpp | 209 +++++++- .../boost/beast/websocket/impl/teardown.hpp | 34 +- include/boost/beast/websocket/impl/write.hpp | 491 +++++++++--------- include/boost/beast/websocket/stream.hpp | 28 +- test/beast/core/CMakeLists.txt | 2 +- test/beast/core/Jamfile | 2 +- ...tion.cpp => _detail_bind_continuation.cpp} | 4 +- test/beast/websocket/CMakeLists.txt | 1 + test/beast/websocket/Jamfile | 1 + test/beast/websocket/close.cpp | 16 - test/beast/websocket/ping.cpp | 16 - test/beast/websocket/read2.cpp | 38 -- test/beast/websocket/timer.cpp | 138 +++++ test/beast/websocket/write.cpp | 20 - 25 files changed, 1103 insertions(+), 914 deletions(-) rename include/boost/beast/core/{ => detail}/bind_continuation.hpp (54%) rename test/beast/core/{bind_continuation.cpp => _detail_bind_continuation.cpp} (98%) create mode 100644 test/beast/websocket/timer.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index a11aefca..0f53a582 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +Version 216: + +* Refactor websocket::stream operations + +-------------------------------------------------------------------------------- + Version 215: * basic_stream uses boost::shared_ptr diff --git a/include/boost/beast/_experimental/test/impl/stream.hpp b/include/boost/beast/_experimental/test/impl/stream.hpp index 69044a20..3b710bdd 100644 --- a/include/boost/beast/_experimental/test/impl/stream.hpp +++ b/include/boost/beast/_experimental/test/impl/stream.hpp @@ -335,7 +335,7 @@ read_some(MutableBufferSequence const& buffers, // A request to read 0 bytes from a stream is a no-op. if(buffer_size(buffers) == 0) { - ec.clear(); + ec = {}; return 0; } @@ -481,7 +481,7 @@ write_some( // A request to write 0 bytes to a stream is a no-op. if(buffer_size(buffers) == 0) { - ec.clear(); + ec = {}; return 0; } @@ -591,7 +591,7 @@ teardown( s.in_->fc->fail(ec)) ec = net::error::eof; else - ec.clear(); + ec = {}; } template @@ -613,7 +613,7 @@ async_teardown( s.in_->fc->fail(ec)) ec = net::error::eof; else - ec.clear(); + ec = {}; net::post( s.get_executor(), diff --git a/include/boost/beast/core/async_op_base.hpp b/include/boost/beast/core/async_op_base.hpp index 6188e710..07af546a 100644 --- a/include/boost/beast/core/async_op_base.hpp +++ b/include/boost/beast/core/async_op_base.hpp @@ -210,7 +210,7 @@ public: this parameter is optional and may be omitted. */ #if BOOST_BEAST_DOXYGEN - template + template async_op_base( Handler&& handler, Executor1 const& ex1, diff --git a/include/boost/beast/core/bind_continuation.hpp b/include/boost/beast/core/detail/bind_continuation.hpp similarity index 54% rename from include/boost/beast/core/bind_continuation.hpp rename to include/boost/beast/core/detail/bind_continuation.hpp index ad91107e..7c1befb7 100644 --- a/include/boost/beast/core/bind_continuation.hpp +++ b/include/boost/beast/core/detail/bind_continuation.hpp @@ -7,8 +7,8 @@ // Official repository: https://github.com/boostorg/beast // -#ifndef BOOST_BEAST_BIND_CONTINUATION_HPP -#define BOOST_BEAST_BIND_CONTINUATION_HPP +#ifndef BOOST_BEAST_DETAIL_BIND_CONTINUATION_HPP +#define BOOST_BEAST_DETAIL_BIND_CONTINUATION_HPP #include #include @@ -19,6 +19,7 @@ namespace boost { namespace beast { +namespace detail { /** Mark a completion handler as a continuation. @@ -29,6 +30,43 @@ namespace beast { function represents a continuation of the current asynchronous flow of control. + @param handler The handler to wrap. + + @see + + @li [N4242] Executors and Asynchronous Operations, Revision 1 +*/ +template +#if BOOST_BEAST_DOXYGEN +__implementation_defined__ +#else +net::executor_binder< + typename std::decay::type, + detail::remap_post_to_defer< + net::associated_executor_t>> +#endif +bind_continuation(CompletionHandler&& handler) +{ + return net::bind_executor( + detail::remap_post_to_defer< + net::associated_executor_t>( + net::get_associated_executor(handler)), + std::forward(handler)); +} + +/** Mark a completion handler as a continuation. + + This function wraps a completion handler to associate it with an + executor whose `post` operation is remapped to the `defer` operation. + It is used by composed asynchronous operation implementations to + indicate that a completion handler submitted to an initiating + function represents a continuation of the current asynchronous + flow of control. + + @param ex The executor to use + + @param handler The handler to wrap + @see @li [N4242] Executors and Asynchronous Operations, Revision 1 @@ -49,6 +87,7 @@ bind_continuation( std::forward(handler)); } +} // detail } // beast } // boost diff --git a/include/boost/beast/websocket/detail/stream_base.hpp b/include/boost/beast/websocket/detail/stream_base.hpp index a14f69bc..9bd982bb 100644 --- a/include/boost/beast/websocket/detail/stream_base.hpp +++ b/include/boost/beast/websocket/detail/stream_base.hpp @@ -494,7 +494,13 @@ struct impl_base struct stream_base { protected: - bool secure_prng_ = true; + enum class status + { + open, + closing, + closed, + failed + }; std::uint32_t create_mask() @@ -505,6 +511,7 @@ protected: return key; } + bool secure_prng_ = true; }; } // detail diff --git a/include/boost/beast/websocket/impl/accept.hpp b/include/boost/beast/websocket/impl/accept.hpp index 65828788..5cfe5774 100644 --- a/include/boost/beast/websocket/impl/accept.hpp +++ b/include/boost/beast/websocket/impl/accept.hpp @@ -547,7 +547,6 @@ async_accept( BOOST_BEAST_HANDLER_INIT( AcceptHandler, void(error_code)); impl_->reset(); - using net::asio_handler_is_continuation; response_op< BOOST_ASIO_HANDLER_TYPE( AcceptHandler, void(error_code))>{ @@ -579,7 +578,6 @@ async_accept_ex( BOOST_BEAST_HANDLER_INIT( AcceptHandler, void(error_code)); impl_->reset(); - using net::asio_handler_is_continuation; response_op< BOOST_ASIO_HANDLER_TYPE( AcceptHandler, void(error_code))>{ diff --git a/include/boost/beast/websocket/impl/close.hpp b/include/boost/beast/websocket/impl/close.hpp index 81ef98b3..653dcdfe 100644 --- a/include/boost/beast/websocket/impl/close.hpp +++ b/include/boost/beast/websocket/impl/close.hpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -39,25 +39,9 @@ class stream::close_op Handler, beast::executor_type> , public net::coroutine { - struct state - { - stream& ws; - detail::frame_buffer fb; - error_code ev; - bool cont = false; - - state( - stream& ws_, - close_reason const& cr) - : ws(ws_) - { - // Serialize the close frame - ws.template write_close< - flat_static_buffer_base>(fb, cr); - } - }; - - state& d_; + stream& ws_; + error_code ev_; + detail::frame_buffer& fb_; public: static constexpr int id = 4; // for soft_mutex @@ -70,9 +54,13 @@ public: : stable_async_op_base< Handler, beast::executor_type>( std::forward(h), ws.get_executor()) - , d_(beast::allocate_stable( - *this, ws, cr)) + , ws_(ws) + , fb_(beast::allocate_stable(*this)) { + // Serialize the close frame + ws.template write_close< + flat_static_buffer_base>(fb_, cr); + (*this)({}, 0, false); } void @@ -82,46 +70,36 @@ public: bool cont = true) { using beast::detail::clamp; - d_.cont = cont; + auto& impl = *ws_.impl_; BOOST_ASIO_CORO_REENTER(*this) { - // Attempt to acquire write block - if(! d_.ws.impl_->wr_block.try_lock(this)) + // Acquire the write lock + if(! impl.wr_block.try_lock(this)) { - // Suspend BOOST_ASIO_CORO_YIELD - d_.ws.impl_->paused_close.emplace(std::move(*this)); - - // Acquire the write block - d_.ws.impl_->wr_block.lock(this); - - // Resume + impl.paused_close.emplace(std::move(*this)); + impl.wr_block.lock(this); BOOST_ASIO_CORO_YIELD - net::post( - d_.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this)); + net::post(std::move(*this)); + BOOST_ASSERT(impl.wr_block.is_locked(this)); } - - // Make sure the stream is open - if(! d_.ws.impl_->check_open(ec)) + if(impl.check_stop_now(ec)) goto upcall; // Can't call close twice - BOOST_ASSERT(! d_.ws.impl_->wr_close); - - // Change status to closing - BOOST_ASSERT(d_.ws.impl_->status_ == status::open); - d_.ws.impl_->status_ = status::closing; + // TODO return a custom error code + BOOST_ASSERT(! impl.wr_close); // Send close frame - d_.ws.impl_->wr_close = true; + impl.wr_close = true; + impl.change_status(status::closing); BOOST_ASIO_CORO_YIELD - net::async_write(d_.ws.impl_->stream, - d_.fb.data(), std::move(*this)); - if(! d_.ws.impl_->check_ok(ec)) + net::async_write(impl.stream, fb_.data(), + beast::detail::bind_continuation(std::move(*this))); + if(impl.check_stop_now(ec)) goto upcall; - if(d_.ws.impl_->rd_close) + if(impl.rd_close) { // This happens when the read_op gets a close frame // at the same time close_op is sending the close frame. @@ -129,101 +107,94 @@ public: goto teardown; } - // Maybe suspend - if(! d_.ws.impl_->rd_block.try_lock(this)) + // Acquire the read lock + if(! impl.rd_block.try_lock(this)) { - // Suspend BOOST_ASIO_CORO_YIELD - d_.ws.impl_->paused_r_close.emplace(std::move(*this)); - - // Acquire the read block - d_.ws.impl_->rd_block.lock(this); - - // Resume + impl.paused_r_close.emplace(std::move(*this)); + impl.rd_block.lock(this); BOOST_ASIO_CORO_YIELD - net::post( - d_.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d_.ws.impl_->rd_block.is_locked(this)); - - // Make sure the stream is open - BOOST_ASSERT(d_.ws.impl_->status_ != status::open); - BOOST_ASSERT(d_.ws.impl_->status_ != status::closed); - if( d_.ws.impl_->status_ == status::failed) + net::post(std::move(*this)); + BOOST_ASSERT(impl.rd_block.is_locked(this)); + if(impl.check_stop_now(ec)) goto upcall; - - BOOST_ASSERT(! d_.ws.impl_->rd_close); + BOOST_ASSERT(! impl.rd_close); } - // Drain - if(d_.ws.impl_->rd_remain > 0) + // Read until a receiving a close frame + // TODO There should be a timeout on this + if(impl.rd_remain > 0) goto read_payload; for(;;) { // Read frame header - while(! d_.ws.parse_fh( - d_.ws.impl_->rd_fh, d_.ws.impl_->rd_buf, d_.ev)) + while(! ws_.parse_fh( + impl.rd_fh, impl.rd_buf, ev_)) { - if(d_.ev) + if(ev_) goto teardown; BOOST_ASIO_CORO_YIELD - d_.ws.impl_->stream.async_read_some( - d_.ws.impl_->rd_buf.prepare(read_size(d_.ws.impl_->rd_buf, - d_.ws.impl_->rd_buf.max_size())), - std::move(*this)); - if(! d_.ws.impl_->check_ok(ec)) + impl.stream.async_read_some( + impl.rd_buf.prepare(read_size( + impl.rd_buf, impl.rd_buf.max_size())), + beast::detail::bind_continuation(std::move(*this))); + impl.rd_buf.commit(bytes_transferred); + if(impl.check_stop_now(ec)) goto upcall; - d_.ws.impl_->rd_buf.commit(bytes_transferred); } - if(detail::is_control(d_.ws.impl_->rd_fh.op)) + if(detail::is_control(impl.rd_fh.op)) { - // Process control frame - if(d_.ws.impl_->rd_fh.op == detail::opcode::close) + // Discard ping or pong frame + if(impl.rd_fh.op != detail::opcode::close) { - BOOST_ASSERT(! d_.ws.impl_->rd_close); - d_.ws.impl_->rd_close = true; - auto const mb = buffers_prefix( - clamp(d_.ws.impl_->rd_fh.len), - d_.ws.impl_->rd_buf.data()); - if(d_.ws.impl_->rd_fh.len > 0 && d_.ws.impl_->rd_fh.mask) - detail::mask_inplace(mb, d_.ws.impl_->rd_key); - detail::read_close(d_.ws.impl_->cr, mb, d_.ev); - if(d_.ev) - goto teardown; - d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_fh.len)); + impl.rd_buf.consume(clamp(impl.rd_fh.len)); + continue; + } + + // Process close frame + // TODO Should we invoke the control callback? + BOOST_ASSERT(! impl.rd_close); + impl.rd_close = true; + auto const mb = buffers_prefix( + clamp(impl.rd_fh.len), + impl.rd_buf.data()); + if(impl.rd_fh.len > 0 && impl.rd_fh.mask) + detail::mask_inplace(mb, impl.rd_key); + detail::read_close(impl.cr, mb, ev_); + if(ev_) goto teardown; - } - d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_fh.len)); + impl.rd_buf.consume(clamp(impl.rd_fh.len)); + goto teardown; } - else + + read_payload: + // Discard message frame + while(impl.rd_buf.size() < impl.rd_remain) { - read_payload: - while(d_.ws.impl_->rd_buf.size() < d_.ws.impl_->rd_remain) - { - d_.ws.impl_->rd_remain -= d_.ws.impl_->rd_buf.size(); - d_.ws.impl_->rd_buf.consume(d_.ws.impl_->rd_buf.size()); - BOOST_ASIO_CORO_YIELD - d_.ws.impl_->stream.async_read_some( - d_.ws.impl_->rd_buf.prepare(read_size(d_.ws.impl_->rd_buf, - d_.ws.impl_->rd_buf.max_size())), - std::move(*this)); - if(! d_.ws.impl_->check_ok(ec)) - goto upcall; - d_.ws.impl_->rd_buf.commit(bytes_transferred); - } - BOOST_ASSERT(d_.ws.impl_->rd_buf.size() >= d_.ws.impl_->rd_remain); - d_.ws.impl_->rd_buf.consume(clamp(d_.ws.impl_->rd_remain)); - d_.ws.impl_->rd_remain = 0; + impl.rd_remain -= impl.rd_buf.size(); + impl.rd_buf.consume(impl.rd_buf.size()); + BOOST_ASIO_CORO_YIELD + impl.stream.async_read_some( + impl.rd_buf.prepare(read_size( + impl.rd_buf, impl.rd_buf.max_size())), + beast::detail::bind_continuation(std::move(*this))); + impl.rd_buf.commit(bytes_transferred); + if(impl.check_stop_now(ec)) + goto upcall; } + BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain); + impl.rd_buf.consume(clamp(impl.rd_remain)); + impl.rd_remain = 0; } teardown: // Teardown - BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this)); + BOOST_ASSERT(impl.wr_block.is_locked(this)); using beast::websocket::async_teardown; BOOST_ASIO_CORO_YIELD - async_teardown(d_.ws.impl_->role, - d_.ws.impl_->stream, std::move(*this)); - BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this)); + async_teardown(impl.role, impl.stream, + beast::detail::bind_continuation(std::move(*this))); + BOOST_ASSERT(impl.wr_block.is_locked(this)); if(ec == net::error::eof) { // Rationale: @@ -231,22 +202,21 @@ public: ec = {}; } if(! ec) - ec = d_.ev; + ec = ev_; if(ec) - d_.ws.impl_->status_ = status::failed; + impl.change_status(status::failed); else - d_.ws.impl_->status_ = status::closed; - d_.ws.impl_->close(); + impl.change_status(status::closed); + impl.close(); upcall: - BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this)); - d_.ws.impl_->wr_block.unlock(this); - if(d_.ws.impl_->rd_block.try_unlock(this)) - d_.ws.impl_->paused_r_rd.maybe_invoke(); - d_.ws.impl_->paused_rd.maybe_invoke() || - d_.ws.impl_->paused_ping.maybe_invoke() || - d_.ws.impl_->paused_wr.maybe_invoke(); - this->invoke(d_.cont, ec); + impl.wr_block.unlock(this); + impl.rd_block.try_unlock(this) + && impl.paused_r_rd.maybe_invoke(); + impl.paused_rd.maybe_invoke() + || impl.paused_ping.maybe_invoke() + || impl.paused_wr.maybe_invoke(); + this->invoke(cont, ec); } } }; @@ -274,87 +244,97 @@ close(close_reason const& cr, error_code& ec) static_assert(is_sync_stream::value, "SyncStream requirements not met"); using beast::detail::clamp; + auto& impl = *impl_; ec = {}; - // Make sure the stream is open - if(! impl_->check_open(ec)) + if(impl.check_stop_now(ec)) return; - // If rd_close_ is set then we already sent a close - BOOST_ASSERT(! impl_->rd_close); - BOOST_ASSERT(! impl_->wr_close); - impl_->wr_close = true; + BOOST_ASSERT(! impl.rd_close); + + // Can't call close twice + // TODO return a custom error code + BOOST_ASSERT(! impl.wr_close); + + // Send close frame { + impl.wr_close = true; + impl.change_status(status::closing); detail::frame_buffer fb; write_close(fb, cr); - net::write(impl_->stream, fb.data(), ec); + net::write(impl.stream, fb.data(), ec); + if(impl.check_stop_now(ec)) + return; } - if(! impl_->check_ok(ec)) - return; - impl_->status_ = status::closing; - error_code result; - // Drain the connection - if(impl_->rd_remain > 0) + + // Read until a receiving a close frame + error_code ev; + if(impl.rd_remain > 0) goto read_payload; for(;;) { // Read frame header while(! parse_fh( - impl_->rd_fh, impl_->rd_buf, result)) + impl.rd_fh, impl.rd_buf, ev)) { - if(result) - return do_fail( - close_code::none, result, ec); - auto const bytes_transferred = - impl_->stream.read_some( - impl_->rd_buf.prepare(read_size(impl_->rd_buf, - impl_->rd_buf.max_size())), ec); - if(! impl_->check_ok(ec)) + if(ev) + { + // Protocol violation + return do_fail(close_code::none, ev, ec); + } + impl.rd_buf.commit(impl.stream.read_some( + impl.rd_buf.prepare(read_size( + impl.rd_buf, impl.rd_buf.max_size())), ec)); + if(impl.check_stop_now(ec)) return; - impl_->rd_buf.commit(bytes_transferred); } - if(detail::is_control(impl_->rd_fh.op)) + + if(detail::is_control(impl.rd_fh.op)) { - // Process control frame - if(impl_->rd_fh.op == detail::opcode::close) + // Discard ping/pong frame + if(impl.rd_fh.op != detail::opcode::close) { - BOOST_ASSERT(! impl_->rd_close); - impl_->rd_close = true; - auto const mb = buffers_prefix( - clamp(impl_->rd_fh.len), - impl_->rd_buf.data()); - if(impl_->rd_fh.len > 0 && impl_->rd_fh.mask) - detail::mask_inplace(mb, impl_->rd_key); - detail::read_close(impl_->cr, mb, result); - if(result) - { - // Protocol violation - return do_fail( - close_code::none, result, ec); - } - impl_->rd_buf.consume(clamp(impl_->rd_fh.len)); - break; + impl.rd_buf.consume(clamp(impl.rd_fh.len)); + continue; } - impl_->rd_buf.consume(clamp(impl_->rd_fh.len)); + + // Handle close frame + // TODO Should we invoke the control callback? + BOOST_ASSERT(! impl.rd_close); + impl.rd_close = true; + auto const mb = buffers_prefix( + clamp(impl.rd_fh.len), + impl.rd_buf.data()); + if(impl.rd_fh.len > 0 && impl.rd_fh.mask) + detail::mask_inplace(mb, impl.rd_key); + detail::read_close(impl.cr, mb, ev); + if(ev) + { + // Protocol violation + return do_fail(close_code::none, ev, ec); + } + impl.rd_buf.consume(clamp(impl.rd_fh.len)); + break; } - else + + read_payload: + // Discard message frame + while(impl.rd_buf.size() < impl.rd_remain) { - read_payload: - while(impl_->rd_buf.size() < impl_->rd_remain) - { - impl_->rd_remain -= impl_->rd_buf.size(); - impl_->rd_buf.consume(impl_->rd_buf.size()); - auto const bytes_transferred = - impl_->stream.read_some( - impl_->rd_buf.prepare(read_size(impl_->rd_buf, - impl_->rd_buf.max_size())), ec); - if(! impl_->check_ok(ec)) - return; - impl_->rd_buf.commit(bytes_transferred); - } - BOOST_ASSERT( - impl_->rd_buf.size() >= impl_->rd_remain); - impl_->rd_buf.consume(clamp(impl_->rd_remain)); - impl_->rd_remain = 0; + impl.rd_remain -= impl.rd_buf.size(); + impl.rd_buf.consume(impl.rd_buf.size()); + impl.rd_buf.commit( + impl.stream.read_some( + impl.rd_buf.prepare( + read_size( + impl.rd_buf, + impl.rd_buf.max_size())), + ec)); + if(impl.check_stop_now(ec)) + return; } + BOOST_ASSERT( + impl.rd_buf.size() >= impl.rd_remain); + impl.rd_buf.consume(clamp(impl.rd_remain)); + impl.rd_remain = 0; } // _Close the WebSocket Connection_ do_fail(close_code::none, error::closed, ec); @@ -374,9 +354,8 @@ async_close(close_reason const& cr, CloseHandler&& handler) BOOST_BEAST_HANDLER_INIT( CloseHandler, void(error_code)); close_op{ - std::move(init.completion_handler), *this, cr}( - {}, 0, false); + CloseHandler, void(error_code))>( + std::move(init.completion_handler), *this, cr); return init.result.get(); } diff --git a/include/boost/beast/websocket/impl/handshake.hpp b/include/boost/beast/websocket/impl/handshake.hpp index 33f8e4f2..2d3bb1af 100644 --- a/include/boost/beast/websocket/impl/handshake.hpp +++ b/include/boost/beast/websocket/impl/handshake.hpp @@ -39,43 +39,35 @@ class stream::handshake_op { struct data { - stream& ws; - response_type* res_p; - detail::sec_ws_key_type key; + // VFALCO This really should be two separate + // composed operations, to save on memory http::request req; response_type res; - - template - data( - stream& ws_, - response_type* res_p_, - string_view host, - string_view target, - Decorator const& decorator) - : ws(ws_) - , res_p(res_p_) - , req(ws.build_request(key, - host, target, decorator)) - { - ws.impl_->reset(); - } }; + stream& ws_; + detail::sec_ws_key_type key_; + response_type* res_p_; data& d_; public: - template< - class Handler_, - class... Args> + template handshake_op( Handler_&& h, - stream& ws, Args&&... args) + stream& ws, + response_type* res_p, + string_view host, string_view target, + Decorator const& decorator) : stable_async_op_base>( std::forward(h), ws.get_executor()) - , d_(beast::allocate_stable( - *this, ws, std::forward(args)...)) + , ws_(ws) + , res_p_(res_p) + , d_(beast::allocate_stable(*this)) { + d_.req = ws_.build_request( + key_, host, target, decorator); + ws_.impl_->reset(); // VFALCO I don't like this } void @@ -84,31 +76,26 @@ public: std::size_t bytes_used = 0) { boost::ignore_unused(bytes_used); - BOOST_ASIO_CORO_REENTER(*this) { // Send HTTP Upgrade - d_.ws.impl_->do_pmd_config(d_.req); + ws_.impl_->do_pmd_config(d_.req); BOOST_ASIO_CORO_YIELD - http::async_write(d_.ws.impl_->stream, + http::async_write(ws_.impl_->stream, d_.req, std::move(*this)); if(ec) goto upcall; - // VFALCO We could pre-serialize the request to - // a single buffer, send that instead, - // and delete the buffer here. - // Read HTTP response BOOST_ASIO_CORO_YIELD - http::async_read(d_.ws.next_layer(), - d_.ws.impl_->rd_buf, d_.res, + http::async_read(ws_.next_layer(), + ws_.impl_->rd_buf, d_.res, std::move(*this)); if(ec) goto upcall; - d_.ws.on_response(d_.res, d_.key, ec); - if(d_.res_p) - swap(d_.res, *d_.res_p); + ws_.on_response(d_.res, key_, ec); + if(res_p_) + swap(d_.res, *res_p_); upcall: this->invoke_now(ec); } diff --git a/include/boost/beast/websocket/impl/ping.hpp b/include/boost/beast/websocket/impl/ping.hpp index 42b8584d..8f995c58 100644 --- a/include/boost/beast/websocket/impl/ping.hpp +++ b/include/boost/beast/websocket/impl/ping.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -35,25 +36,8 @@ class stream::ping_op Handler, beast::executor_type> , public net::coroutine { - struct state - { - stream& ws; - detail::frame_buffer fb; - - state( - stream& ws_, - detail::opcode op, - ping_data const& payload) - : ws(ws_) - { - // Serialize the control frame - ws.template write_ping< - flat_static_buffer_base>( - fb, op, payload); - } - }; - - state& d_; + stream& ws_; + detail::frame_buffer& fb_; public: static constexpr int id = 3; // for soft_mutex @@ -67,65 +51,51 @@ public: : stable_async_op_base< Handler, beast::executor_type>( std::forward(h), ws.get_executor()) - , d_(beast::allocate_stable( - *this, ws, op, payload)) + , ws_(ws) + , fb_(beast::allocate_stable< + detail::frame_buffer>(*this)) { + // Serialize the ping or pong frame + ws.template write_ping< + flat_static_buffer_base>(fb_, op, payload); + (*this)({}, 0, false); } void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0) + std::size_t bytes_transferred = 0, + bool cont = true) { boost::ignore_unused(bytes_transferred); - + auto& impl = *ws_.impl_; BOOST_ASIO_CORO_REENTER(*this) { - // Maybe suspend - if(d_.ws.impl_->wr_block.try_lock(this)) + // Acquire the write lock + if(! impl.wr_block.try_lock(this)) { - // Make sure the stream is open - if(! d_.ws.impl_->check_open(ec)) - { - BOOST_ASIO_CORO_YIELD - net::post( - d_.ws.get_executor(), - beast::bind_front_handler(std::move(*this), ec)); - goto upcall; - } - } - else - { - // Suspend BOOST_ASIO_CORO_YIELD - d_.ws.impl_->paused_ping.emplace(std::move(*this)); - - // Acquire the write block - d_.ws.impl_->wr_block.lock(this); - - // Resume + impl.paused_ping.emplace(std::move(*this)); + impl.wr_block.lock(this); BOOST_ASIO_CORO_YIELD - net::post( - d_.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d_.ws.impl_->wr_block.is_locked(this)); - - // Make sure the stream is open - if(! d_.ws.impl_->check_open(ec)) - goto upcall; + net::post(std::move(*this)); + BOOST_ASSERT(impl.wr_block.is_locked(this)); } + if(impl.check_stop_now(ec)) + goto upcall; // Send ping frame BOOST_ASIO_CORO_YIELD - net::async_write(d_.ws.impl_->stream, - d_.fb.data(), std::move(*this)); - if(! d_.ws.impl_->check_ok(ec)) + net::async_write(impl.stream, fb_.data(), + beast::detail::bind_continuation(std::move(*this))); + if(impl.check_stop_now(ec)) goto upcall; upcall: - d_.ws.impl_->wr_block.unlock(this); - d_.ws.impl_->paused_close.maybe_invoke() || - d_.ws.impl_->paused_rd.maybe_invoke() || - d_.ws.impl_->paused_wr.maybe_invoke(); - this->invoke_now(ec); + impl.wr_block.unlock(this); + impl.paused_close.maybe_invoke() + || impl.paused_rd.maybe_invoke() + || impl.paused_wr.maybe_invoke(); + this->invoke(cont, ec); } } }; @@ -148,14 +118,13 @@ void stream:: ping(ping_data const& payload, error_code& ec) { - // Make sure the stream is open - if(! impl_->check_open(ec)) + if(impl_->check_stop_now(ec)) return; detail::frame_buffer fb; write_ping( fb, detail::opcode::ping, payload); net::write(impl_->stream, fb.data(), ec); - if(! impl_->check_ok(ec)) + if(impl_->check_stop_now(ec)) return; } @@ -175,14 +144,13 @@ void stream:: pong(ping_data const& payload, error_code& ec) { - // Make sure the stream is open - if(! impl_->check_open(ec)) + if(impl_->check_stop_now(ec)) return; detail::frame_buffer fb; write_ping( fb, detail::opcode::pong, payload); net::write(impl_->stream, fb.data(), ec); - if(! impl_->check_ok(ec)) + if(impl_->check_stop_now(ec)) return; } @@ -198,9 +166,9 @@ async_ping(ping_data const& payload, WriteHandler&& handler) BOOST_BEAST_HANDLER_INIT( WriteHandler, void(error_code)); ping_op{ + WriteHandler, void(error_code))>( std::move(init.completion_handler), *this, - detail::opcode::ping, payload}(); + detail::opcode::ping, payload); return init.result.get(); } @@ -216,9 +184,9 @@ async_pong(ping_data const& payload, WriteHandler&& handler) BOOST_BEAST_HANDLER_INIT( WriteHandler, void(error_code)); ping_op{ + WriteHandler, void(error_code))>( std::move(init.completion_handler), *this, - detail::opcode::pong, payload}(); + detail::opcode::pong, payload); return init.result.get(); } diff --git a/include/boost/beast/websocket/impl/read.hpp b/include/boost/beast/websocket/impl/read.hpp index 1860ef87..175b3623 100644 --- a/include/boost/beast/websocket/impl/read.hpp +++ b/include/boost/beast/websocket/impl/read.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -36,7 +37,7 @@ namespace boost { namespace beast { namespace websocket { -/* Read some message frame data. +/* Read some message data into a buffer sequence. Also reads and handles control frames. */ @@ -56,7 +57,6 @@ class stream::read_some_op error_code result_; close_code code_; bool did_read_ = false; - bool cont_ = false; public: static constexpr int id = 1; // for soft_mutex @@ -74,6 +74,7 @@ public: , cb_(bs) , code_(close_code::none) { + (*this)({}, 0, false); } void operator()( @@ -83,12 +84,37 @@ public: { using beast::detail::clamp; auto& impl = *ws_.impl_; - cont_ = cont; BOOST_ASIO_CORO_REENTER(*this) { - // Maybe suspend - do_maybe_suspend: - if(impl.rd_block.try_lock(this)) + acquire_read_lock: + // Acquire the read lock + if(! impl.rd_block.try_lock(this)) + { + do_suspend: + BOOST_ASIO_CORO_YIELD + impl.paused_r_rd.emplace(std::move(*this)); + impl.rd_block.lock(this); + BOOST_ASIO_CORO_YIELD + net::post(std::move(*this)); + BOOST_ASSERT(impl.rd_block.is_locked(this)); + + // VFALCO Is this check correct here? + BOOST_ASSERT(! ec && impl.check_stop_now(ec)); + if(impl.check_stop_now(ec)) + { + BOOST_ASSERT(ec == net::error::operation_aborted); + goto upcall; + } + // VFALCO Should never get here + + // The only way to get read blocked is if + // a `close_op` wrote a close frame + BOOST_ASSERT(impl.wr_close); + BOOST_ASSERT(impl.status_ != status::open); + ec = net::error::operation_aborted; + goto upcall; + } + else { // Make sure the stream is not closed if( impl.status_ == status::closed || @@ -98,29 +124,6 @@ public: goto upcall; } } - else - { - do_suspend: - // Suspend - BOOST_ASIO_CORO_YIELD - impl.paused_r_rd.emplace(std::move(*this)); - - // Acquire the read block - impl.rd_block.lock(this); - - // Resume - BOOST_ASIO_CORO_YIELD - net::post( - ws_.get_executor(), std::move(*this)); - BOOST_ASSERT(impl.rd_block.is_locked(this)); - - // The only way to get read blocked is if - // a `close_op` wrote a close frame - BOOST_ASSERT(impl.wr_close); - BOOST_ASSERT(impl.status_ != status::open); - ec = net::error::operation_aborted; - goto upcall; - } // if status_ == status::closing, we want to suspend // the read operation until the close completes, @@ -155,7 +158,7 @@ public: impl.rd_buf, impl.rd_buf.max_size())), std::move(*this)); BOOST_ASSERT(impl.rd_block.is_locked(this)); - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; impl.rd_buf.commit(bytes_transferred); @@ -189,13 +192,12 @@ public: { if(impl.ctrl_cb) { - if(! cont_) + if(! cont) { BOOST_ASIO_CORO_YIELD - net::post( - ws_.get_executor(), - std::move(*this)); - BOOST_ASSERT(cont_); + net::post(std::move(*this)); + BOOST_ASSERT(cont); + // VFALCO call check_stop_now() here? } } { @@ -224,54 +226,48 @@ public: impl.rd_block.unlock(this); impl.paused_r_close.maybe_invoke(); - // Maybe suspend + // Acquire the write lock if(! impl.wr_block.try_lock(this)) { - // Suspend BOOST_ASIO_CORO_YIELD impl.paused_rd.emplace(std::move(*this)); - - // Acquire the write block impl.wr_block.lock(this); - - // Resume BOOST_ASIO_CORO_YIELD - net::post( - ws_.get_executor(), std::move(*this)); + net::post(std::move(*this)); BOOST_ASSERT(impl.wr_block.is_locked(this)); - - // Make sure the stream is open - if(! impl.check_open(ec)) + if(impl.check_stop_now(ec)) goto upcall; } // Send pong BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASIO_CORO_YIELD - net::async_write(impl.stream, - impl.rd_fb.data(), std::move(*this)); + net::async_write( + impl.stream, impl.rd_fb.data(), + beast::detail::bind_continuation(std::move(*this))); BOOST_ASSERT(impl.wr_block.is_locked(this)); - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; impl.wr_block.unlock(this); impl.paused_close.maybe_invoke() || impl.paused_ping.maybe_invoke() || impl.paused_wr.maybe_invoke(); - goto do_maybe_suspend; + goto acquire_read_lock; } + // Handle pong frame if(impl.rd_fh.op == detail::opcode::pong) { // Ignore pong when closing if(! impl.wr_close && impl.ctrl_cb) { - if(! cont_) + if(! cont) { BOOST_ASIO_CORO_YIELD net::post( ws_.get_executor(), std::move(*this)); - BOOST_ASSERT(cont_); + BOOST_ASSERT(cont); } } auto const cb = buffers_prefix(clamp( @@ -286,18 +282,19 @@ public: impl.ctrl_cb(frame_type::pong, payload); goto loop; } + // Handle close frame BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); { if(impl.ctrl_cb) { - if(! cont_) + if(! cont) { BOOST_ASIO_CORO_YIELD net::post( ws_.get_executor(), std::move(*this)); - BOOST_ASSERT(cont_); + BOOST_ASSERT(cont); } } auto const cb = buffers_prefix(clamp( @@ -358,7 +355,7 @@ public: impl.rd_buf.prepare(read_size( impl.rd_buf, impl.rd_buf.max_size())), std::move(*this)); - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; impl.rd_buf.commit(bytes_transferred); if(impl.rd_fh.mask) @@ -401,7 +398,7 @@ public: BOOST_ASIO_CORO_YIELD impl.stream.async_read_some(buffers_prefix( clamp(impl.rd_remain), cb_), std::move(*this)); - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffers_prefix( @@ -444,7 +441,7 @@ public: impl.rd_buf.prepare(read_size( impl.rd_buf, impl.rd_buf.max_size())), std::move(*this)); - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; BOOST_ASSERT(bytes_transferred > 0); impl.rd_buf.commit(bytes_transferred); @@ -492,7 +489,7 @@ public: if(zs.total_out > 0) ec = error::partial_deflate_block; } - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; impl.do_context_takeover_read(impl.role); impl.rd_done = true; @@ -503,7 +500,7 @@ public: break; } impl.inflate(zs, zlib::Flush::sync, ec); - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; if(impl.rd_msg_max && beast::detail::sum_exceeds( impl.rd_size, zs.total_out, impl.rd_msg_max)) @@ -536,24 +533,16 @@ public: goto upcall; close: - // Try to acquire the write block + // Acquire the write lock if(! impl.wr_block.try_lock(this)) { - // Suspend BOOST_ASIO_CORO_YIELD impl.paused_rd.emplace(std::move(*this)); - - // Acquire the write block impl.wr_block.lock(this); - - // Resume BOOST_ASIO_CORO_YIELD - net::post( - ws_.get_executor(), std::move(*this)); + net::post(std::move(*this)); BOOST_ASSERT(impl.wr_block.is_locked(this)); - - // Make sure the stream is open - if(! impl.check_open(ec)) + if(impl.check_stop_now(ec)) goto upcall; } @@ -573,11 +562,10 @@ public: // Send close frame BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASIO_CORO_YIELD - net::async_write( - impl.stream, impl.rd_fb.data(), - std::move(*this)); + net::async_write(impl.stream, impl.rd_fb.data(), + beast::detail::bind_continuation(std::move(*this))); BOOST_ASSERT(impl.wr_block.is_locked(this)); - if(! impl.check_ok(ec)) + if(impl.check_stop_now(ec)) goto upcall; } @@ -585,8 +573,8 @@ public: using beast::websocket::async_teardown; BOOST_ASSERT(impl.wr_block.is_locked(this)); BOOST_ASIO_CORO_YIELD - async_teardown(impl.role, - impl.stream, std::move(*this)); + async_teardown(impl.role, impl.stream, + beast::detail::bind_continuation(std::move(*this))); BOOST_ASSERT(impl.wr_block.is_locked(this)); if(ec == net::error::eof) { @@ -606,10 +594,10 @@ public: impl.rd_block.try_unlock(this); impl.paused_r_close.maybe_invoke(); if(impl.wr_block.try_unlock(this)) - impl.paused_close.maybe_invoke() || - impl.paused_ping.maybe_invoke() || - impl.paused_wr.maybe_invoke(); - this->invoke(cont_, ec, bytes_written_); + impl.paused_close.maybe_invoke() + || impl.paused_ping.maybe_invoke() + || impl.paused_wr.maybe_invoke(); + this->invoke(cont, ec, bytes_written_); } } }; @@ -648,41 +636,37 @@ public: std::numeric_limits::max)()) , some_(some) { + (*this)({}, 0, false); } void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0) + std::size_t bytes_transferred = 0, + bool cont = true) { using beast::detail::clamp; + boost::optional mb; BOOST_ASIO_CORO_REENTER(*this) { do { - BOOST_ASIO_CORO_YIELD - { - auto mb = beast::detail::dynamic_buffer_prepare(b_, - clamp(ws_.read_size_hint(b_), limit_), - ec, error::buffer_overflow); - if(ec) - net::post( - ws_.get_executor(), - beast::bind_front_handler( - std::move(*this), ec, 0)); - else - read_some_op(std::move(*this), ws_, *mb)( - {}, 0, false); - } + mb = beast::detail::dynamic_buffer_prepare(b_, + clamp(ws_.read_size_hint(b_), limit_), + ec, error::buffer_overflow); if(ec) goto upcall; + BOOST_ASIO_CORO_YIELD + ws_.async_read_some(*mb, + beast::detail::bind_continuation(std::move(*this))); b_.commit(bytes_transferred); bytes_written_ += bytes_transferred; + if(ec) + goto upcall; } while(! some_ && ! ws_.is_message_done()); upcall: - this->invoke_now(ec, bytes_written_); + this->invoke(cont, ec, bytes_written_); } } }; @@ -743,15 +727,10 @@ async_read(DynamicBuffer& buffer, ReadHandler&& handler) "DynamicBuffer requirements not met"); BOOST_BEAST_HANDLER_INIT( ReadHandler, void(error_code, std::size_t)); - read_op< - DynamicBuffer, - BOOST_ASIO_HANDLER_TYPE( - ReadHandler, void(error_code, std::size_t))>{ - std::move(init.completion_handler), - *this, - buffer, - 0, - false}(); + read_op( + std::move(init.completion_handler), + *this, buffer, 0, false); return init.result.get(); } @@ -824,15 +803,10 @@ async_read_some( "DynamicBuffer requirements not met"); BOOST_BEAST_HANDLER_INIT( ReadHandler, void(error_code, std::size_t)); - read_op< - DynamicBuffer, - BOOST_ASIO_HANDLER_TYPE( - ReadHandler, void(error_code, std::size_t))>{ - std::move(init.completion_handler), - *this, - buffer, - limit, - true}({}, 0); + read_op( + std::move(init.completion_handler), + *this, buffer, limit, true); return init.result.get(); } @@ -871,23 +845,24 @@ read_some( MutableBufferSequence>::value, "MutableBufferSequence requirements not met"); using beast::detail::clamp; + auto& impl = *impl_; close_code code{}; std::size_t bytes_written = 0; ec = {}; // Make sure the stream is open - if(! impl_->check_open(ec)) - return 0; + if(impl.check_stop_now(ec)) + return bytes_written; loop: // See if we need to read a frame header. This // condition is structured to give the decompressor // a chance to emit the final empty deflate block // - if(impl_->rd_remain == 0 && ( - ! impl_->rd_fh.fin || impl_->rd_done)) + if(impl.rd_remain == 0 && ( + ! impl.rd_fh.fin || impl.rd_done)) { // Read frame header error_code result; - while(! parse_fh(impl_->rd_fh, impl_->rd_buf, result)) + while(! parse_fh(impl.rd_fh, impl.rd_buf, result)) { if(result) { @@ -900,68 +875,68 @@ loop: return bytes_written; } auto const bytes_transferred = - impl_->stream.read_some( - impl_->rd_buf.prepare(read_size( - impl_->rd_buf, impl_->rd_buf.max_size())), + impl.stream.read_some( + impl.rd_buf.prepare(read_size( + impl.rd_buf, impl.rd_buf.max_size())), ec); - if(! impl_->check_ok(ec)) + impl.rd_buf.commit(bytes_transferred); + if(impl.check_stop_now(ec)) return bytes_written; - impl_->rd_buf.commit(bytes_transferred); } // Immediately apply the mask to the portion // of the buffer holding payload data. - if(impl_->rd_fh.len > 0 && impl_->rd_fh.mask) + if(impl.rd_fh.len > 0 && impl.rd_fh.mask) detail::mask_inplace(buffers_prefix( - clamp(impl_->rd_fh.len), impl_->rd_buf.data()), - impl_->rd_key); - if(detail::is_control(impl_->rd_fh.op)) + clamp(impl.rd_fh.len), impl.rd_buf.data()), + impl.rd_key); + if(detail::is_control(impl.rd_fh.op)) { // Get control frame payload auto const b = buffers_prefix( - clamp(impl_->rd_fh.len), impl_->rd_buf.data()); + clamp(impl.rd_fh.len), impl.rd_buf.data()); auto const len = buffer_size(b); - BOOST_ASSERT(len == impl_->rd_fh.len); + BOOST_ASSERT(len == impl.rd_fh.len); // Clear this otherwise the next // frame will be considered final. - impl_->rd_fh.fin = false; + impl.rd_fh.fin = false; // Handle ping frame - if(impl_->rd_fh.op == detail::opcode::ping) + if(impl.rd_fh.op == detail::opcode::ping) { ping_data payload; detail::read_ping(payload, b); - impl_->rd_buf.consume(len); - if(impl_->wr_close) + impl.rd_buf.consume(len); + if(impl.wr_close) { // Ignore ping when closing goto loop; } - if(impl_->ctrl_cb) - impl_->ctrl_cb(frame_type::ping, payload); + if(impl.ctrl_cb) + impl.ctrl_cb(frame_type::ping, payload); detail::frame_buffer fb; write_ping(fb, detail::opcode::pong, payload); - net::write(impl_->stream, fb.data(), ec); - if(! impl_->check_ok(ec)) + net::write(impl.stream, fb.data(), ec); + if(impl.check_stop_now(ec)) return bytes_written; goto loop; } // Handle pong frame - if(impl_->rd_fh.op == detail::opcode::pong) + if(impl.rd_fh.op == detail::opcode::pong) { ping_data payload; detail::read_ping(payload, b); - impl_->rd_buf.consume(len); - if(impl_->ctrl_cb) - impl_->ctrl_cb(frame_type::pong, payload); + impl.rd_buf.consume(len); + if(impl.ctrl_cb) + impl.ctrl_cb(frame_type::pong, payload); goto loop; } // Handle close frame - BOOST_ASSERT(impl_->rd_fh.op == detail::opcode::close); + BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); { - BOOST_ASSERT(! impl_->rd_close); - impl_->rd_close = true; + BOOST_ASSERT(! impl.rd_close); + impl.rd_close = true; close_reason cr; detail::read_close(cr, b, result); if(result) @@ -971,11 +946,11 @@ loop: result, ec); return bytes_written; } - impl_->cr = cr; - impl_->rd_buf.consume(len); - if(impl_->ctrl_cb) - impl_->ctrl_cb(frame_type::close, impl_->cr.reason); - BOOST_ASSERT(! impl_->wr_close); + impl.cr = cr; + impl.rd_buf.consume(len); + if(impl.ctrl_cb) + impl.ctrl_cb(frame_type::close, impl.cr.reason); + BOOST_ASSERT(! impl.wr_close); // _Start the WebSocket Closing Handshake_ do_fail( cr.code == close_code::none ? @@ -985,52 +960,52 @@ loop: return bytes_written; } } - if(impl_->rd_fh.len == 0 && ! impl_->rd_fh.fin) + if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin) { // Empty non-final frame goto loop; } - impl_->rd_done = false; + impl.rd_done = false; } else { ec = {}; } - if(! impl_->rd_deflated()) + if(! impl.rd_deflated()) { - if(impl_->rd_remain > 0) + if(impl.rd_remain > 0) { - if(impl_->rd_buf.size() == 0 && impl_->rd_buf.max_size() > - (std::min)(clamp(impl_->rd_remain), + if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() > + (std::min)(clamp(impl.rd_remain), buffer_size(buffers))) { // Fill the read buffer first, otherwise we // get fewer bytes at the cost of one I/O. - impl_->rd_buf.commit(impl_->stream.read_some( - impl_->rd_buf.prepare(read_size(impl_->rd_buf, - impl_->rd_buf.max_size())), ec)); - if(! impl_->check_ok(ec)) + impl.rd_buf.commit(impl.stream.read_some( + impl.rd_buf.prepare(read_size(impl.rd_buf, + impl.rd_buf.max_size())), ec)); + if(impl.check_stop_now(ec)) return bytes_written; - if(impl_->rd_fh.mask) + if(impl.rd_fh.mask) detail::mask_inplace( - buffers_prefix(clamp(impl_->rd_remain), - impl_->rd_buf.data()), impl_->rd_key); + buffers_prefix(clamp(impl.rd_remain), + impl.rd_buf.data()), impl.rd_key); } - if(impl_->rd_buf.size() > 0) + if(impl.rd_buf.size() > 0) { // Copy from the read buffer. // The mask was already applied. auto const bytes_transferred = net::buffer_copy( - buffers, impl_->rd_buf.data(), - clamp(impl_->rd_remain)); + buffers, impl.rd_buf.data(), + clamp(impl.rd_remain)); auto const mb = buffers_prefix( bytes_transferred, buffers); - impl_->rd_remain -= bytes_transferred; - if(impl_->rd_op == detail::opcode::text) + impl.rd_remain -= bytes_transferred; + if(impl.rd_op == detail::opcode::text) { - if(! impl_->rd_utf8.write(mb) || - (impl_->rd_remain == 0 && impl_->rd_fh.fin && - ! impl_->rd_utf8.finish())) + if(! impl.rd_utf8.write(mb) || + (impl.rd_remain == 0 && impl.rd_fh.fin && + ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, @@ -1039,32 +1014,33 @@ loop: } } bytes_written += bytes_transferred; - impl_->rd_size += bytes_transferred; - impl_->rd_buf.consume(bytes_transferred); + impl.rd_size += bytes_transferred; + impl.rd_buf.consume(bytes_transferred); } else { // Read into caller's buffer - BOOST_ASSERT(impl_->rd_remain > 0); + BOOST_ASSERT(impl.rd_remain > 0); BOOST_ASSERT(buffer_size(buffers) > 0); BOOST_ASSERT(buffer_size(buffers_prefix( - clamp(impl_->rd_remain), buffers)) > 0); + clamp(impl.rd_remain), buffers)) > 0); auto const bytes_transferred = - impl_->stream.read_some(buffers_prefix( - clamp(impl_->rd_remain), buffers), ec); - if(! impl_->check_ok(ec)) + impl.stream.read_some(buffers_prefix( + clamp(impl.rd_remain), buffers), ec); + // VFALCO What if some bytes were written? + if(impl.check_stop_now(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); auto const mb = buffers_prefix( bytes_transferred, buffers); - impl_->rd_remain -= bytes_transferred; - if(impl_->rd_fh.mask) - detail::mask_inplace(mb, impl_->rd_key); - if(impl_->rd_op == detail::opcode::text) + impl.rd_remain -= bytes_transferred; + if(impl.rd_fh.mask) + detail::mask_inplace(mb, impl.rd_key); + if(impl.rd_op == detail::opcode::text) { - if(! impl_->rd_utf8.write(mb) || - (impl_->rd_remain == 0 && impl_->rd_fh.fin && - ! impl_->rd_utf8.finish())) + if(! impl.rd_utf8.write(mb) || + (impl.rd_remain == 0 && impl.rd_fh.fin && + ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, @@ -1073,10 +1049,10 @@ loop: } } bytes_written += bytes_transferred; - impl_->rd_size += bytes_transferred; + impl.rd_size += bytes_transferred; } } - impl_->rd_done = impl_->rd_remain == 0 && impl_->rd_fh.fin; + impl.rd_done = impl.rd_remain == 0 && impl.rd_fh.fin; } else { @@ -1095,14 +1071,14 @@ loop: zs.avail_out = out.size(); BOOST_ASSERT(zs.avail_out > 0); } - if(impl_->rd_remain > 0) + if(impl.rd_remain > 0) { - if(impl_->rd_buf.size() > 0) + if(impl.rd_buf.size() > 0) { // use what's there auto const in = buffers_prefix( - clamp(impl_->rd_remain), beast::buffers_front( - impl_->rd_buf.data())); + clamp(impl.rd_remain), beast::buffers_front( + impl.rd_buf.data())); zs.avail_in = in.size(); zs.next_in = in.data(); } @@ -1110,21 +1086,21 @@ loop: { // read new auto const bytes_transferred = - impl_->stream.read_some( - impl_->rd_buf.prepare(read_size( - impl_->rd_buf, impl_->rd_buf.max_size())), + impl.stream.read_some( + impl.rd_buf.prepare(read_size( + impl.rd_buf, impl.rd_buf.max_size())), ec); - if(! impl_->check_ok(ec)) + if(impl.check_stop_now(ec)) return bytes_written; BOOST_ASSERT(bytes_transferred > 0); - impl_->rd_buf.commit(bytes_transferred); - if(impl_->rd_fh.mask) + impl.rd_buf.commit(bytes_transferred); + if(impl.rd_fh.mask) detail::mask_inplace( - buffers_prefix(clamp(impl_->rd_remain), - impl_->rd_buf.data()), impl_->rd_key); + buffers_prefix(clamp(impl.rd_remain), + impl.rd_buf.data()), impl.rd_key); auto const in = buffers_prefix( - clamp(impl_->rd_remain), buffers_front( - impl_->rd_buf.data())); + clamp(impl.rd_remain), buffers_front( + impl.rd_buf.data())); zs.avail_in = in.size(); zs.next_in = in.data(); did_read = true; @@ -1134,7 +1110,7 @@ loop: break; } } - else if(impl_->rd_fh.fin) + else if(impl.rd_fh.fin) { // append the empty block codes static std::uint8_t constexpr @@ -1142,45 +1118,45 @@ loop: 0x00, 0x00, 0xff, 0xff }; zs.next_in = empty_block; zs.avail_in = sizeof(empty_block); - impl_->inflate(zs, zlib::Flush::sync, ec); + impl.inflate(zs, zlib::Flush::sync, ec); if(! ec) { // https://github.com/madler/zlib/issues/280 if(zs.total_out > 0) ec = error::partial_deflate_block; } - if(! impl_->check_ok(ec)) + if(impl.check_stop_now(ec)) return bytes_written; - impl_->do_context_takeover_read(impl_->role); - impl_->rd_done = true; + impl.do_context_takeover_read(impl.role); + impl.rd_done = true; break; } else { break; } - impl_->inflate(zs, zlib::Flush::sync, ec); - if(! impl_->check_ok(ec)) + impl.inflate(zs, zlib::Flush::sync, ec); + if(impl.check_stop_now(ec)) return bytes_written; - if(impl_->rd_msg_max && beast::detail::sum_exceeds( - impl_->rd_size, zs.total_out, impl_->rd_msg_max)) + if(impl.rd_msg_max && beast::detail::sum_exceeds( + impl.rd_size, zs.total_out, impl.rd_msg_max)) { do_fail(close_code::too_big, error::message_too_big, ec); return bytes_written; } cb.consume(zs.total_out); - impl_->rd_size += zs.total_out; - impl_->rd_remain -= zs.total_in; - impl_->rd_buf.consume(zs.total_in); + impl.rd_size += zs.total_out; + impl.rd_remain -= zs.total_in; + impl.rd_buf.consume(zs.total_in); bytes_written += zs.total_out; } - if(impl_->rd_op == detail::opcode::text) + if(impl.rd_op == detail::opcode::text) { // check utf8 - if(! impl_->rd_utf8.write(beast::buffers_prefix( + if(! impl.rd_utf8.write(beast::buffers_prefix( bytes_written, buffers)) || ( - impl_->rd_done && ! impl_->rd_utf8.finish())) + impl.rd_done && ! impl.rd_utf8.finish())) { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, @@ -1209,9 +1185,8 @@ async_read_some( BOOST_BEAST_HANDLER_INIT( ReadHandler, void(error_code, std::size_t)); read_some_op{ - std::move(init.completion_handler), *this, buffers}( - {}, 0, false); + ReadHandler, void(error_code, std::size_t))>( + std::move(init.completion_handler), *this, buffers); return init.result.get(); } diff --git a/include/boost/beast/websocket/impl/stream.hpp b/include/boost/beast/websocket/impl/stream.hpp index 2585021a..c634abbb 100644 --- a/include/boost/beast/websocket/impl/stream.hpp +++ b/include/boost/beast/websocket/impl/stream.hpp @@ -754,7 +754,7 @@ do_fail( write_close< flat_static_buffer_base>(fb, code); net::write(impl_->stream, fb.data(), ec); - if(! impl_->check_ok(ec)) + if(impl_->check_stop_now(ec)) return; } using beast::websocket::teardown; diff --git a/include/boost/beast/websocket/impl/stream_impl.hpp b/include/boost/beast/websocket/impl/stream_impl.hpp index 90a255a3..5cfd0154 100644 --- a/include/boost/beast/websocket/impl/stream_impl.hpp +++ b/include/boost/beast/websocket/impl/stream_impl.hpp @@ -10,12 +10,16 @@ #ifndef BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP #define BOOST_BEAST_WEBSOCKET_IMPL_STREAM_IMPL_HPP -#include #include +#include +#include #include #include #include #include +#include +#include +#include #include namespace boost { @@ -28,15 +32,8 @@ struct stream::impl_type : std::enable_shared_from_this , detail::impl_base { - using time_point = typename - std::chrono::steady_clock::time_point; - - static constexpr time_point never() - { - return (time_point::max)(); - } - NextLayer stream; // The underlying stream + net::steady_timer timer; // used for timeouts close_reason cr; // set from received close frame control_cb_type ctrl_cb; // control callback @@ -78,9 +75,15 @@ struct stream::impl_type saved_handler paused_r_rd; // paused read op (async read) saved_handler paused_r_close; // paused close op (async read) + boost::optional tm_auto_ping; + bool tm_opt /* true if auto-timeout option is set */ = false; + bool tm_idle; // set to false on incoming frames + time_point::duration tm_dur /* duration of timer */ = std::chrono::seconds(1); + template impl_type(Args&&... args) : stream(std::forward(args)...) + , timer(stream.get_executor().context()) { } @@ -108,12 +111,15 @@ struct stream::impl_type wr_cont = false; wr_buf_size = 0; + tm_idle = false; + this->open_pmd(role); } void close() { + timer.cancel(); wr_buf.reset(); this->close_pmd(); } @@ -136,6 +142,10 @@ struct stream::impl_type wr_block.reset(); rd_block.reset(); cr.code = close_code::none; + tm_idle = false; + + // VFALCO Is this needed? + timer.cancel(); } // Called before each write frame @@ -163,28 +173,185 @@ struct stream::impl_type } } +private: bool - check_open(error_code& ec) + is_timer_set() const { - if(status_ != status::open) + return timer.expiry() == never(); + } + + // returns `true` if we try sending a ping and + // getting a pong before closing an idle stream. + bool + is_auto_ping_enabled() const + { + if(tm_auto_ping.has_value()) + return *tm_auto_ping; + if(role == role_type::server) + return true; + return false; + } + + template + class timeout_handler + : boost::empty_value + { + std::weak_ptr wp_; + + public: + timeout_handler( + Executor const& ex, + std::shared_ptr const& sp) + : boost::empty_value( + boost::empty_init_t{}, ex) + , wp_(sp) { - ec = net::error::operation_aborted; - return false; } - ec = {}; + + using executor_type = Executor; + + executor_type + get_executor() const + { + return this->get(); + } + + void + operator()(error_code ec) + { + // timer canceled? + if(ec == net::error::operation_aborted) + return; + BOOST_ASSERT(! ec); + + // stream destroyed? + auto sp = wp_.lock(); + if(! sp) + return; + auto& impl = *sp; + + close_socket(get_lowest_layer(impl.stream)); +#if 0 + if(! impl.tm_idle) + { + impl.tm_idle = true; + BOOST_VERIFY( + impl.timer.expires_after(impl.tm_dur) == 0); + return impl.timer.async_wait(std::move(*this)); + } + if(impl.is_auto_ping_enabled()) + { + // send ping + } + else + { + // timeout + } +#endif + } + }; +public: + + // called when there is qualified activity + void + activity() + { + tm_idle = false; + } + + // Maintain the expiration timer + template + void + update_timer(Executor const& ex) + { + if(role == role_type::server) + { + if(! is_timer_set()) + { + // turn timer on + timer.expires_after(tm_dur); + timer.async_wait( + timeout_handler( + ex, this->shared_from_this())); + } + } + else if(tm_opt && ! is_timer_set()) + { + // turn timer on + timer.expires_after(tm_dur); + timer.async_wait( + timeout_handler( + ex, this->shared_from_this())); + } + else if(! tm_opt && is_timer_set()) + { + // turn timer off + timer.cancel(); + } + } + + //-------------------------------------------------------------------------- + + bool ec_delivered = false; + + // Determine if an operation should stop and + // deliver an error code to the completion handler. + // + // This function must be called at the beginning + // of every composed operation, and every time a + // composed operation receives an intermediate + // completion. + // + bool + check_stop_now(error_code& ec) + { + // If the stream is closed then abort + if( status_ == status::closed || + status_ == status::failed) + { + //BOOST_ASSERT(ec_delivered); + ec = net::error::operation_aborted; + return true; + } + + // If no error then keep going + if(! ec) + return false; + + // Is this the first error seen? + if(ec_delivered) + { + // No, so abort + ec = net::error::operation_aborted; + return true; + } + + // Deliver the error to the completion handler + ec_delivered = true; + if(status_ != status::closed) + status_ = status::failed; return true; } - bool - check_ok(error_code& ec) + // Change the status of the stream + void + change_status(status new_status) { - if(ec) + switch(new_status) { - if(status_ != status::closed) - status_ = status::failed; - return false; + case status::closing: + BOOST_ASSERT(status_ == status::open); + break; + + case status::failed: + case status::closed: + // this->close(); // Is this right? + break; + + default: + break; } - return true; + status_ = new_status; } }; diff --git a/include/boost/beast/websocket/impl/teardown.hpp b/include/boost/beast/websocket/impl/teardown.hpp index 6e87aef3..65861a56 100644 --- a/include/boost/beast/websocket/impl/teardown.hpp +++ b/include/boost/beast/websocket/impl/teardown.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -50,44 +51,38 @@ public: , s_(s) , role_(role) { + (*this)({}, 0, false); } void operator()( error_code ec = {}, - std::size_t bytes_transferred = 0) + std::size_t bytes_transferred = 0, + bool cont = true) { using tcp = net::ip::tcp; BOOST_ASIO_CORO_REENTER(*this) { nb_ = s_.non_blocking(); s_.non_blocking(true, ec); - if(! ec) - { - if(role_ == role_type::server) - s_.shutdown(tcp::socket::shutdown_send, ec); - } if(ec) - { - BOOST_ASIO_CORO_YIELD - net::post( - s_.get_executor(), - beast::bind_front_handler(std::move(*this), ec, 0)); goto upcall; - } + if(role_ == role_type::server) + s_.shutdown(tcp::socket::shutdown_send, ec); + if(ec) + goto upcall; for(;;) { { char buf[2048]; - s_.read_some( - net::buffer(buf), ec); + s_.read_some(net::buffer(buf), ec); } if(ec == net::error::would_block) { BOOST_ASIO_CORO_YIELD s_.async_wait( net::ip::tcp::socket::wait_read, - std::move(*this)); + beast::detail::bind_continuation(std::move(*this))); continue; } if(ec) @@ -110,6 +105,12 @@ public: goto upcall; s_.close(ec); upcall: + if(! cont) + { + BOOST_ASIO_CORO_YIELD + net::post(bind_front_handler( + std::move(*this), ec)); + } { error_code ignored; s_.non_blocking(nb_, ignored); @@ -173,8 +174,7 @@ async_teardown( "TeardownHandler requirements not met"); detail::teardown_tcp_op::type>(std::forward< - TeardownHandler>(handler), socket, - role)(); + TeardownHandler>(handler), socket, role); } } // websocket diff --git a/include/boost/beast/websocket/impl/write.hpp b/include/boost/beast/websocket/impl/write.hpp index 2a9d822d..9c562a1d 100644 --- a/include/boost/beast/websocket/impl/write.hpp +++ b/include/boost/beast/websocket/impl/write.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,15 @@ class stream::write_some_op Handler, beast::executor_type> , public net::coroutine { + enum + { + do_nomask_nofrag, + do_nomask_frag, + do_mask_nofrag, + do_mask_frag, + do_deflate + }; + stream& ws_; buffers_suffix cb_; detail::frame_header fh_; @@ -69,6 +79,63 @@ public: , cb_(bs) , fin_(fin) { + auto& impl = *ws_.impl_; + + // Set up the outgoing frame header + if(! impl.wr_cont) + { + impl.begin_msg(); + fh_.rsv1 = impl.wr_compress; + } + else + { + fh_.rsv1 = false; + } + fh_.rsv2 = false; + fh_.rsv3 = false; + fh_.op = impl.wr_cont ? + detail::opcode::cont : impl.wr_opcode; + fh_.mask = + impl.role == role_type::client; + + // Choose a write algorithm + if(impl.wr_compress) + { + how_ = do_deflate; + } + else if(! fh_.mask) + { + if(! impl.wr_frag) + { + how_ = do_nomask_nofrag; + } + else + { + BOOST_ASSERT(impl.wr_buf_size != 0); + remain_ = buffer_size(cb_); + if(remain_ > impl.wr_buf_size) + how_ = do_nomask_frag; + else + how_ = do_nomask_nofrag; + } + } + else + { + if(! impl.wr_frag) + { + how_ = do_mask_nofrag; + } + else + { + BOOST_ASSERT(impl.wr_buf_size != 0); + remain_ = buffer_size(cb_); + if(remain_ > impl.wr_buf_size) + how_ = do_mask_frag; + else + how_ = do_mask_nofrag; + } + } + (*this)({}, 0, false); } void operator()( @@ -88,283 +155,210 @@ operator()( bool cont) { using beast::detail::clamp; - enum - { - do_nomask_nofrag, - do_nomask_frag, - do_mask_nofrag, - do_mask_frag, - do_deflate - }; std::size_t n; net::mutable_buffer b; - cont_ = cont; + auto& impl = *ws_.impl_; BOOST_ASIO_CORO_REENTER(*this) { - // Set up the outgoing frame header - if(! ws_.impl_->wr_cont) - { - ws_.impl_->begin_msg(); - fh_.rsv1 = ws_.impl_->wr_compress; - } - else - { - fh_.rsv1 = false; - } - fh_.rsv2 = false; - fh_.rsv3 = false; - fh_.op = ws_.impl_->wr_cont ? - detail::opcode::cont : ws_.impl_->wr_opcode; - fh_.mask = - ws_.impl_->role == role_type::client; - - // Choose a write algorithm - if(ws_.impl_->wr_compress) - { - how_ = do_deflate; - } - else if(! fh_.mask) - { - if(! ws_.impl_->wr_frag) - { - how_ = do_nomask_nofrag; - } - else - { - BOOST_ASSERT(ws_.impl_->wr_buf_size != 0); - remain_ = buffer_size(cb_); - if(remain_ > ws_.impl_->wr_buf_size) - how_ = do_nomask_frag; - else - how_ = do_nomask_nofrag; - } - } - else - { - if(! ws_.impl_->wr_frag) - { - how_ = do_mask_nofrag; - } - else - { - BOOST_ASSERT(ws_.impl_->wr_buf_size != 0); - remain_ = buffer_size(cb_); - if(remain_ > ws_.impl_->wr_buf_size) - how_ = do_mask_frag; - else - how_ = do_mask_nofrag; - } - } - - // Maybe suspend - if(ws_.impl_->wr_block.try_lock(this)) - { - // Make sure the stream is open - if(! ws_.impl_->check_open(ec)) - goto upcall; - } - else + // Acquire the write lock + if(! impl.wr_block.try_lock(this)) { do_suspend: - // Suspend BOOST_ASIO_CORO_YIELD - ws_.impl_->paused_wr.emplace(std::move(*this)); - - // Acquire the write block - ws_.impl_->wr_block.lock(this); - - // Resume + impl.paused_wr.emplace(std::move(*this)); + impl.wr_block.lock(this); BOOST_ASIO_CORO_YIELD - net::post( - ws_.get_executor(), std::move(*this)); - BOOST_ASSERT(ws_.impl_->wr_block.is_locked(this)); - - // Make sure the stream is open - if(! ws_.impl_->check_open(ec)) - goto upcall; + net::post(std::move(*this)); + BOOST_ASSERT(impl.wr_block.is_locked(this)); } + if(impl.check_stop_now(ec)) + goto upcall; //------------------------------------------------------------------ if(how_ == do_nomask_nofrag) { + // send a single frame fh_.fin = fin_; fh_.len = buffer_size(cb_); - ws_.impl_->wr_fb.clear(); + impl.wr_fb.clear(); detail::write( - ws_.impl_->wr_fb, fh_); - ws_.impl_->wr_cont = ! fin_; - // Send frame + impl.wr_fb, fh_); + impl.wr_cont = ! fin_; BOOST_ASIO_CORO_YIELD - net::async_write(ws_.impl_->stream, - buffers_cat(ws_.impl_->wr_fb.data(), cb_), - std::move(*this)); - if(! ws_.impl_->check_ok(ec)) - goto upcall; + net::async_write(impl.stream, + buffers_cat(impl.wr_fb.data(), cb_), + beast::detail::bind_continuation(std::move(*this))); bytes_transferred_ += clamp(fh_.len); + if(impl.check_stop_now(ec)) + goto upcall; goto upcall; } //------------------------------------------------------------------ - else if(how_ == do_nomask_frag) + if(how_ == do_nomask_frag) { + // send multiple frames for(;;) { - n = clamp(remain_, ws_.impl_->wr_buf_size); + n = clamp(remain_, impl.wr_buf_size); fh_.len = n; remain_ -= n; fh_.fin = fin_ ? remain_ == 0 : false; - ws_.impl_->wr_fb.clear(); + impl.wr_fb.clear(); detail::write( - ws_.impl_->wr_fb, fh_); - ws_.impl_->wr_cont = ! fin_; + impl.wr_fb, fh_); + impl.wr_cont = ! fin_; // Send frame BOOST_ASIO_CORO_YIELD - net::async_write( - ws_.impl_->stream, buffers_cat( - ws_.impl_->wr_fb.data(), buffers_prefix( - clamp(fh_.len), cb_)), - std::move(*this)); - if(! ws_.impl_->check_ok(ec)) - goto upcall; - n = clamp(fh_.len); // because yield + net::async_write(impl.stream, buffers_cat( + impl.wr_fb.data(), + buffers_prefix(clamp(fh_.len), cb_)), + beast::detail::bind_continuation(std::move(*this))); + n = clamp(fh_.len); // restore `n` on yield bytes_transferred_ += n; + if(impl.check_stop_now(ec)) + goto upcall; if(remain_ == 0) break; cb_.consume(n); fh_.op = detail::opcode::cont; - // Allow outgoing control frames to - // be sent in between message frames - ws_.impl_->wr_block.unlock(this); - if( ws_.impl_->paused_close.maybe_invoke() || - ws_.impl_->paused_rd.maybe_invoke() || - ws_.impl_->paused_ping.maybe_invoke()) + + // Give up the write lock in between each frame + // so that outgoing control frames might be sent. + impl.wr_block.unlock(this); + if( impl.paused_close.maybe_invoke() || + impl.paused_rd.maybe_invoke() || + impl.paused_ping.maybe_invoke()) { - BOOST_ASSERT(ws_.impl_->wr_block.is_locked()); + BOOST_ASSERT(impl.wr_block.is_locked()); goto do_suspend; } - ws_.impl_->wr_block.lock(this); + impl.wr_block.lock(this); } goto upcall; } //------------------------------------------------------------------ - else if(how_ == do_mask_nofrag) + if(how_ == do_mask_nofrag) { - remain_ = buffer_size(cb_); + // send a single frame using multiple writes + remain_ = beast::buffer_size(cb_); fh_.fin = fin_; fh_.len = remain_; fh_.key = ws_.create_mask(); detail::prepare_key(key_, fh_.key); - ws_.impl_->wr_fb.clear(); + impl.wr_fb.clear(); detail::write( - ws_.impl_->wr_fb, fh_); - n = clamp(remain_, ws_.impl_->wr_buf_size); + impl.wr_fb, fh_); + n = clamp(remain_, impl.wr_buf_size); net::buffer_copy(net::buffer( - ws_.impl_->wr_buf.get(), n), cb_); + impl.wr_buf.get(), n), cb_); detail::mask_inplace(net::buffer( - ws_.impl_->wr_buf.get(), n), key_); + impl.wr_buf.get(), n), key_); remain_ -= n; - ws_.impl_->wr_cont = ! fin_; - // Send frame header and partial payload + impl.wr_cont = ! fin_; + // write frame header and some payload BOOST_ASIO_CORO_YIELD - net::async_write( - ws_.impl_->stream, buffers_cat(ws_.impl_->wr_fb.data(), - net::buffer(ws_.impl_->wr_buf.get(), n)), - std::move(*this)); - if(! ws_.impl_->check_ok(ec)) - goto upcall; + net::async_write(impl.stream, buffers_cat( + impl.wr_fb.data(), + net::buffer(impl.wr_buf.get(), n)), + beast::detail::bind_continuation(std::move(*this))); + // VFALCO What about consuming the buffer on error? bytes_transferred_ += - bytes_transferred - ws_.impl_->wr_fb.size(); + bytes_transferred - impl.wr_fb.size(); + if(impl.check_stop_now(ec)) + goto upcall; while(remain_ > 0) { - cb_.consume(ws_.impl_->wr_buf_size); - n = clamp(remain_, ws_.impl_->wr_buf_size); + cb_.consume(impl.wr_buf_size); + n = clamp(remain_, impl.wr_buf_size); net::buffer_copy(net::buffer( - ws_.impl_->wr_buf.get(), n), cb_); + impl.wr_buf.get(), n), cb_); detail::mask_inplace(net::buffer( - ws_.impl_->wr_buf.get(), n), key_); + impl.wr_buf.get(), n), key_); remain_ -= n; - // Send partial payload + // write more payload BOOST_ASIO_CORO_YIELD - net::async_write(ws_.impl_->stream, - net::buffer(ws_.impl_->wr_buf.get(), n), - std::move(*this)); - if(! ws_.impl_->check_ok(ec)) - goto upcall; + net::async_write(impl.stream, + net::buffer(impl.wr_buf.get(), n), + beast::detail::bind_continuation(std::move(*this))); bytes_transferred_ += bytes_transferred; + if(impl.check_stop_now(ec)) + goto upcall; } goto upcall; } //------------------------------------------------------------------ - else if(how_ == do_mask_frag) + if(how_ == do_mask_frag) { + // send multiple frames for(;;) { - n = clamp(remain_, ws_.impl_->wr_buf_size); + n = clamp(remain_, impl.wr_buf_size); remain_ -= n; fh_.len = n; fh_.key = ws_.create_mask(); fh_.fin = fin_ ? remain_ == 0 : false; detail::prepare_key(key_, fh_.key); net::buffer_copy(net::buffer( - ws_.impl_->wr_buf.get(), n), cb_); + impl.wr_buf.get(), n), cb_); detail::mask_inplace(net::buffer( - ws_.impl_->wr_buf.get(), n), key_); - ws_.impl_->wr_fb.clear(); + impl.wr_buf.get(), n), key_); + impl.wr_fb.clear(); detail::write( - ws_.impl_->wr_fb, fh_); - ws_.impl_->wr_cont = ! fin_; + impl.wr_fb, fh_); + impl.wr_cont = ! fin_; // Send frame BOOST_ASIO_CORO_YIELD - net::async_write(ws_.impl_->stream, - buffers_cat(ws_.impl_->wr_fb.data(), - net::buffer(ws_.impl_->wr_buf.get(), n)), - std::move(*this)); - if(! ws_.impl_->check_ok(ec)) - goto upcall; - n = bytes_transferred - ws_.impl_->wr_fb.size(); + net::async_write(impl.stream, buffers_cat( + impl.wr_fb.data(), + net::buffer(impl.wr_buf.get(), n)), + beast::detail::bind_continuation(std::move(*this))); + n = bytes_transferred - impl.wr_fb.size(); bytes_transferred_ += n; + if(impl.check_stop_now(ec)) + goto upcall; if(remain_ == 0) break; cb_.consume(n); fh_.op = detail::opcode::cont; - // Allow outgoing control frames to - // be sent in between message frames: - ws_.impl_->wr_block.unlock(this); - if( ws_.impl_->paused_close.maybe_invoke() || - ws_.impl_->paused_rd.maybe_invoke() || - ws_.impl_->paused_ping.maybe_invoke()) + // Give up the write lock in between each frame + // so that outgoing control frames might be sent. + impl.wr_block.unlock(this); + if( impl.paused_close.maybe_invoke() || + impl.paused_rd.maybe_invoke() || + impl.paused_ping.maybe_invoke()) { - BOOST_ASSERT(ws_.impl_->wr_block.is_locked()); + BOOST_ASSERT(impl.wr_block.is_locked()); goto do_suspend; } - ws_.impl_->wr_block.lock(this); + impl.wr_block.lock(this); } goto upcall; } //------------------------------------------------------------------ - else if(how_ == do_deflate) + if(how_ == do_deflate) { + // send compressed frames for(;;) { - b = net::buffer(ws_.impl_->wr_buf.get(), - ws_.impl_->wr_buf_size); - more_ = ws_.impl_->deflate(b, cb_, fin_, in_, ec); - if(! ws_.impl_->check_ok(ec)) + b = net::buffer(impl.wr_buf.get(), + impl.wr_buf_size); + more_ = impl.deflate(b, cb_, fin_, in_, ec); + if(impl.check_stop_now(ec)) goto upcall; n = buffer_size(b); if(n == 0) { - // The input was consumed, but there - // is no output due to compression - // latency. + // The input was consumed, but there is + // no output due to compression latency. BOOST_ASSERT(! fin_); BOOST_ASSERT(buffer_size(cb_) == 0); goto upcall; @@ -378,38 +372,38 @@ operator()( } fh_.fin = ! more_; fh_.len = n; - ws_.impl_->wr_fb.clear(); + impl.wr_fb.clear(); detail::write< - flat_static_buffer_base>(ws_.impl_->wr_fb, fh_); - ws_.impl_->wr_cont = ! fin_; + flat_static_buffer_base>(impl.wr_fb, fh_); + impl.wr_cont = ! fin_; // Send frame BOOST_ASIO_CORO_YIELD - net::async_write(ws_.impl_->stream, - buffers_cat(ws_.impl_->wr_fb.data(), b), - std::move(*this)); - if(! ws_.impl_->check_ok(ec)) - goto upcall; + net::async_write(impl.stream, buffers_cat( + impl.wr_fb.data(), b), + beast::detail::bind_continuation(std::move(*this))); bytes_transferred_ += in_; + if(impl.check_stop_now(ec)) + goto upcall; if(more_) { fh_.op = detail::opcode::cont; fh_.rsv1 = false; - // Allow outgoing control frames to - // be sent in between message frames: - ws_.impl_->wr_block.unlock(this); - if( ws_.impl_->paused_close.maybe_invoke() || - ws_.impl_->paused_rd.maybe_invoke() || - ws_.impl_->paused_ping.maybe_invoke()) + // Give up the write lock in between each frame + // so that outgoing control frames might be sent. + impl.wr_block.unlock(this); + if( impl.paused_close.maybe_invoke() || + impl.paused_rd.maybe_invoke() || + impl.paused_ping.maybe_invoke()) { - BOOST_ASSERT(ws_.impl_->wr_block.is_locked()); + BOOST_ASSERT(impl.wr_block.is_locked()); goto do_suspend; } - ws_.impl_->wr_block.lock(this); + impl.wr_block.lock(this); } else { if(fh_.fin) - ws_.impl_->do_context_takeover_write(ws_.impl_->role); + impl.do_context_takeover_write(impl.role); goto upcall; } } @@ -418,11 +412,17 @@ operator()( //-------------------------------------------------------------------------- upcall: - ws_.impl_->wr_block.unlock(this); - ws_.impl_->paused_close.maybe_invoke() || - ws_.impl_->paused_rd.maybe_invoke() || - ws_.impl_->paused_ping.maybe_invoke(); - this->invoke(cont_, ec, bytes_transferred_); + impl.wr_block.unlock(this); + impl.paused_close.maybe_invoke() + || impl.paused_rd.maybe_invoke() + || impl.paused_ping.maybe_invoke(); + if(! cont) + { + BOOST_ASIO_CORO_YIELD + net::post(bind_front_handler( + std::move(*this), ec, bytes_transferred_)); + } + this->invoke(cont, ec, bytes_transferred_); } } @@ -460,16 +460,16 @@ write_some(bool fin, ConstBufferSequence>::value, "ConstBufferSequence requirements not met"); using beast::detail::clamp; + auto& impl = *impl_; std::size_t bytes_transferred = 0; ec = {}; - // Make sure the stream is open - if(! impl_->check_open(ec)) + if(impl.check_stop_now(ec)) return bytes_transferred; detail::frame_header fh; - if(! impl_->wr_cont) + if(! impl.wr_cont) { - impl_->begin_msg(); - fh.rsv1 = impl_->wr_compress; + impl.begin_msg(); + fh.rsv1 = impl.wr_compress; } else { @@ -477,21 +477,22 @@ write_some(bool fin, } fh.rsv2 = false; fh.rsv3 = false; - fh.op = impl_->wr_cont ? - detail::opcode::cont : impl_->wr_opcode; - fh.mask = impl_->role == role_type::client; + fh.op = impl.wr_cont ? + detail::opcode::cont : impl.wr_opcode; + fh.mask = impl.role == role_type::client; auto remain = buffer_size(buffers); - if(impl_->wr_compress) + if(impl.wr_compress) { + buffers_suffix< - ConstBufferSequence> cb{buffers}; + ConstBufferSequence> cb(buffers); for(;;) { auto b = net::buffer( - impl_->wr_buf.get(), impl_->wr_buf_size); - auto const more = impl_->deflate( + impl.wr_buf.get(), impl.wr_buf_size); + auto const more = impl.deflate( b, cb, fin, bytes_transferred, ec); - if(! impl_->check_ok(ec)) + if(impl.check_stop_now(ec)) return bytes_transferred; auto const n = buffer_size(b); if(n == 0) @@ -516,10 +517,10 @@ write_some(bool fin, detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); - impl_->wr_cont = ! fin; - net::write(impl_->stream, + impl.wr_cont = ! fin; + net::write(impl.stream, buffers_cat(fh_buf.data(), b), ec); - if(! impl_->check_ok(ec)) + if(impl.check_stop_now(ec)) return bytes_transferred; if(! more) break; @@ -527,11 +528,11 @@ write_some(bool fin, fh.rsv1 = false; } if(fh.fin) - impl_->do_context_takeover_write(impl_->role); + impl.do_context_takeover_write(impl.role); } else if(! fh.mask) { - if(! impl_->wr_frag) + if(! impl.wr_frag) { // no mask, no autofrag fh.fin = fin; @@ -539,35 +540,35 @@ write_some(bool fin, detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); - impl_->wr_cont = ! fin; - net::write(impl_->stream, + impl.wr_cont = ! fin; + net::write(impl.stream, buffers_cat(fh_buf.data(), buffers), ec); - if(! impl_->check_ok(ec)) + if(impl.check_stop_now(ec)) return bytes_transferred; bytes_transferred += remain; } else { // no mask, autofrag - BOOST_ASSERT(impl_->wr_buf_size != 0); + BOOST_ASSERT(impl.wr_buf_size != 0); buffers_suffix< ConstBufferSequence> cb{buffers}; for(;;) { - auto const n = clamp(remain, impl_->wr_buf_size); + auto const n = clamp(remain, impl.wr_buf_size); remain -= n; fh.len = n; fh.fin = fin ? remain == 0 : false; detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); - impl_->wr_cont = ! fin; - net::write(impl_->stream, + impl.wr_cont = ! fin; + net::write(impl.stream, beast::buffers_cat(fh_buf.data(), beast::buffers_prefix(n, cb)), ec); - if(! impl_->check_ok(ec)) - return bytes_transferred; bytes_transferred += n; + if(impl.check_stop_now(ec)) + return bytes_transferred; if(remain == 0) break; fh.op = detail::opcode::cont; @@ -575,7 +576,7 @@ write_some(bool fin, } } } - else if(! impl_->wr_frag) + else if(! impl.wr_frag) { // mask, no autofrag fh.fin = fin; @@ -590,40 +591,40 @@ write_some(bool fin, ConstBufferSequence> cb{buffers}; { auto const n = - clamp(remain, impl_->wr_buf_size); + clamp(remain, impl.wr_buf_size); auto const b = - net::buffer(impl_->wr_buf.get(), n); + net::buffer(impl.wr_buf.get(), n); net::buffer_copy(b, cb); cb.consume(n); remain -= n; detail::mask_inplace(b, key); - impl_->wr_cont = ! fin; - net::write(impl_->stream, + impl.wr_cont = ! fin; + net::write(impl.stream, buffers_cat(fh_buf.data(), b), ec); - if(! impl_->check_ok(ec)) - return bytes_transferred; bytes_transferred += n; + if(impl.check_stop_now(ec)) + return bytes_transferred; } while(remain > 0) { auto const n = - clamp(remain, impl_->wr_buf_size); + clamp(remain, impl.wr_buf_size); auto const b = - net::buffer(impl_->wr_buf.get(), n); + net::buffer(impl.wr_buf.get(), n); net::buffer_copy(b, cb); cb.consume(n); remain -= n; detail::mask_inplace(b, key); - net::write(impl_->stream, b, ec); - if(! impl_->check_ok(ec)) - return bytes_transferred; + net::write(impl.stream, b, ec); bytes_transferred += n; + if(impl.check_stop_now(ec)) + return bytes_transferred; } } else { // mask, autofrag - BOOST_ASSERT(impl_->wr_buf_size != 0); + BOOST_ASSERT(impl.wr_buf_size != 0); buffers_suffix< ConstBufferSequence> cb(buffers); for(;;) @@ -632,23 +633,23 @@ write_some(bool fin, detail::prepared_key key; detail::prepare_key(key, fh.key); auto const n = - clamp(remain, impl_->wr_buf_size); + clamp(remain, impl.wr_buf_size); auto const b = - net::buffer(impl_->wr_buf.get(), n); + net::buffer(impl.wr_buf.get(), n); net::buffer_copy(b, cb); detail::mask_inplace(b, key); fh.len = n; remain -= n; fh.fin = fin ? remain == 0 : false; - impl_->wr_cont = ! fh.fin; + impl.wr_cont = ! fh.fin; detail::fh_buffer fh_buf; detail::write< flat_static_buffer_base>(fh_buf, fh); - net::write(impl_->stream, + net::write(impl.stream, buffers_cat(fh_buf.data(), b), ec); - if(! impl_->check_ok(ec)) - return bytes_transferred; bytes_transferred += n; + if(impl.check_stop_now(ec)) + return bytes_transferred; if(remain == 0) break; fh.op = detail::opcode::cont; @@ -674,9 +675,8 @@ async_write_some(bool fin, BOOST_BEAST_HANDLER_INIT( WriteHandler, void(error_code, std::size_t)); write_some_op{ - std::move(init.completion_handler), *this, fin, bs}( - {}, 0, false); + WriteHandler, void(error_code, std::size_t))>( + std::move(init.completion_handler), *this, fin, bs); return init.result.get(); } @@ -730,9 +730,8 @@ async_write( BOOST_BEAST_HANDLER_INIT( WriteHandler, void(error_code, std::size_t)); write_some_op{ - std::move(init.completion_handler), *this, true, bs}( - {}, 0, false); + WriteHandler, void(error_code, std::size_t))>( + std::move(init.completion_handler), *this, true, bs); return init.result.get(); } diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index 2d91d14d..f8d7c9c3 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -151,18 +151,18 @@ class stream using control_cb_type = std::function; - enum class status - { - open, - closing, - closed, - failed - }; - struct impl_type; std::shared_ptr impl_; + using time_point = typename + std::chrono::steady_clock::time_point; + + static constexpr time_point never() + { + return (time_point::max)(); + } + public: /// Indicates if the permessage-deflate extension is supported using is_deflate_supported = @@ -579,6 +579,18 @@ public: bool text() const; + /* + timer settings + + * Timer is disabled + * Close on timeout + - no complete frame received, OR + - no complete frame sent + * Ping on timeout + - ping on no complete frame received + * if can't ping? + */ + //-------------------------------------------------------------------------- // // Handshaking (Client) diff --git a/test/beast/core/CMakeLists.txt b/test/beast/core/CMakeLists.txt index b7223c12..3885f08e 100644 --- a/test/beast/core/CMakeLists.txt +++ b/test/beast/core/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable (tests-beast-core stream_tests.hpp test_handler.hpp _detail_base64.cpp + _detail_bind_continuation.cpp _detail_buffer.cpp _detail_clamp.cpp _detail_is_invocable.cpp @@ -31,7 +32,6 @@ add_executable (tests-beast-core _detail_varint.cpp async_op_base.cpp basic_stream.cpp - bind_continuation.cpp bind_handler.cpp buffer_size.cpp buffer_traits.cpp diff --git a/test/beast/core/Jamfile b/test/beast/core/Jamfile index 33e7a201..bda57c47 100644 --- a/test/beast/core/Jamfile +++ b/test/beast/core/Jamfile @@ -9,6 +9,7 @@ local SOURCES = _detail_base64.cpp + _detail_bind_continuation.cpp _detail_buffer.cpp _detail_clamp.cpp _detail_is_invocable.cpp @@ -19,7 +20,6 @@ local SOURCES = _detail_varint.cpp async_op_base.cpp basic_stream.cpp - bind_continuation.cpp bind_handler.cpp buffer_size.cpp buffer_traits.cpp diff --git a/test/beast/core/bind_continuation.cpp b/test/beast/core/_detail_bind_continuation.cpp similarity index 98% rename from test/beast/core/bind_continuation.cpp rename to test/beast/core/_detail_bind_continuation.cpp index f5ef983f..d96dbec4 100644 --- a/test/beast/core/bind_continuation.cpp +++ b/test/beast/core/_detail_bind_continuation.cpp @@ -8,7 +8,7 @@ // // Test that header file is self-contained. -#include +#include #include "test_executor.hpp" #include "test_handler.hpp" @@ -24,6 +24,7 @@ namespace boost { namespace beast { +namespace detail { class bind_continuation_test : public beast::unit_test::suite @@ -182,5 +183,6 @@ public: BEAST_DEFINE_TESTSUITE(beast,core,bind_continuation); +} // detail } // beast } // boost diff --git a/test/beast/websocket/CMakeLists.txt b/test/beast/websocket/CMakeLists.txt index 2a937abe..6beb5279 100644 --- a/test/beast/websocket/CMakeLists.txt +++ b/test/beast/websocket/CMakeLists.txt @@ -35,6 +35,7 @@ add_executable (tests-beast-websocket stream_explicit.cpp stream_fwd.cpp teardown.cpp + timer.cpp utf8_checker.cpp write.cpp ) diff --git a/test/beast/websocket/Jamfile b/test/beast/websocket/Jamfile index 9271a889..c296b7df 100644 --- a/test/beast/websocket/Jamfile +++ b/test/beast/websocket/Jamfile @@ -25,6 +25,7 @@ local SOURCES = stream_explicit.cpp stream_fwd.cpp teardown.cpp + timer.cpp utf8_checker.cpp write.cpp ; diff --git a/test/beast/websocket/close.cpp b/test/beast/websocket/close.cpp index 44474dcd..aa945797 100644 --- a/test/beast/websocket/close.cpp +++ b/test/beast/websocket/close.cpp @@ -608,21 +608,6 @@ public: }); } - void - testContHook() - { - struct handler - { - void operator()(error_code) {} - }; - - stream ws{ioc_}; - stream::close_op op{ - handler{}, ws, {}}; - using net::asio_handler_is_continuation; - asio_handler_is_continuation(&op); - } - void testMoveOnly() { @@ -660,7 +645,6 @@ public: { testClose(); testSuspend(); - testContHook(); testMoveOnly(); testAsioHandlerInvoke(); } diff --git a/test/beast/websocket/ping.cpp b/test/beast/websocket/ping.cpp index 99efe00a..4e05b36d 100644 --- a/test/beast/websocket/ping.cpp +++ b/test/beast/websocket/ping.cpp @@ -423,21 +423,6 @@ public: } } - void - testContHook() - { - struct handler - { - void operator()(error_code) {} - }; - - stream ws{ioc_}; - stream::ping_op op{ - handler{}, ws, detail::opcode::ping, {}}; - using net::asio_handler_is_continuation; - asio_handler_is_continuation(&op); - } - void testMoveOnly() { @@ -475,7 +460,6 @@ public: { testPing(); testSuspend(); - testContHook(); testMoveOnly(); testAsioHandlerInvoke(); } diff --git a/test/beast/websocket/read2.cpp b/test/beast/websocket/read2.cpp index 57b21910..2cd4f64d 100644 --- a/test/beast/websocket/read2.cpp +++ b/test/beast/websocket/read2.cpp @@ -339,43 +339,6 @@ public: bad(string_view("\x81\x7f\x00\x00\x00\x00\x00\x00\xff\xff", 10)); } - void - testContHook() - { - { - struct handler - { - void operator()(error_code, std::size_t) {} - }; - - char buf[32]; - stream ws{ioc_}; - stream::read_some_op< - net::mutable_buffer, - handler> op{handler{}, ws, - net::mutable_buffer{ - buf, sizeof(buf)}}; - using net::asio_handler_is_continuation; - asio_handler_is_continuation(&op); - pass(); - } - { - struct handler - { - void operator()(error_code, std::size_t) {} - }; - - multi_buffer b; - stream ws{ioc_}; - stream::read_op< - multi_buffer, handler> op{ - handler{}, ws, b, 32, true}; - using net::asio_handler_is_continuation; - asio_handler_is_continuation(&op); - pass(); - } - } - void testIssue802() { @@ -675,7 +638,6 @@ public: run() override { testParseFrame(); - testContHook(); testIssue802(); testIssue807(); testIssue954(); diff --git a/test/beast/websocket/timer.cpp b/test/beast/websocket/timer.cpp new file mode 100644 index 00000000..df033aa4 --- /dev/null +++ b/test/beast/websocket/timer.cpp @@ -0,0 +1,138 @@ + +// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +// Test that header file is self-contained. +#include + +#include +#include + +#include "test.hpp" + +namespace boost { +namespace beast { +namespace websocket { + +class timer_test + : public websocket_test_suite +{ +public: + using tcp = boost::asio::ip::tcp; + + static + void + connect( + stream& ws1, + stream& ws2) + { + struct handler + { + void + operator()(error_code ec) + { + BEAST_EXPECTS(! ec, ec.message()); + } + }; + + tcp::acceptor a(ws1.get_executor().context()); + auto ep = tcp::endpoint( + net::ip::make_address_v4("127.0.0.1"), 0); + a.open(ep.protocol()); + a.set_option( + net::socket_base::reuse_address(true)); + a.bind(ep); + a.listen(0); + ep = a.local_endpoint(); + a.async_accept(ws2.next_layer(), handler{}); + ws1.next_layer().async_connect(ep, handler{}); + for(;;) + { + if( ws1.get_executor().context().run_one() + + ws2.get_executor().context().run_one() == 0) + { + ws1.get_executor().context().restart(); + ws2.get_executor().context().restart(); + break; + } + } + BEAST_EXPECT( + ws1.next_layer().remote_endpoint() == + ws2.next_layer().local_endpoint()); + BEAST_EXPECT( + ws2.next_layer().remote_endpoint() == + ws1.next_layer().local_endpoint()); + ws2.async_accept(handler{}); + ws1.async_handshake("test", "/", handler{}); + for(;;) + { + if( ws1.get_executor().context().run_one() + + ws2.get_executor().context().run_one() == 0) + { + ws1.get_executor().context().restart(); + ws2.get_executor().context().restart(); + break; + } + } + BEAST_EXPECT(ws1.is_open()); + BEAST_EXPECT(ws2.is_open()); + BEAST_EXPECT(! ws1.get_executor().context().stopped()); + BEAST_EXPECT(! ws2.get_executor().context().stopped()); + } + +#if 0 + void + testRead0() + { + echo_server es(log, kind::async); + + net::io_context ioc; + stream ws(ioc); + ws.next_layer().connect(es.stream()); + + ws.handshake("localhost", "/"); + flat_buffer b; + ws.async_read(b, + [&](error_code ec, std::size_t) + { + log << ec.message() << "\n"; + }); + ioc.run(); + } +#endif + + void + testRead() + { + net::io_context ioc; + stream ws1(ioc); + stream ws2(ioc); + connect(ws1, ws2); + flat_buffer b; + + ws2.async_read(b, + [&](error_code ec, std::size_t) + { + log << "ws1.async_read: " << ec.message() << "\n"; + }); + ioc.run(); + } + + void + run() override + { + testRead(); + pass(); + } +}; + +BEAST_DEFINE_TESTSUITE(beast,websocket,timer); + +} // websocket +} // beast +} // boost diff --git a/test/beast/websocket/write.cpp b/test/beast/websocket/write.cpp index ad72db36..20887192 100644 --- a/test/beast/websocket/write.cpp +++ b/test/beast/websocket/write.cpp @@ -605,25 +605,6 @@ public: } } - void - testContHook() - { - struct handler - { - void operator()(error_code) {} - }; - - char buf[32]; - stream ws{ioc_}; - stream::write_some_op< - net::const_buffer, - handler> op{handler{}, ws, true, - net::const_buffer{ - buf, sizeof(buf)}}; - using net::asio_handler_is_continuation; - asio_handler_is_continuation(&op); - } - void testMoveOnly() { @@ -669,7 +650,6 @@ public: testWriteSuspend(); testAsyncWriteFrame(); testIssue300(); - testContHook(); testMoveOnly(); } };