diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d781ce1..d189cb04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +Version 217: + +* websocket idle pings + +-------------------------------------------------------------------------------- + Version 216: * Refactor websocket::stream operations diff --git a/include/boost/beast/websocket/impl/ping.hpp b/include/boost/beast/websocket/impl/ping.hpp index e035d18a..15c5eb3e 100644 --- a/include/boost/beast/websocket/impl/ping.hpp +++ b/include/boost/beast/websocket/impl/ping.hpp @@ -109,100 +109,96 @@ public: //------------------------------------------------------------------------------ -#if 0 -template< - class NextLayer, - bool deflateSupported, - class Handler> -void -async_auto_ping( - stream& ws, - Handler&& handler) +// sends the idle ping +template +template +class stream::idle_ping_op + : public net::coroutine + , public boost::empty_value { - using handler_type = - typename std::decay::type; + boost::weak_ptr wp_; + std::unique_ptr fb_; - using base_type = - beast::stable_async_op_base< - handler_type, beast::executor_type< - stream>>; +public: + static constexpr int id = 4; // for soft_mutex - struct async_op : base_type, net::coroutine + using executor_type = Executor; + + executor_type + get_executor() const noexcept { - boost::weak_ptr impl_; - detail::frame_buffer& fb_; + return this->get(); + } - public: - static constexpr int id = 4; // for soft_mutex - - async_op( - Handler&& h, - stream& ws) - : base_type(std::move(h), ws.get_executor()) - , impl_(ws.impl_) - , fb_(beast::allocate_stable< - detail::frame_buffer>(*this)) + idle_ping_op( + boost::shared_ptr const& sp, + Executor const& ex) + : boost::empty_value( + boost::empty_init_t{}, ex) + , wp_(sp) + , fb_(new detail::frame_buffer) + { + if(! sp->idle_pinging) { - // Serialize the ping or pong frame - ping_data payload; - ws.template write_ping< - flat_static_buffer_base>(fb_, op, payload); - (*this)({}, 0, false); + // Create the ping frame + ping_data payload; // empty for now + sp->template write_ping< + flat_static_buffer_base>(*fb_, + detail::opcode::ping, payload); + + sp->idle_pinging = true; + (*this)({}, 0); } - - void operator()( - error_code ec = {}, - std::size_t bytes_transferred = 0, - bool cont = true) + else { - boost::ignore_unused(bytes_transferred); - auto sp = impl_.lock(); - if(! sp) - return; + // if we are already in the middle of sending + // an idle ping, don't bother sending another. + } + } - auto& impl = *ws_.impl_; - BOOST_ASIO_CORO_REENTER(*this) + void operator()( + error_code ec = {}, + std::size_t bytes_transferred = 0) + { + boost::ignore_unused(bytes_transferred); + auto sp = wp_.lock(); + if(! sp) + return; + auto& impl = *sp; + BOOST_ASIO_CORO_REENTER(*this) + { + // Acquire the write lock + if(! impl.wr_block.try_lock(this)) { - // Acquire the write lock - if(! impl.wr_block.try_lock(this)) - { - BOOST_ASIO_CORO_YIELD - impl.op_idle_ping.emplace(std::move(*this)); - impl.wr_block.lock(this); - BOOST_ASIO_CORO_YIELD - net::post(std::move(*this)); - BOOST_ASSERT(impl.wr_block.is_locked(this)); - } - if(impl.check_stop_now(ec)) - goto upcall; - - // Send ping frame BOOST_ASIO_CORO_YIELD - net::async_write(impl.stream, fb_.data(), - beast::detail::bind_continuation(std::move(*this))); - if(impl.check_stop_now(ec)) - goto upcall; - - upcall: - impl.wr_block.unlock(this); - impl.op_close.maybe_invoke() - || impl.op_ping.maybe_invoke() - || impl.op_rd.maybe_invoke() - || impl.op_wr.maybe_invoke(); - if(! cont) - { - BOOST_ASIO_CORO_YIELD - net::post(bind_front_handler( - std::move(*this), ec)); - } - this->invoke(ec); + impl.op_idle_ping.emplace(std::move(*this)); + impl.wr_block.lock(this); + BOOST_ASIO_CORO_YIELD + net::post(this->get(), std::move(*this)); + BOOST_ASSERT(impl.wr_block.is_locked(this)); } - } - }; + if(impl.check_stop_now(ec)) + goto upcall; - async_op op(ws, std::forward(handler)); -} -#endif + // Send ping frame + BOOST_ASIO_CORO_YIELD + net::async_write(impl.stream, fb_->data(), + //beast::detail::bind_continuation(std::move(*this))); + std::move(*this)); + if(impl.check_stop_now(ec)) + goto upcall; + + upcall: + BOOST_ASSERT(sp->idle_pinging); + sp->idle_pinging = false; + impl.wr_block.unlock(this); + impl.op_close.maybe_invoke() + || impl.op_ping.maybe_invoke() + || impl.op_rd.maybe_invoke() + || impl.op_wr.maybe_invoke(); + } + } +}; //------------------------------------------------------------------------------ diff --git a/include/boost/beast/websocket/impl/stream_impl.hpp b/include/boost/beast/websocket/impl/stream_impl.hpp index 1976b3b7..726330cd 100644 --- a/include/boost/beast/websocket/impl/stream_impl.hpp +++ b/include/boost/beast/websocket/impl/stream_impl.hpp @@ -91,6 +91,7 @@ struct stream::impl_type saved_handler op_r_rd; // paused read op (async read) saved_handler op_r_close; // paused close op (async read) + bool idle_pinging = false; bool secure_prng_ = true; bool ec_delivered = false; bool timed_out = false; @@ -335,6 +336,9 @@ struct stream::impl_type case status::handshake: break; + case status::open: + break; + case status::closing: //BOOST_ASSERT(status_ == status::open); break; @@ -445,7 +449,7 @@ private: using executor_type = Executor; executor_type - get_executor() const + get_executor() const noexcept { return this->get(); } @@ -479,7 +483,7 @@ private: if( impl.timeout_opt.keep_alive_pings && impl.idle_counter < 1) { - // <- send ping + idle_ping_op(sp, get_executor()); ++impl.idle_counter; impl.timer.expires_after( diff --git a/include/boost/beast/websocket/stream.hpp b/include/boost/beast/websocket/stream.hpp index 62bc50a5..87936683 100644 --- a/include/boost/beast/websocket/stream.hpp +++ b/include/boost/beast/websocket/stream.hpp @@ -2603,7 +2603,7 @@ private: template class close_op; template class handshake_op; template class ping_op; - template class auto_ping_op; + template class idle_ping_op; template class read_some_op; template class read_op; template class response_op; diff --git a/test/beast/websocket/accept.cpp b/test/beast/websocket/accept.cpp index 41430c3b..5a317e17 100644 --- a/test/beast/websocket/accept.cpp +++ b/test/beast/websocket/accept.cpp @@ -15,9 +15,6 @@ #include #include "test.hpp" -#include -#include - namespace boost { namespace beast { namespace websocket { diff --git a/test/beast/websocket/handshake.cpp b/test/beast/websocket/handshake.cpp index 08dc8f72..0c38b851 100644 --- a/test/beast/websocket/handshake.cpp +++ b/test/beast/websocket/handshake.cpp @@ -238,96 +238,6 @@ public: ); } - void - testTimeout() - { - using tcp = net::ip::tcp; - - net::io_context ioc; - - // success - - { - stream ws1(ioc); - stream ws2(ioc); - test::connect(ws1.next_layer(), ws2.next_layer()); - - ws1.async_handshake("test", "/", test::success_handler()); - ws2.async_accept(test::success_handler()); - test::run_for(ioc, std::chrono::seconds(1)); - } - - { - stream ws1(ioc); - stream ws2(ioc); - test::connect(ws1.next_layer(), ws2.next_layer()); - - ws1.async_handshake("test", "/", test::success_handler()); - ws2.async_accept(test::success_handler()); - test::run_for(ioc, std::chrono::seconds(1)); - } - - // success, timeout enabled - - { - stream ws1(ioc); - stream ws2(ioc); - test::connect(ws1.next_layer(), ws2.next_layer()); - - ws1.set_option(stream_base::timeout{ - std::chrono::milliseconds(50), - stream_base::none(), - false}); - ws1.async_handshake("test", "/", test::success_handler()); - ws2.async_accept(test::success_handler()); - test::run_for(ioc, std::chrono::seconds(1)); - } - - { - stream ws1(ioc); - stream ws2(ioc); - test::connect(ws1.next_layer(), ws2.next_layer()); - - ws1.set_option(stream_base::timeout{ - std::chrono::milliseconds(50), - stream_base::none(), - false}); - ws1.async_handshake("test", "/", test::success_handler()); - ws2.async_accept(test::success_handler()); - test::run_for(ioc, std::chrono::seconds(1)); - } - - // timeout - - { - stream ws1(ioc); - stream ws2(ioc); - test::connect(ws1.next_layer(), ws2.next_layer()); - - ws1.set_option(stream_base::timeout{ - std::chrono::milliseconds(50), - stream_base::none(), - false}); - ws1.async_handshake("test", "/", - test::fail_handler(beast::error::timeout)); - test::run_for(ioc, std::chrono::seconds(1)); - } - - { - stream ws1(ioc); - stream ws2(ioc); - test::connect(ws1.next_layer(), ws2.next_layer()); - - ws1.set_option(stream_base::timeout{ - std::chrono::milliseconds(50), - stream_base::none(), - false}); - ws1.async_handshake("test", "/", - test::fail_handler(beast::error::timeout)); - test::run_for(ioc, std::chrono::seconds(1)); - } - } - // Compression Extensions for WebSocket // // https://tools.ietf.org/html/rfc7692 @@ -597,15 +507,117 @@ public: } }; + void + testAsync() + { + using tcp = net::ip::tcp; + + net::io_context ioc; + + // success, no timeout + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + + ws1.async_handshake("test", "/", test::success_handler()); + ws2.async_accept(test::success_handler()); + test::run_for(ioc, std::chrono::seconds(1)); + } + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + + ws1.async_handshake("test", "/", test::success_handler()); + ws2.async_accept(test::success_handler()); + test::run_for(ioc, std::chrono::seconds(1)); + } + + // success, timeout enabled + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + + ws1.set_option(stream_base::timeout{ + std::chrono::milliseconds(50), + stream_base::none(), + false}); + ws1.async_handshake("test", "/", test::success_handler()); + ws2.async_accept(test::success_handler()); + test::run_for(ioc, std::chrono::seconds(1)); + } + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + + ws1.set_option(stream_base::timeout{ + std::chrono::milliseconds(50), + stream_base::none(), + false}); + ws1.async_handshake("test", "/", test::success_handler()); + ws2.async_accept(test::success_handler()); + test::run_for(ioc, std::chrono::seconds(1)); + } + + // timeout + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + + ws1.set_option(stream_base::timeout{ + std::chrono::milliseconds(50), + stream_base::none(), + false}); + ws1.async_handshake("test", "/", + test::fail_handler(beast::error::timeout)); + test::run_for(ioc, std::chrono::seconds(1)); + } + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + + ws1.set_option(stream_base::timeout{ + std::chrono::milliseconds(50), + stream_base::none(), + false}); + ws1.async_handshake("test", "/", + test::fail_handler(beast::error::timeout)); + test::run_for(ioc, std::chrono::seconds(1)); + } + + // abandoned operation + + { + { + stream ws1(ioc); + ws1.async_handshake("test", "/", + test::fail_handler( + net::error::operation_aborted)); + } + test::run(ioc); + } + } + void run() override { testHandshake(); - testTimeout(); testExtRead(); testExtWrite(); testExtNegotiate(); testMoveOnly(); + testAsync(); } }; diff --git a/test/beast/websocket/timer.cpp b/test/beast/websocket/timer.cpp index 1a1e83a3..a67ad58f 100644 --- a/test/beast/websocket/timer.cpp +++ b/test/beast/websocket/timer.cpp @@ -10,24 +10,85 @@ // Test that header file is self-contained. #include +#include +#include +#include +#include #include -#include "test.hpp" - namespace boost { namespace beast { namespace websocket { -class timer_test - : public websocket_test_suite +struct timer_test : unit_test::suite { -public: using tcp = boost::asio::ip::tcp; + void + testIdlePing() + { + net::io_context ioc; + + // idle ping, no timeout + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + ws1.async_accept(test::success_handler()); + ws2.async_handshake("test", "/", test::success_handler()); + test::run(ioc); + + ws2.set_option(stream_base::timeout{ + stream_base::none(), + std::chrono::milliseconds(50), + true}); + flat_buffer b1; + flat_buffer b2; + bool received = false; + ws1.control_callback( + [&received](frame_type ft, string_view) + { + received = true; + BEAST_EXPECT(ft == frame_type::ping); + }); + ws1.async_read(b1, test::fail_handler( + net::error::operation_aborted)); + ws2.async_read(b2, test::fail_handler( + net::error::operation_aborted)); + test::run_for(ioc, std::chrono::milliseconds(100)); + BEAST_EXPECT(received); + } + + test::run(ioc); + + // idle ping, timeout + + { + stream ws1(ioc); + stream ws2(ioc); + test::connect(ws1.next_layer(), ws2.next_layer()); + ws1.async_accept(test::success_handler()); + ws2.async_handshake("test", "/", test::success_handler()); + test::run(ioc); + + ws2.set_option(stream_base::timeout{ + stream_base::none(), + std::chrono::milliseconds(50), + true}); + flat_buffer b; + ws2.async_read(b, + test::fail_handler(beast::error::timeout)); + test::run(ioc); + } + + test::run(ioc); + } + void run() override { - pass(); + testIdlePing(); } };