diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f277532..f875b2b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Version 213: * Use timeouts in HTTP client examples * Use tcp_stream in WebSocket client examples * Use tcp_stream in WebSocket server examples +* Use tcp_stream, HTTP timeouts in advanced servers -------------------------------------------------------------------------------- diff --git a/example/advanced/server-flex/advanced_server_flex.cpp b/example/advanced/server-flex/advanced_server_flex.cpp index b6b0d375..7c9f74b7 100644 --- a/example/advanced/server-flex/advanced_server_flex.cpp +++ b/example/advanced/server-flex/advanced_server_flex.cpp @@ -22,11 +22,9 @@ #include #include #include -#include #include -#include -#include #include +#include #include #include #include @@ -244,16 +242,13 @@ class websocket_session char ping_state_ = 0; protected: - net::strand< - net::io_context::executor_type> strand_; net::steady_timer timer_; public: // Construct the session explicit websocket_session(net::io_context& ioc) - : strand_(ioc.get_executor()) - , timer_(ioc, + : timer_(ioc, (std::chrono::steady_clock::time_point::max)()) { } @@ -272,18 +267,15 @@ public: std::placeholders::_1, std::placeholders::_2)); - // Set the timer - timer_.expires_after(std::chrono::seconds(15)); + // VFALCO What about the timer? // Accept the websocket handshake derived().ws().async_accept( req, - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_accept, - derived().shared_from_this(), - std::placeholders::_1))); + std::bind( + &websocket_session::on_accept, + derived().shared_from_this(), + std::placeholders::_1)); } void @@ -322,12 +314,10 @@ public: // Now send the ping derived().ws().async_ping({}, - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_ping, - derived().shared_from_this(), - std::placeholders::_1))); + std::bind( + &websocket_session::on_ping, + derived().shared_from_this(), + std::placeholders::_1)); } else { @@ -343,7 +333,7 @@ public: // Wait on the timer timer_.async_wait( net::bind_executor( - strand_, + derived().ws().get_executor(), // use the strand std::bind( &websocket_session::on_timer, derived().shared_from_this(), @@ -403,13 +393,11 @@ public: // Read a message into our buffer derived().ws().async_read( buffer_, - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_read, - derived().shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); + std::bind( + &websocket_session::on_read, + derived().shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); } void @@ -437,13 +425,11 @@ public: derived().ws().text(derived().ws().got_text()); derived().ws().async_write( buffer_.data(), - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_write, - derived().shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); + std::bind( + &websocket_session::on_write, + derived().shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); } void @@ -473,21 +459,24 @@ class plain_websocket_session : public websocket_session , public std::enable_shared_from_this { - websocket::stream ws_; + websocket::stream< + beast::tcp_stream> ws_; bool close_ = false; public: // Create the session explicit - plain_websocket_session(tcp::socket socket) + plain_websocket_session( + beast::tcp_stream&& stream) : websocket_session( - socket.get_executor().context()) - , ws_(std::move(socket)) + stream.get_executor().context()) + , ws_(std::move(stream)) { } // Called by the base class - websocket::stream& + websocket::stream< + beast::tcp_stream>& ws() { return ws_; @@ -514,18 +503,17 @@ public: return; close_ = true; + // VFALCO This doesn't look right... // Set the timer timer_.expires_after(std::chrono::seconds(15)); // Close the WebSocket Connection ws_.async_close( websocket::close_code::normal, - net::bind_executor( - strand_, - std::bind( - &plain_websocket_session::on_close, - shared_from_this(), - std::placeholders::_1))); + std::bind( + &plain_websocket_session::on_close, + shared_from_this(), + std::placeholders::_1)); } void @@ -547,13 +535,15 @@ class ssl_websocket_session : public websocket_session , public std::enable_shared_from_this { - websocket::stream> ws_; + websocket::stream>> ws_; bool eof_ = false; public: // Create the http_session explicit - ssl_websocket_session(beast::ssl_stream stream) + ssl_websocket_session(beast::ssl_stream< + beast::tcp_stream>&& stream) : websocket_session( stream.get_executor().context()) , ws_(std::move(stream)) @@ -561,7 +551,8 @@ public: } // Called by the base class - websocket::stream>& + websocket::stream>>& ws() { return ws_; @@ -590,12 +581,10 @@ public: // Perform the SSL shutdown ws_.next_layer().async_shutdown( - net::bind_executor( - strand_, - std::bind( - &ssl_websocket_session::on_shutdown, - shared_from_this(), - std::placeholders::_1))); + std::bind( + &ssl_websocket_session::on_shutdown, + shared_from_this(), + std::placeholders::_1)); } void @@ -629,17 +618,17 @@ public: template void make_websocket_session( - tcp::socket socket, + beast::tcp_stream stream, http::request> req) { std::make_shared( - std::move(socket))->run(std::move(req)); + std::move(stream))->run(std::move(req)); } template void make_websocket_session( - beast::ssl_stream stream, + beast::ssl_stream> stream, http::request> req) { std::make_shared( @@ -735,13 +724,11 @@ class http_session http::async_write( self_.derived().stream(), msg_, - net::bind_executor( - self_.strand_, - std::bind( - &http_session::on_write, - self_.derived().shared_from_this(), - std::placeholders::_1, - msg_.need_eof()))); + std::bind( + &http_session::on_write, + self_.derived().shared_from_this(), + std::placeholders::_1, + msg_.need_eof())); } }; @@ -761,8 +748,6 @@ class http_session protected: net::steady_timer timer_; - net::strand< - net::io_context::executor_type> strand_; beast::flat_buffer buffer_; public: @@ -775,7 +760,6 @@ public: , queue_(*this) , timer_(ioc, (std::chrono::steady_clock::time_point::max)()) - , strand_(ioc.get_executor()) , buffer_(std::move(buffer)) { } @@ -783,58 +767,28 @@ public: void do_read() { - // Set the timer - timer_.expires_after(std::chrono::seconds(15)); - // Make the request empty before reading, // otherwise the operation behavior is undefined. req_ = {}; + // Set the timeout. + beast::get_lowest_layer( + derived().stream()).expires_after(std::chrono::seconds(30)); + // Read a request http::async_read( derived().stream(), buffer_, req_, - net::bind_executor( - strand_, - std::bind( - &http_session::on_read, - derived().shared_from_this(), - std::placeholders::_1))); - } - - // Called when the timer expires. - void - on_timer(beast::error_code ec) - { - if(ec && ec != net::error::operation_aborted) - return fail(ec, "timer"); - - // Check if this has been upgraded to Websocket - if(timer_.expiry() == (std::chrono::steady_clock::time_point::min)()) - return; - - // Verify that the timer really expired since the deadline may have moved. - if(timer_.expiry() <= std::chrono::steady_clock::now()) - return derived().do_timeout(); - - // Wait on the timer - timer_.async_wait( - net::bind_executor( - strand_, - std::bind( - &http_session::on_timer, - derived().shared_from_this(), - std::placeholders::_1))); + std::bind( + &http_session::on_read, + derived().shared_from_this(), + std::placeholders::_1)); } void on_read(beast::error_code ec) { - // Happens when the timer closes the socket - if(ec == net::error::operation_aborted) - return; - // This means they closed the connection if(ec == http::error::end_of_stream) return derived().do_eof(); @@ -845,10 +799,6 @@ public: // See if it is a WebSocket Upgrade if(websocket::is_upgrade(req_)) { - // Make timer expire immediately, by setting expiry to time_point::min we can detect - // the upgrade to websocket in the timer handler - timer_.expires_at((std::chrono::steady_clock::time_point::min)()); - // Transfer the stream to a new WebSocket session return make_websocket_session( derived().release_stream(), @@ -894,115 +844,31 @@ class plain_http_session : public http_session , public std::enable_shared_from_this { - tcp::socket socket_; - net::strand< - net::io_context::executor_type> strand_; + beast::tcp_stream stream_; public: // Create the http_session plain_http_session( - tcp::socket socket, - beast::flat_buffer buffer, + beast::tcp_stream&& stream, + beast::flat_buffer&& buffer, std::shared_ptr const& doc_root) : http_session( - socket.get_executor().context(), + stream.get_executor().context(), std::move(buffer), doc_root) - , socket_(std::move(socket)) - , strand_(socket_.get_executor()) + , stream_(std::move(stream)) { } // Called by the base class - tcp::socket& - stream() - { - return socket_; - } - - // Called by the base class - tcp::socket - release_stream() - { - return std::move(socket_); - } - - // Start the asynchronous operation - void - run() - { - // Make sure we run on the strand - if(! strand_.running_in_this_thread()) - return net::post( - net::bind_executor( - strand_, - std::bind( - &plain_http_session::run, - shared_from_this()))); - - // Run the timer. The timer is operated - // continuously, this simplifies the code. - on_timer({}); - - do_read(); - } - - void - do_eof() - { - // Send a TCP shutdown - beast::error_code ec; - socket_.shutdown(tcp::socket::shutdown_send, ec); - - // At this point the connection is closed gracefully - } - - void - do_timeout() - { - // Closing the socket cancels all outstanding operations. They - // will complete with net::error::operation_aborted - beast::error_code ec; - socket_.shutdown(tcp::socket::shutdown_both, ec); - socket_.close(ec); - } -}; - -// Handles an SSL HTTP connection -class ssl_http_session - : public http_session - , public std::enable_shared_from_this -{ - beast::ssl_stream stream_; - net::strand< - net::io_context::executor_type> strand_; - bool eof_ = false; - -public: - // Create the http_session - ssl_http_session( - tcp::socket socket, - ssl::context& ctx, - beast::flat_buffer buffer, - std::shared_ptr const& doc_root) - : http_session( - socket.get_executor().context(), - std::move(buffer), - doc_root) - , stream_(std::move(socket), ctx) - , strand_(stream_.get_executor()) - { - } - - // Called by the base class - beast::ssl_stream& + beast::tcp_stream& stream() { return stream_; } // Called by the base class - beast::ssl_stream + beast::tcp_stream release_stream() { return std::move(stream_); @@ -1013,43 +879,108 @@ public: run() { // Make sure we run on the strand - if(! strand_.running_in_this_thread()) + if(! stream_.get_executor().running_in_this_thread()) return net::post( - net::bind_executor( - strand_, - std::bind( - &ssl_http_session::run, - shared_from_this()))); + stream_.get_executor(), + std::bind( + &plain_http_session::run, + shared_from_this())); - // Run the timer. The timer is operated - // continuously, this simplifies the code. - on_timer({}); + do_read(); + } - // Set the timer - timer_.expires_after(std::chrono::seconds(15)); + void + do_eof() + { + // Send a TCP shutdown + beast::error_code ec; + stream_.socket().shutdown(tcp::socket::shutdown_send, ec); + + // At this point the connection is closed gracefully + } + + void + do_timeout() + { + // Closing the socket cancels all outstanding operations. They + // will complete with net::error::operation_aborted + beast::error_code ec; + stream_.socket().shutdown(tcp::socket::shutdown_both, ec); + stream_.socket().close(ec); + } +}; + +// Handles an SSL HTTP connection +class ssl_http_session + : public http_session + , public std::enable_shared_from_this +{ + beast::ssl_stream< + beast::tcp_stream> stream_; + bool eof_ = false; + +public: + // Create the http_session + ssl_http_session( + beast::tcp_stream&& stream, + ssl::context& ctx, + beast::flat_buffer&& buffer, + std::shared_ptr const& doc_root) + : http_session( + stream.get_executor().context(), + std::move(buffer), + doc_root) + , stream_(std::move(stream), ctx) + { + } + + // Called by the base class + beast::ssl_stream< + beast::tcp_stream>& + stream() + { + return stream_; + } + + // Called by the base class + beast::ssl_stream< + beast::tcp_stream> + release_stream() + { + return std::move(stream_); + } + + // Start the asynchronous operation + void + run() + { + // Make sure we run on the strand + if(! stream_.get_executor().running_in_this_thread()) + return net::post( + stream_.get_executor(), + std::bind( + &ssl_http_session::run, + shared_from_this())); + + // Set the timeout. + beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); // Perform the SSL handshake // Note, this is the buffered version of the handshake. stream_.async_handshake( ssl::stream_base::server, buffer_.data(), - net::bind_executor( - strand_, - std::bind( - &ssl_http_session::on_handshake, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); + std::bind( + &ssl_http_session::on_handshake, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); } void on_handshake( beast::error_code ec, std::size_t bytes_used) { - // Happens when the handshake times out - if(ec == net::error::operation_aborted) - return; - if(ec) return fail(ec, "handshake"); @@ -1064,17 +995,15 @@ public: { eof_ = true; - // Set the timer - timer_.expires_after(std::chrono::seconds(15)); + // Set the timeout. + beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); // Perform the SSL shutdown stream_.async_shutdown( - net::bind_executor( - strand_, - std::bind( - &ssl_http_session::on_shutdown, - shared_from_this(), - std::placeholders::_1))); + std::bind( + &ssl_http_session::on_shutdown, + shared_from_this(), + std::placeholders::_1)); } void @@ -1089,20 +1018,6 @@ public: // At this point the connection is closed gracefully } - - void - do_timeout() - { - // If this is true it means we timed out performing the shutdown - if(eof_) - return; - - // Start the timer again - timer_.expires_at( - (std::chrono::steady_clock::time_point::max)()); - on_timer({}); - do_eof(); - } }; //------------------------------------------------------------------------------ @@ -1110,10 +1025,8 @@ public: // Detects SSL handshakes class detect_session : public std::enable_shared_from_this { - tcp::socket socket_; + beast::tcp_stream stream_; ssl::context& ctx_; - net::strand< - net::io_context::executor_type> strand_; std::shared_ptr doc_root_; beast::flat_buffer buffer_; @@ -1123,9 +1036,8 @@ public: tcp::socket socket, ssl::context& ctx, std::shared_ptr const& doc_root) - : socket_(std::move(socket)) + : stream_(std::move(socket)) , ctx_(ctx) - , strand_(socket_.get_executor()) , doc_root_(doc_root) { } @@ -1134,17 +1046,17 @@ public: void run() { - async_detect_ssl( - socket_, - buffer_, - net::bind_executor( - strand_, - std::bind( - &detect_session::on_detect, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); + // Set the timeout. + stream_.expires_after(std::chrono::seconds(30)); + async_detect_ssl( + stream_, + buffer_, + std::bind( + &detect_session::on_detect, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); } void @@ -1157,7 +1069,7 @@ public: { // Launch SSL session std::make_shared( - std::move(socket_), + std::move(stream_), ctx_, std::move(buffer_), doc_root_)->run(); @@ -1166,7 +1078,7 @@ public: // Launch plain session std::make_shared( - std::move(socket_), + std::move(stream_), std::move(buffer_), doc_root_)->run(); } @@ -1177,7 +1089,6 @@ class listener : public std::enable_shared_from_this { ssl::context& ctx_; tcp::acceptor acceptor_; - tcp::socket socket_; std::shared_ptr doc_root_; public: @@ -1188,7 +1099,6 @@ public: std::shared_ptr const& doc_root) : ctx_(ctx) , acceptor_(ioc) - , socket_(ioc) , doc_root_(doc_root) { beast::error_code ec; @@ -1240,15 +1150,15 @@ public: do_accept() { acceptor_.async_accept( - socket_, std::bind( &listener::on_accept, shared_from_this(), - std::placeholders::_1)); + std::placeholders::_1, + std::placeholders::_2)); } void - on_accept(beast::error_code ec) + on_accept(beast::error_code ec, tcp::socket socket) { if(ec) { @@ -1258,7 +1168,7 @@ public: { // Create the detector http_session and run it std::make_shared( - std::move(socket_), + std::move(socket), ctx_, doc_root_)->run(); } diff --git a/example/advanced/server/advanced_server.cpp b/example/advanced/server/advanced_server.cpp index 5cbbdbed..a417095e 100644 --- a/example/advanced/server/advanced_server.cpp +++ b/example/advanced/server/advanced_server.cpp @@ -18,10 +18,9 @@ #include #include #include -#include #include -#include #include +#include #include #include #include @@ -221,9 +220,7 @@ fail(beast::error_code ec, char const* what) // Echoes back all received WebSocket messages class websocket_session : public std::enable_shared_from_this { - websocket::stream ws_; - net::strand< - net::io_context::executor_type> strand_; + websocket::stream> ws_; net::steady_timer timer_; beast::flat_buffer buffer_; char ping_state_ = 0; @@ -233,13 +230,12 @@ public: explicit websocket_session(tcp::socket socket) : ws_(std::move(socket)) - , strand_(ws_.get_executor()) , timer_(ws_.get_executor().context(), (std::chrono::steady_clock::time_point::max)()) { } - // Start the asynchronous operation + // Start the asynchronous accept operation template void do_accept(http::request> req) @@ -263,12 +259,10 @@ public: // Accept the websocket handshake ws_.async_accept( req, - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_accept, - shared_from_this(), - std::placeholders::_1))); + std::bind( + &websocket_session::on_accept, + shared_from_this(), + std::placeholders::_1)); } void @@ -307,12 +301,10 @@ public: // Now send the ping ws_.async_ping({}, - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_ping, - shared_from_this(), - std::placeholders::_1))); + std::bind( + &websocket_session::on_ping, + shared_from_this(), + std::placeholders::_1)); } else { @@ -322,8 +314,8 @@ public: // Closing the socket cancels all outstanding operations. They // will complete with net::error::operation_aborted - ws_.next_layer().shutdown(tcp::socket::shutdown_both, ec); - ws_.next_layer().close(ec); + beast::get_lowest_layer(ws_).socket().shutdown(tcp::socket::shutdown_both, ec); + beast::get_lowest_layer(ws_).socket().close(ec); return; } } @@ -331,7 +323,7 @@ public: // Wait on the timer timer_.async_wait( net::bind_executor( - strand_, + ws_.get_executor(), // use the strand std::bind( &websocket_session::on_timer, shared_from_this(), @@ -391,13 +383,11 @@ public: // Read a message into our buffer ws_.async_read( buffer_, - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_read, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); + std::bind( + &websocket_session::on_read, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); } void @@ -425,13 +415,11 @@ public: ws_.text(ws_.got_text()); ws_.async_write( buffer_.data(), - net::bind_executor( - strand_, - std::bind( - &websocket_session::on_write, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); + std::bind( + &websocket_session::on_write, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); } void @@ -530,15 +518,13 @@ class http_session : public std::enable_shared_from_this operator()() { http::async_write( - self_.socket_, + self_.stream_, msg_, - net::bind_executor( - self_.strand_, - std::bind( - &http_session::on_write, - self_.shared_from_this(), - std::placeholders::_1, - msg_.need_eof()))); + std::bind( + &http_session::on_write, + self_.shared_from_this(), + std::placeholders::_1, + msg_.need_eof())); } }; @@ -552,10 +538,7 @@ class http_session : public std::enable_shared_from_this } }; - tcp::socket socket_; - net::strand< - net::io_context::executor_type> strand_; - net::steady_timer timer_; + beast::tcp_stream stream_; beast::flat_buffer buffer_; std::shared_ptr doc_root_; http::request req_; @@ -567,10 +550,7 @@ public: http_session( tcp::socket socket, std::shared_ptr const& doc_root) - : socket_(std::move(socket)) - , strand_(socket_.get_executor()) - , timer_(socket_.get_executor().context(), - (std::chrono::steady_clock::time_point::max)()) + : stream_(std::move(socket)) , doc_root_(doc_root) , queue_(*this) { @@ -581,17 +561,12 @@ public: run() { // Make sure we run on the strand - if(! strand_.running_in_this_thread()) + if(! stream_.get_executor().running_in_this_thread()) return net::post( - net::bind_executor( - strand_, - std::bind( - &http_session::run, - shared_from_this()))); - - // Run the timer. The timer is operated - // continuously, this simplifies the code. - on_timer({}); + stream_.get_executor(), + std::bind( + &http_session::run, + shared_from_this())); do_read(); } @@ -599,61 +574,25 @@ public: void do_read() { - // Set the timer - timer_.expires_after(std::chrono::seconds(15)); // Make the request empty before reading, // otherwise the operation behavior is undefined. req_ = {}; + // Set the timeout. + stream_.expires_after(std::chrono::seconds(30)); + // Read a request - http::async_read(socket_, buffer_, req_, - net::bind_executor( - strand_, - std::bind( - &http_session::on_read, - shared_from_this(), - std::placeholders::_1))); - } - - // Called when the timer expires. - void - on_timer(beast::error_code ec) - { - if(ec && ec != net::error::operation_aborted) - return fail(ec, "timer"); - - // Check if this has been upgraded to Websocket - if(timer_.expiry() == (std::chrono::steady_clock::time_point::min)()) - return; - - // Verify that the timer really expired since the deadline may have moved. - if(timer_.expiry() <= std::chrono::steady_clock::now()) - { - // Closing the socket cancels all outstanding operations. They - // will complete with net::error::operation_aborted - socket_.shutdown(tcp::socket::shutdown_both, ec); - socket_.close(ec); - return; - } - - // Wait on the timer - timer_.async_wait( - net::bind_executor( - strand_, - std::bind( - &http_session::on_timer, - shared_from_this(), - std::placeholders::_1))); + http::async_read(stream_, buffer_, req_, + std::bind( + &http_session::on_read, + shared_from_this(), + std::placeholders::_1)); } void on_read(beast::error_code ec) { - // Happens when the timer closes the socket - if(ec == net::error::operation_aborted) - return; - // This means they closed the connection if(ec == http::error::end_of_stream) return do_close(); @@ -664,13 +603,9 @@ public: // See if it is a WebSocket Upgrade if(websocket::is_upgrade(req_)) { - // Make timer expire immediately, by setting expiry to time_point::min we can detect - // the upgrade to websocket in the timer handler - timer_.expires_at((std::chrono::steady_clock::time_point::min)()); - - // Create a WebSocket websocket_session by transferring the socket + // Create a websocket session by transferring the socket std::make_shared( - std::move(socket_))->do_accept(std::move(req_)); + stream_.release_socket())->do_accept(std::move(req_)); return; } @@ -712,7 +647,7 @@ public: { // Send a TCP shutdown beast::error_code ec; - socket_.shutdown(tcp::socket::shutdown_send, ec); + stream_.socket().shutdown(tcp::socket::shutdown_send, ec); // At this point the connection is closed gracefully } @@ -785,15 +720,15 @@ public: do_accept() { acceptor_.async_accept( - socket_, std::bind( &listener::on_accept, shared_from_this(), - std::placeholders::_1)); + std::placeholders::_1, + std::placeholders::_2)); } void - on_accept(beast::error_code ec) + on_accept(beast::error_code ec, tcp::socket socket) { if(ec) { @@ -801,9 +736,9 @@ public: } else { - // Create the http_session and run it + // Create the http session and run it std::make_shared( - std::move(socket_), + std::move(socket), doc_root_)->run(); } diff --git a/include/boost/beast/core/impl/basic_stream.hpp b/include/boost/beast/core/impl/basic_stream.hpp index 2958e2e5..c0cb0743 100644 --- a/include/boost/beast/core/impl/basic_stream.hpp +++ b/include/boost/beast/core/impl/basic_stream.hpp @@ -718,6 +718,9 @@ async_connect( } //------------------------------------------------------------------------------ +// +// Customization points +// #if ! BOOST_BEAST_DOXYGEN