From ff5672ec077b97e2f2ad4c671c9d2a17c6bb139e Mon Sep 17 00:00:00 2001 From: Mohammad Nejati Date: Fri, 5 Jul 2024 11:02:20 +0000 Subject: [PATCH] awaitable examples are simplified No need for `rebind_executor` as `asio::deferred` is now the default completion token. --- .../advanced_server_flex_awaitable.cpp | 1196 +++++------------ .../http_client_awaitable_ssl.cpp | 163 ++- .../awaitable/http_client_awaitable.cpp | 129 +- .../awaitable/http_server_awaitable.cpp | 197 ++- .../awaitable/websocket_client_awaitable.cpp | 139 +- .../awaitable/websocket_server_awaitable.cpp | 180 ++- 6 files changed, 695 insertions(+), 1309 deletions(-) diff --git a/example/advanced/server-flex-awaitable/advanced_server_flex_awaitable.cpp b/example/advanced/server-flex-awaitable/advanced_server_flex_awaitable.cpp index 6e1c5b90..5e2c7586 100644 --- a/example/advanced/server-flex-awaitable/advanced_server_flex_awaitable.cpp +++ b/example/advanced/server-flex-awaitable/advanced_server_flex_awaitable.cpp @@ -1,5 +1,6 @@ // // Copyright (c) 2022 Klemens D. Morgenstern (klemens dot morgenstern at gmx dot net) +// Copyright (c) 2024 Mohammad Nejati // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -15,50 +16,31 @@ #include "example/common/server_certificate.hpp" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include -#include -#include +#include +#include +#include + #include #include #include #include -#include -#include #include #include #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) +namespace beast = boost::beast; +namespace http = beast::http; +namespace websocket = beast::websocket; +namespace net = boost::asio; +namespace ssl = boost::asio::ssl; -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace websocket = beast::websocket; // from -namespace net = boost::asio; // from -namespace ssl = boost::asio::ssl; // from -using tcp = boost::asio::ip::tcp; // from - -using executor_type = net::io_context::executor_type; -using executor_with_default = net::as_tuple_t>::executor_with_default; - - +using executor_type = net::strand; +using stream_type = typename beast::tcp_stream::rebind_executor::other; +using acceptor_type = typename net::ip::tcp::acceptor::rebind_executor::other; // Return a reasonable mime type based on the extension of a file. beast::string_view @@ -227,944 +209,416 @@ handle_request( return res; } -//------------------------------------------------------------------------------ - -// Report a failure -void -fail(beast::error_code ec, char const* what) +/** A thread-safe task group that tracks child tasks, allows emitting + cancellation signals to them, and waiting for their completion. +*/ +class task_group { - // ssl::error::stream_truncated, also known as an SSL "short read", - // indicates the peer closed the connection without performing the - // required closing handshake (for example, Google does this to - // improve performance). Generally this can be a security issue, - // but if your communication protocol is self-terminated (as - // it is with both HTTP and WebSocket) then you may simply - // ignore the lack of close_notify. - // - // https://github.com/boostorg/beast/issues/38 - // - // https://security.stackexchange.com/questions/91435/how-to-handle-a-malicious-ssl-tls-shutdown - // - // When a short read would cut off the end of an HTTP message, - // Beast returns the error beast::http::error::partial_message. - // Therefore, if we see a short read here, it has occurred - // after the message has been completed, so it is safe to ignore it. - - if(ec == net::ssl::error::stream_truncated) - return; - - std::cerr << what << ": " << ec.message() << "\n"; -} - -// A simple helper for cancellation_slot -struct cancellation_signals -{ - std::list sigs; - std::mutex mtx; - void emit(net::cancellation_type ct = net::cancellation_type::all) - { - std::lock_guard _(mtx); - - for (auto & sig : sigs) - sig.emit(ct); - } - - net::cancellation_slot slot() - { - std::lock_guard _(mtx); - - auto itr = std::find_if(sigs.begin(), sigs.end(), - [](net::cancellation_signal & sig) - { - return !sig.slot().has_handler(); - }); - - if (itr != sigs.end()) - return itr->slot(); - else - return sigs.emplace_back().slot(); - } -}; - -//------------------------------------------------------------------------------ - -// Echoes back all received WebSocket messages. -// This uses the Curiously Recurring Template Pattern so that -// the same code works with both SSL streams and regular sockets. -template -class websocket_session -{ - // Access the derived class, this is part of - // the Curiously Recurring Template Pattern idiom. - Derived& - derived() - { - return static_cast(*this); - } - - beast::flat_buffer buffer_; - - // Start the asynchronous operation - template - void - do_accept(http::request> req) - { - // Set suggested timeout settings for the websocket - derived().ws().set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::server)); - - // Set a decorator to change the Server of the handshake - derived().ws().set_option( - websocket::stream_base::decorator( - [](websocket::response_type& res) - { - res.set(http::field::server, - std::string(BOOST_BEAST_VERSION_STRING) + - " advanced-server-flex"); - })); - - // Accept the websocket handshake - derived().ws().async_accept( - req, - beast::bind_front_handler( - &websocket_session::on_accept, - derived().shared_from_this())); - } - -private: - void - on_accept(beast::error_code ec) - { - if(ec) - return fail(ec, "accept"); - - // Read a message - do_read(); - } - - void - do_read() - { - // Read a message into our buffer - derived().ws().async_read( - buffer_, - beast::bind_front_handler( - &websocket_session::on_read, - derived().shared_from_this())); - } - - void - on_read( - beast::error_code ec, - std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - // This indicates that the websocket_session was closed - if(ec == websocket::error::closed) - return; - - if(ec) - return fail(ec, "read"); - - // Echo the message - derived().ws().text(derived().ws().got_text()); - derived().ws().async_write( - buffer_.data(), - beast::bind_front_handler( - &websocket_session::on_write, - derived().shared_from_this())); - } - - void - on_write( - beast::error_code ec, - std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - if(ec) - return fail(ec, "write"); - - // Clear the buffer - buffer_.consume(buffer_.size()); - - // Do another read - do_read(); - } + std::mutex mtx_; + net::steady_timer cv_; + std::list css_; public: - // Start the asynchronous operation - template + task_group(net::any_io_executor exec) + : cv_{ std::move(exec), net::steady_timer::time_point::max() } + { + } + + task_group(task_group const&) = delete; + task_group(task_group&&) = delete; + + /** Adds a cancellation slot and a wrapper object that will remove the child + task from the list when it completes. + + @param completion_token The completion token that will be adapted. + + @par Thread Safety + @e Distinct @e objects: Safe.@n + @e Shared @e objects: Safe. + */ + template + auto + adapt(CompletionToken&& completion_token) + { + auto lg = std::lock_guard{ mtx_ }; + auto cs = css_.emplace(css_.end()); + + return net::bind_cancellation_slot( + cs->slot(), + net::consign( + std::forward(completion_token), + boost::scope::make_scope_exit( + [this, cs]() + { + auto lg = std::lock_guard{ mtx_ }; + if(css_.erase(cs) == css_.end()) + cv_.cancel(); + }))); + } + + /** Emits the signal to all child tasks and invokes the slot's + handler, if any. + + @param type The completion type that will be emitted to child tasks. + + @par Thread Safety + @e Distinct @e objects: Safe.@n + @e Shared @e objects: Safe. + */ void - run(http::request> req) - { - // Accept the WebSocket upgrade request - do_accept(std::move(req)); - } -}; - -//------------------------------------------------------------------------------ - -// Handles a plain WebSocket connection -class plain_websocket_session - : public websocket_session - , public std::enable_shared_from_this -{ - websocket::stream ws_; - -public: - // Create the session - explicit - plain_websocket_session( - beast::tcp_stream&& stream) - : ws_(std::move(stream)) + emit(net::cancellation_type type) { + auto lg = std::lock_guard{ mtx_ }; + for(auto& cs : css_) + cs.emit(type); } - // Called by the base class - websocket::stream& - ws() + /** Starts an asynchronous wait on the task_group. + + The completion handler will be called when: + + @li All the child tasks completed. + @li The operation was cancelled. + + @param completion_token The completion token that will be used to + produce a completion handler. The function signature of the completion + handler must be: + @code + void handler( + boost::system::error_code const& error // result of operation + ); + @endcode + + @par Thread Safety + @e Distinct @e objects: Safe.@n + @e Shared @e objects: Safe. + */ + template< + typename CompletionToken = + net::default_completion_token_t> + auto + async_wait( + CompletionToken&& completion_token = + net::default_completion_token_t{}) { - return ws_; - } -}; - -//------------------------------------------------------------------------------ - -// Handles an SSL WebSocket connection -class ssl_websocket_session - : public websocket_session - , public std::enable_shared_from_this -{ - websocket::stream> ws_; - -public: - // Create the ssl_websocket_session - explicit - ssl_websocket_session(ssl::stream&& stream) - : ws_(std::move(stream)) - { - } - - // Called by the base class - websocket::stream>& - ws() - { - return ws_; - } -}; - -//------------------------------------------------------------------------------ - -template -void -make_websocket_session( - beast::tcp_stream stream, - http::request> req) -{ - std::make_shared( - std::move(stream))->run(std::move(req)); -} - -template -void -make_websocket_session( - ssl::stream stream, - http::request> req) -{ - std::make_shared( - std::move(stream))->run(std::move(req)); -} - -//------------------------------------------------------------------------------ - -// Handles an HTTP server connection. -// This uses the Curiously Recurring Template Pattern so that -// the same code works with both SSL streams and regular sockets. -template -class http_session -{ - std::shared_ptr doc_root_; - - // Access the derived class, this is part of - // the Curiously Recurring Template Pattern idiom. - Derived& - derived() - { - return static_cast(*this); - } - - static constexpr std::size_t queue_limit = 8; // max responses - std::queue response_queue_; - - // The parser is stored in an optional container so we can - // construct it from scratch it at the beginning of each new message. - boost::optional> parser_; - -protected: - beast::flat_buffer buffer_; - -public: - // Construct the session - http_session( - beast::flat_buffer buffer, - std::shared_ptr const& doc_root) - : doc_root_(doc_root) - , buffer_(std::move(buffer)) - { - } - - void - do_read() - { - // Construct a new parser for each message - parser_.emplace(); - - // Apply a reasonable limit to the allowed size - // of the body in bytes to prevent abuse. - parser_->body_limit(10000); - - // Set the timeout. - beast::get_lowest_layer( - derived().stream()).expires_after(std::chrono::seconds(30)); - - // Read a request using the parser-oriented interface - http::async_read( - derived().stream(), - buffer_, - *parser_, - beast::bind_front_handler( - &http_session::on_read, - derived().shared_from_this())); - } - - void - on_read(beast::error_code ec, std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - // This means they closed the connection - if(ec == http::error::end_of_stream) - return derived().do_eof(); - - if(ec) - return fail(ec, "read"); - - // See if it is a WebSocket Upgrade - if(websocket::is_upgrade(parser_->get())) - { - // Disable the timeout. - // The websocket::stream uses its own timeout settings. - beast::get_lowest_layer(derived().stream()).expires_never(); - - // Create a websocket session, transferring ownership - // of both the socket and the HTTP request. - return make_websocket_session( - derived().release_stream(), - parser_->release()); - } - - // Send the response - queue_write(handle_request(*doc_root_, parser_->release())); - - // If we aren't at the queue limit, try to pipeline another request - if (response_queue_.size() < queue_limit) - do_read(); - } - - void - queue_write(http::message_generator response) - { - // Allocate and store the work - response_queue_.push(std::move(response)); - - // If there was no previous work, start the write loop - if (response_queue_.size() == 1) - do_write(); - } - - // Called to start/continue the write-loop. Should not be called when - // write_loop is already active. - void - do_write() - { - if(! response_queue_.empty()) - { - bool keep_alive = response_queue_.front().keep_alive(); - - beast::async_write( - derived().stream(), - std::move(response_queue_.front()), - beast::bind_front_handler( - &http_session::on_write, - derived().shared_from_this(), - keep_alive)); - } - } - - void - on_write( - bool keep_alive, - beast::error_code ec, - std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - if(ec) - return fail(ec, "write"); - - if(! keep_alive) - { - // This means we should close the connection, usually because - // the response indicated the "Connection: close" semantic. - return derived().do_eof(); - } - - // Resume the read if it has been paused - if(response_queue_.size() == queue_limit) - do_read(); - - response_queue_.pop(); - - do_write(); - } -}; - -//------------------------------------------------------------------------------ - -// Handles a plain HTTP connection -class plain_http_session - : public http_session - , public std::enable_shared_from_this -{ - beast::tcp_stream stream_; - -public: - // Create the session - plain_http_session( - beast::tcp_stream&& stream, - beast::flat_buffer&& buffer, - std::shared_ptr const& doc_root) - : http_session( - std::move(buffer), - doc_root) - , stream_(std::move(stream)) - { - } - - // Start the session - void - run() - { - this->do_read(); - } - - // Called by the base class - beast::tcp_stream& - stream() - { - return stream_; - } - - // Called by the base class - beast::tcp_stream - release_stream() - { - return std::move(stream_); - } - - // Called by the base class - void - do_eof() - { - // Send a TCP shutdown - beast::error_code ec; - stream_.socket().shutdown(tcp::socket::shutdown_send, ec); - - // At this point the connection is closed gracefully - } -}; - -//------------------------------------------------------------------------------ - -// Handles an SSL HTTP connection -class ssl_http_session - : public http_session - , public std::enable_shared_from_this -{ - ssl::stream stream_; - -public: - // Create the http_session - ssl_http_session( - beast::tcp_stream&& stream, - ssl::context& ctx, - beast::flat_buffer&& buffer, - std::shared_ptr const& doc_root) - : http_session( - std::move(buffer), - doc_root) - , stream_(std::move(stream), ctx) - { - } - - // Start the session - void - run() - { - // Set the timeout. - beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); - - // Perform the SSL handshake - // Note, this is the buffered version of the handshake. - stream_.async_handshake( - ssl::stream_base::server, - buffer_.data(), - beast::bind_front_handler( - &ssl_http_session::on_handshake, - shared_from_this())); - } - - // Called by the base class - ssl::stream& - stream() - { - return stream_; - } - - // Called by the base class - ssl::stream - release_stream() - { - return std::move(stream_); - } - - // Called by the base class - void - do_eof() - { - // Set the timeout. - beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); - - // Perform the SSL shutdown - stream_.async_shutdown( - beast::bind_front_handler( - &ssl_http_session::on_shutdown, - shared_from_this())); - } - -private: - void - on_handshake( - beast::error_code ec, - std::size_t bytes_used) - { - if(ec) - return fail(ec, "handshake"); - - // Consume the portion of the buffer used by the handshake - buffer_.consume(bytes_used); - - do_read(); - } - - void - on_shutdown(beast::error_code ec) - { - if(ec) - return fail(ec, "shutdown"); - - // At this point the connection is closed gracefully - } -}; - -//------------------------------------------------------------------------------ - -// Detects SSL handshakes -class detect_session : public std::enable_shared_from_this -{ - beast::tcp_stream stream_; - ssl::context& ctx_; - std::shared_ptr doc_root_; - beast::flat_buffer buffer_; - -public: - explicit - detect_session( - tcp::socket&& socket, - ssl::context& ctx, - std::shared_ptr const& doc_root) - : stream_(std::move(socket)) - , ctx_(ctx) - , doc_root_(doc_root) - { - } - - // Launch the detector - void - run() - { - // We need to be executing within a strand to perform async operations - // on the I/O objects in this session. Although not strictly necessary - // for single-threaded contexts, this example code is written to be - // thread-safe by default. - net::dispatch( - stream_.get_executor(), - beast::bind_front_handler( - &detect_session::on_run, - this->shared_from_this())); - } - - void - on_run() - { - // Set the timeout. - stream_.expires_after(std::chrono::seconds(30)); - - beast::async_detect_ssl( - stream_, - buffer_, - beast::bind_front_handler( - &detect_session::on_detect, - this->shared_from_this())); - } - - void - on_detect(beast::error_code ec, bool result) - { - if(ec) - return fail(ec, "detect"); - - if(result) - { - // Launch SSL session - std::make_shared( - std::move(stream_), - ctx_, - std::move(buffer_), - doc_root_)->run(); - return; - } - - // Launch plain session - std::make_shared( - std::move(stream_), - std::move(buffer_), - doc_root_)->run(); + return net:: + async_compose( + [this, scheduled = false]( + auto&& self, boost::system::error_code ec = {}) mutable + { + if(!scheduled) + self.reset_cancellation_state( + net::enable_total_cancellation()); + + if(!self.cancelled() && ec == net::error::operation_aborted) + ec = {}; + + { + auto lg = std::lock_guard{ mtx_ }; + + if(!css_.empty() && !ec) + { + scheduled = true; + return cv_.async_wait(std::move(self)); + } + } + + if(!std::exchange(scheduled, true)) + return net::post(net::append(std::move(self), ec)); + + self.complete(ec); + }, + completion_token, + cv_); } }; template -net::awaitable do_eof(Stream & stream) +net::awaitable +run_websocket_session( + Stream& stream, + beast::flat_buffer& buffer, + http::request req, + beast::string_view doc_root) { - beast::error_code ec; - stream.socket().shutdown(tcp::socket::shutdown_send, ec); - co_return ; -} - -template -BOOST_ASIO_NODISCARD net::awaitable -do_eof(ssl::stream & stream) -{ - co_await stream.async_shutdown(); -} - - -template -BOOST_ASIO_NODISCARD net::awaitable -run_websocket_session(Stream & stream, - beast::flat_buffer & buffer, - http::request> req, - const std::shared_ptr & doc_root) -{ - beast::websocket::stream ws{stream}; + auto cs = co_await net::this_coro::cancellation_state; + auto ws = websocket::stream{ stream }; // Set suggested timeout settings for the websocket ws.set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::server)); + websocket::stream_base::timeout::suggested(beast::role_type::server)); // Set a decorator to change the Server of the handshake - ws.set_option( - websocket::stream_base::decorator( - [](websocket::response_type& res) - { - res.set(http::field::server, - std::string(BOOST_BEAST_VERSION_STRING) + - " advanced-server-flex"); - })); + ws.set_option(websocket::stream_base::decorator( + [](websocket::response_type& res) + { + res.set( + http::field::server, + std::string(BOOST_BEAST_VERSION_STRING) + + " advanced-server-flex"); + })); // Accept the websocket handshake - auto [ec] = co_await ws.async_accept(req); - if (ec) - co_return fail(ec, "accept"); + co_await ws.async_accept(req); - while (true) + while(!cs.cancelled()) { - - // Read a message - std::size_t bytes_transferred = 0u; - std::tie(ec, bytes_transferred) = co_await ws.async_read(buffer); + auto [ec, _] = co_await ws.async_read(buffer, net::as_tuple); - // This indicates that the websocket_session was closed - if (ec == websocket::error::closed) + if(ec == websocket::error::closed || ec == ssl::error::stream_truncated) co_return; - if (ec) - co_return fail(ec, "read"); + if(ec) + throw boost::system::system_error{ ec }; + + // Echo the message back ws.text(ws.got_text()); - std::tie(ec, bytes_transferred) = co_await ws.async_write(buffer.data()); - - if (ec) - co_return fail(ec, "write"); + co_await ws.async_write(buffer.data()); // Clear the buffer buffer.consume(buffer.size()); } + + // A cancellation has been requested, gracefully close the session. + auto [ec] = co_await ws.async_close( + websocket::close_code::service_restart, net::as_tuple); + + if(ec && ec != ssl::error::stream_truncated) + throw boost::system::system_error{ ec }; } - template -BOOST_ASIO_NODISCARD net::awaitable -run_session(Stream & stream, beast::flat_buffer & buffer, const std::shared_ptr & doc_root) +net::awaitable +run_session( + Stream& stream, + beast::flat_buffer& buffer, + beast::string_view doc_root) { - // a new parser must be used for every message - // so we use an optional to reconstruct it every time. - std::optional> parser; - parser.emplace(); - // Apply a reasonable limit to the allowed size - // of the body in bytes to prevent abuse. - parser->body_limit(10000); + auto cs = co_await net::this_coro::cancellation_state; - auto [ec, bytes_transferred] = co_await http::async_read(stream, buffer, *parser); - - if(ec == http::error::end_of_stream) - co_await do_eof(stream); - - if(ec) - co_return fail(ec, "read"); - - // this can be - // while ((co_await net::this_coro::cancellation_state).cancelled() == net::cancellation_type::none) - // on most compilers - for (auto cs = co_await net::this_coro::cancellation_state; - cs.cancelled() == net::cancellation_type::none; - cs = co_await net::this_coro::cancellation_state) + while(!cs.cancelled()) { - if(websocket::is_upgrade(parser->get())) + http::request_parser parser; + parser.body_limit(10000); + + auto [ec, _] = + co_await http::async_read(stream, buffer, parser, net::as_tuple); + + if(ec == http::error::end_of_stream) + co_return; + + if(websocket::is_upgrade(parser.get())) { - // Disable the timeout. // The websocket::stream uses its own timeout settings. beast::get_lowest_layer(stream).expires_never(); - co_await run_websocket_session(stream, buffer, parser->release(), doc_root); - co_return ; + co_await run_websocket_session( + stream, buffer, parser.release(), doc_root); + + co_return; } - // we follow a different strategy then the other example: instead of queue responses, - // we always to one read & write in parallel. - - auto res = handle_request(*doc_root, parser->release()); - if (!res.keep_alive()) + auto res = handle_request(doc_root, parser.release()); + if(!res.keep_alive()) { - http::message_generator msg{std::move(res)}; - auto [ec, sz] = co_await beast::async_write(stream, std::move(msg)); - if (ec) - fail(ec, "write"); - co_return ; + co_await beast::async_write(stream, std::move(res)); + co_return; } - // we must use a new parser for every async_read - parser.reset(); - parser.emplace(); - parser->body_limit(10000); - - http::message_generator msg{std::move(res)}; - - auto [_, ec_r, sz_r, ec_w, sz_w ] = - co_await net::experimental::make_parallel_group( - http::async_read(stream, buffer, *parser, net::deferred), - beast::async_write(stream, std::move(msg), net::deferred)) - .async_wait(net::experimental::wait_for_all(), - net::as_tuple(net::use_awaitable_t{})); - - if (ec_r) - co_return fail(ec_r, "read"); - - if (ec_w) - co_return fail(ec_w, "write"); - + co_await beast::async_write(stream, std::move(res)); } - - } -BOOST_ASIO_NODISCARD net::awaitable -detect_session(typename beast::tcp_stream::rebind_executor::other stream, - net::ssl::context & ctx, - std::shared_ptr doc_root) +net::awaitable +detect_session( + stream_type stream, + ssl::context& ctx, + beast::string_view doc_root) { beast::flat_buffer buffer; - // Set the timeout. + // Allow total cancellation to change the cancellation state of this + // coroutine, but only allow terminal cancellation to propagate to async + // operations. This setting will be inherited by all child coroutines. + co_await net::this_coro::reset_cancellation_state( + net::enable_total_cancellation(), net::enable_terminal_cancellation()); + + // We want to be able to continue performing new async operations, such as + // cleanups, even after the coroutine is cancelled. This setting will be + // inherited by all child coroutines. + co_await net::this_coro::throw_if_cancelled(false); + stream.expires_after(std::chrono::seconds(30)); - // on_run - auto [ec, result] = co_await beast::async_detect_ssl(stream, buffer); - // on_detect - if (ec) - co_return fail(ec, "detect"); - if(result) + if(co_await beast::async_detect_ssl(stream, buffer)) { - using stream_type = typename beast::tcp_stream::rebind_executor::other; - ssl::stream ssl_stream{std::move(stream), ctx}; - auto [ec, bytes_used] = co_await ssl_stream.async_handshake(net::ssl::stream_base::server, buffer.data()); + ssl::stream ssl_stream{ std::move(stream), ctx }; - if(ec) - co_return fail(ec, "handshake"); + auto bytes_transferred = co_await ssl_stream.async_handshake( + ssl::stream_base::server, buffer.data()); + + buffer.consume(bytes_transferred); - buffer.consume(bytes_used); co_await run_session(ssl_stream, buffer, doc_root); + + if(!ssl_stream.lowest_layer().is_open()) + co_return; + + // Gracefully close the stream + auto [ec] = co_await ssl_stream.async_shutdown(net::as_tuple); + if(ec && ec != ssl::error::stream_truncated) + throw boost::system::system_error{ ec }; } else + { co_await run_session(stream, buffer, doc_root); + if(!stream.socket().is_open()) + co_return; + stream.socket().shutdown(net::ip::tcp::socket::shutdown_send); + } } -bool init_listener(typename tcp::acceptor::rebind_executor::other & acceptor, - const tcp::endpoint &endpoint) +net::awaitable +listen( + task_group& task_group, + ssl::context& ctx, + net::ip::tcp::endpoint endpoint, + beast::string_view doc_root) { - beast::error_code ec; - // Open the acceptor - acceptor.open(endpoint.protocol(), ec); - if(ec) - { - fail(ec, "open"); - return false; - } + auto cs = co_await net::this_coro::cancellation_state; + auto executor = co_await net::this_coro::executor; + auto acceptor = acceptor_type{ executor, endpoint }; - // Allow address reuse - acceptor.set_option(net::socket_base::reuse_address(true), ec); - if(ec) - { - fail(ec, "set_option"); - return false; - } + // Allow total cancellation to propagate to async operations. + co_await net::this_coro::reset_cancellation_state( + net::enable_total_cancellation()); - // Bind to the server address - acceptor.bind(endpoint, ec); - if(ec) + while(!cs.cancelled()) { - fail(ec, "bind"); - return false; - } + auto [ec, socket] = co_await acceptor.async_accept(net::as_tuple); - // Start listening for connections - acceptor.listen( - net::socket_base::max_listen_connections, ec); - if(ec) - { - fail(ec, "listen"); - return false; - } - return true; + if(ec == net::error::operation_aborted) + co_return; + if(ec) + throw boost::system::system_error{ ec }; + + net::co_spawn( + net::make_strand(executor.get_inner_executor()), + detect_session(stream_type{ std::move(socket) }, ctx, doc_root), + task_group.adapt( + [](std::exception_ptr e) + { + if(e) + { + try + { + std::rethrow_exception(e); + } + catch(std::exception& e) + { + std::cerr << "Error in session: " << e.what() << "\n"; + } + } + })); + } } -// Accepts incoming connections and launches the sessions. -BOOST_ASIO_NODISCARD net::awaitable - listen(ssl::context& ctx, - tcp::endpoint endpoint, - std::shared_ptr doc_root, - cancellation_signals & sig) +net::awaitable +handle_signals(task_group& task_group) { - typename tcp::acceptor::rebind_executor::other acceptor{co_await net::this_coro::executor}; - if (!init_listener(acceptor, endpoint)) - co_return; + auto executor = co_await net::this_coro::executor; + auto signal_set = net::signal_set{ executor, SIGINT, SIGTERM }; - while ((co_await net::this_coro::cancellation_state).cancelled() == net::cancellation_type::none) + auto sig = co_await signal_set.async_wait(); + + if(sig == SIGINT) { - auto [ec, sock] = co_await acceptor.async_accept(); - const auto exec = sock.get_executor(); - using stream_type = typename beast::tcp_stream::rebind_executor::other; - if (!ec) - // We dont't need a strand, since the awaitable is an implicit strand. - net::co_spawn(exec, - detect_session(stream_type(std::move(sock)), ctx, doc_root), - net::bind_cancellation_slot(sig.slot(), net::detached)); + std::cout << "Gracefully cancelling child tasks...\n"; + task_group.emit(net::cancellation_type::total); + + // Wait a limited time for child tasks to gracefully cancell + auto [ec] = co_await task_group.async_wait( + net::as_tuple(net::cancel_after(std::chrono::seconds{ 10 }))); + + if(ec == net::error::operation_aborted) // Timeout occurred + { + std::cout << "Sending a terminal cancellation signal...\n"; + task_group.emit(net::cancellation_type::terminal); + co_await task_group.async_wait(); + } + + std::cout << "Child tasks completed.\n"; + } + else // SIGTERM + { + executor.get_inner_executor().context().stop(); } } -//------------------------------------------------------------------------------ - -int main(int argc, char* argv[]) +int +main(int argc, char* argv[]) { // Check command line arguments. - if (argc != 5) + if(argc != 5) { - std::cerr << - "Usage: advanced-server-flex-awaitable
\n" << - "Example:\n" << - " advanced-server-flex-awaitable 0.0.0.0 8080 . 1\n"; + std::cerr << "Usage: advanced-server-flex-awaitable
\n" + << "Example:\n" + << " advanced-server-flex-awaitable 0.0.0.0 8080 . 1\n"; return EXIT_FAILURE; } - auto const address = net::ip::make_address(argv[1]); - auto const port = static_cast(std::atoi(argv[2])); - auto const doc_root = std::make_shared(argv[3]); - auto const threads = std::max(1, std::atoi(argv[4])); + auto const address = net::ip::make_address(argv[1]); + auto const port = static_cast(std::atoi(argv[2])); + auto const endpoint = net::ip::tcp::endpoint{ address, port }; + auto const doc_root = beast::string_view{ argv[3] }; + auto const threads = std::max(1, std::atoi(argv[4])); // The io_context is required for all I/O - net::io_context ioc{threads}; + net::io_context ioc{ threads }; // The SSL context is required, and holds certificates - ssl::context ctx{ssl::context::tlsv12}; + ssl::context ctx{ ssl::context::tlsv12 }; // This holds the self-signed certificate used by the server load_server_certificate(ctx); - // Cancellation-signal for SIGINT - cancellation_signals cancellation; + // Track coroutines + task_group task_group{ ioc.get_executor() }; - // Create and launch a listening routine + // Create and launch a listening coroutine net::co_spawn( - ioc, - listen(ctx, tcp::endpoint{address, port}, doc_root, cancellation), - net::bind_cancellation_slot(cancellation.slot(), net::detached)); - - - // Capture SIGINT and SIGTERM to perform a clean shutdown - net::signal_set signals(ioc, SIGINT, SIGTERM); - signals.async_wait( - [&](beast::error_code const&, int sig) - { - if (sig == SIGINT) - cancellation.emit(net::cancellation_type::all); - else + net::make_strand(ioc), + listen(task_group, ctx, endpoint, doc_root), + task_group.adapt( + [](std::exception_ptr e) { - // Stop the `io_context`. This will cause `run()` - // to return immediately, eventually destroying the - // `io_context` and all of the sockets in it. - ioc.stop(); - } - }); + if(e) + { + try + { + std::rethrow_exception(e); + } + catch(std::exception& e) + { + std::cerr << "Error in listener: " << e.what() << "\n"; + } + } + })); + + // Create and launch a signal handler coroutine + net::co_spawn( + net::make_strand(ioc), handle_signals(task_group), net::detached); // Run the I/O service on the requested number of threads std::vector v; v.reserve(threads - 1); for(auto i = threads - 1; i > 0; --i) - v.emplace_back( - [&ioc] - { - ioc.run(); - }); + v.emplace_back([&ioc] { ioc.run(); }); ioc.run(); - // (If we get here, it means we got a SIGINT or SIGTERM) - // Block until all the threads exit for(auto& t : v) t.join(); @@ -1172,13 +626,13 @@ int main(int argc, char* argv[]) return EXIT_SUCCESS; } - #else -int main(int, char * []) +int +main(int, char*[]) { std::printf("awaitables require C++20\n"); - return 1; + return EXIT_FAILURE; } #endif diff --git a/example/http/client/awaitable-ssl/http_client_awaitable_ssl.cpp b/example/http/client/awaitable-ssl/http_client_awaitable_ssl.cpp index e30d996b..3c3606af 100644 --- a/example/http/client/awaitable-ssl/http_client_awaitable_ssl.cpp +++ b/example/http/client/awaitable-ssl/http_client_awaitable_ssl.cpp @@ -13,28 +13,27 @@ // //------------------------------------------------------------------------------ -#include -#include -#include #include #include #include -#include +#include #include -#include +#include +#include +#include -#if defined(BOOST_ASIO_HAS_CO_AWAIT) +#include "example/common/root_certificates.hpp" #include #include #include -#include "example/common/root_certificates.hpp" -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace net = boost::asio; // from -namespace ssl = boost::asio::ssl; // from -using tcp = boost::asio::ip::tcp; // from +#if defined(BOOST_ASIO_HAS_CO_AWAIT) + +namespace beast = boost::beast; +namespace http = beast::http; +namespace net = boost::asio; +namespace ssl = boost::asio::ssl; //------------------------------------------------------------------------------ @@ -47,23 +46,17 @@ do_session( int version, ssl::context& ctx) { - // These objects perform our I/O - // They use an executor with a default completion token of use_awaitable - // This makes our code easy, but will use exceptions as the default error handling, - // i.e. if the connection drops, we might see an exception. - // See async_shutdown for error handling with an error_code. - auto resolver = net::use_awaitable.as_default_on(tcp::resolver(co_await net::this_coro::executor)); - using executor_with_default = net::use_awaitable_t<>::executor_with_default; - using tcp_stream = typename beast::tcp_stream::rebind_executor::other; - - // We construct the ssl stream from the already rebound tcp_stream. - ssl::stream stream{ - net::use_awaitable.as_default_on(beast::tcp_stream(co_await net::this_coro::executor)), - ctx}; + auto executor = co_await net::this_coro::executor; + auto resolver = net::ip::tcp::resolver{ executor }; + auto stream = ssl::stream{ executor, ctx }; // Set SNI Hostname (many hosts need this to handshake successfully) - if(! SSL_set_tlsext_host_name(stream.native_handle(), host.c_str())) - throw boost::system::system_error(static_cast(::ERR_get_error()), net::error::get_ssl_category()); + if(!SSL_set_tlsext_host_name(stream.native_handle(), host.c_str())) + { + throw boost::system::system_error( + static_cast(::ERR_get_error()), + net::error::get_ssl_category()); + } // Look up the domain name auto const results = co_await resolver.async_resolve(host, port); @@ -81,7 +74,7 @@ do_session( co_await stream.async_handshake(ssl::stream_base::client); // Set up an HTTP GET request message - http::request req{http::verb::get, target, version}; + http::request req{ http::verb::get, target, version }; req.set(http::field::host, host); req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); @@ -92,13 +85,13 @@ do_session( co_await http::async_write(stream, req); // This buffer is used for reading and must be persisted - beast::flat_buffer b; + beast::flat_buffer buffer; // Declare a container to hold the response http::response res; // Receive the HTTP response - co_await http::async_read(stream, b, res); + co_await http::async_read(stream, buffer, res); // Write the message to standard out std::cout << res << std::endl; @@ -107,7 +100,7 @@ do_session( beast::get_lowest_layer(stream).expires_after(std::chrono::seconds(30)); // Gracefully close the stream - do not threat every error as an exception! - auto [ec] = co_await stream.async_shutdown(net::as_tuple(net::use_awaitable)); + auto [ec] = co_await stream.async_shutdown(net::as_tuple); // ssl::error::stream_truncated, also known as an SSL "short read", // indicates the peer closed the connection without performing the @@ -126,75 +119,77 @@ do_session( // Therefore, if we see a short read here, it has occurred // after the message has been completed, so it is safe to ignore it. - if(ec != net::ssl::error::stream_truncated) + if(ec && ec != net::ssl::error::stream_truncated) throw boost::system::system_error(ec, "shutdown"); } //------------------------------------------------------------------------------ -int main(int argc, char** argv) +int +main(int argc, char** argv) { - // Check command line arguments. - if(argc != 4 && argc != 5) + try { - std::cerr << - "Usage: http-client-awaitable []\n" << - "Example:\n" << - " http-client-awaitable www.example.com 443 /\n" << - " http-client-awaitable www.example.com 443 / 1.0\n"; + // Check command line arguments. + if(argc != 4 && argc != 5) + { + std::cerr + << "Usage: http-client-awaitable []\n" + << "Example:\n" + << " http-client-awaitable www.example.com 443 /\n" + << " http-client-awaitable www.example.com 443 / 1.0\n"; + return EXIT_FAILURE; + } + auto const host = argv[1]; + auto const port = argv[2]; + auto const target = argv[3]; + auto const version = + argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11; + + // The io_context is required for all I/O + net::io_context ioc; + + // The SSL context is required, and holds certificates + ssl::context ctx{ ssl::context::tlsv12_client }; + + // This holds the root certificate used for verification + load_root_certificates(ctx); + + // Verify the remote server's certificate + ctx.set_verify_mode(ssl::verify_peer); + + // Launch the asynchronous operation + net::co_spawn( + ioc, + do_session(host, port, target, version, ctx), + // If the awaitable exists with an exception, it gets delivered here + // as `e`. This can happen for regular errors, such as connection + // drops. + [](std::exception_ptr e) + { + if(e) + std::rethrow_exception(e); + }); + + // Run the I/O service. The call will return when + // the get operation is complete. + ioc.run(); + } + catch(std::exception const& e) + { + std::cerr << "Error: " << e.what() << std::endl; return EXIT_FAILURE; } - auto const host = argv[1]; - auto const port = argv[2]; - auto const target = argv[3]; - int version = argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11; - - // The io_context is required for all I/O - net::io_context ioc; - - // The SSL context is required, and holds certificates - ssl::context ctx{ssl::context::tlsv12_client}; - - // This holds the root certificate used for verification - load_root_certificates(ctx); - - // Verify the remote server's certificate - ctx.set_verify_mode(ssl::verify_peer); - - - // Launch the asynchronous operation - net::co_spawn( - ioc, - do_session(host, port, target, version, ctx), - // If the awaitable exists with an exception, it gets delivered here as `e`. - // This can happen for regular errors, such as connection drops. - [](std::exception_ptr e) - { - if (!e) - return ; - try - { - std::rethrow_exception(e); - } - catch(std::exception & ex) - { - std::cerr << "Error: " << ex.what() << "\n"; - } - }); - - // Run the I/O service. The call will return when - // the get operation is complete. - ioc.run(); - return EXIT_SUCCESS; } #else -int main(int, char * []) +int +main(int, char*[]) { std::printf("awaitables require C++20\n"); - return 1; + return EXIT_FAILURE; } #endif diff --git a/example/http/client/awaitable/http_client_awaitable.cpp b/example/http/client/awaitable/http_client_awaitable.cpp index 85675716..4f0c2899 100644 --- a/example/http/client/awaitable/http_client_awaitable.cpp +++ b/example/http/client/awaitable/http_client_awaitable.cpp @@ -13,44 +13,30 @@ // //------------------------------------------------------------------------------ - - +#include +#include +#include #include #include #include -#include -#include -#include -#include - -#if defined(BOOST_ASIO_HAS_CO_AWAIT) #include -#include #include #include -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace net = boost::asio; // from -using tcp = boost::asio::ip::tcp; // from +#if defined(BOOST_ASIO_HAS_CO_AWAIT) -//------------------------------------------------------------------------------ +namespace beast = boost::beast; +namespace http = beast::http; +namespace net = boost::asio; // Performs an HTTP GET and prints the response net::awaitable -do_session( - std::string host, - std::string port, - std::string target, - int version) +do_session(std::string host, std::string port, std::string target, int version) { - // These objects perform our I/O - // They use an executor with a default completion token of use_awaitable - // This makes our code easy, but will use exceptions as the default error handling, - // i.e. if the connection drops, we might see an exception. - auto resolver = net::use_awaitable.as_default_on(tcp::resolver(co_await net::this_coro::executor)); - auto stream = net::use_awaitable.as_default_on(beast::tcp_stream(co_await net::this_coro::executor)); + auto executor = co_await net::this_coro::executor; + auto resolver = net::ip::tcp::resolver{ executor }; + auto stream = beast::tcp_stream{ executor }; // Look up the domain name auto const results = co_await resolver.async_resolve(host, port); @@ -62,7 +48,7 @@ do_session( co_await stream.async_connect(results); // Set up an HTTP GET request message - http::request req{http::verb::get, target, version}; + http::request req{ http::verb::get, target, version }; req.set(http::field::host, host); req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); @@ -73,20 +59,20 @@ do_session( co_await http::async_write(stream, req); // This buffer is used for reading and must be persisted - beast::flat_buffer b; + beast::flat_buffer buffer; // Declare a container to hold the response http::response res; // Receive the HTTP response - co_await http::async_read(stream, b, res); + co_await http::async_read(stream, buffer, res); // Write the message to standard out std::cout << res << std::endl; // Gracefully close the socket beast::error_code ec; - stream.socket().shutdown(tcp::socket::shutdown_both, ec); + stream.socket().shutdown(net::ip::tcp::socket::shutdown_both, ec); // not_connected happens sometimes // so don't bother reporting it. @@ -99,58 +85,61 @@ do_session( //------------------------------------------------------------------------------ -int main(int argc, char** argv) +int +main(int argc, char** argv) { - // Check command line arguments. - if(argc != 4 && argc != 5) + try { - std::cerr << - "Usage: http-client-awaitable []\n" << - "Example:\n" << - " http-client-awaitable www.example.com 80 /\n" << - " http-client-awaitable www.example.com 80 / 1.0\n"; + // Check command line arguments. + if(argc != 4 && argc != 5) + { + std::cerr << "Usage: http-client-awaitable []\n" + << "Example:\n" + << " http-client-awaitable www.example.com 80 /\n" + << " http-client-awaitable www.example.com 80 / 1.0\n"; + return EXIT_FAILURE; + } + auto const host = argv[1]; + auto const port = argv[2]; + auto const target = argv[3]; + auto const version = + argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11; + + // The io_context is required for all I/O + net::io_context ioc; + + // Launch the asynchronous operation + net::co_spawn( + ioc, + do_session(host, port, target, version), + // If the awaitable exists with an exception, it gets delivered here + // as `e`. This can happen for regular errors, such as connection + // drops. + [](std::exception_ptr e) + { + if(e) + std::rethrow_exception(e); + }); + + // Run the I/O service. The call will return when + // the get operation is complete. + ioc.run(); + } + catch(std::exception const& e) + { + std::cerr << "Error: " << e.what() << std::endl; return EXIT_FAILURE; } - auto const host = argv[1]; - auto const port = argv[2]; - auto const target = argv[3]; - int version = argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11; - - // The io_context is required for all I/O - net::io_context ioc; - - // Launch the asynchronous operation - net::co_spawn( - ioc, - do_session(host, port, target, version), - // If the awaitable exists with an exception, it gets delivered here as `e`. - // This can happen for regular errors, such as connection drops. - [](std::exception_ptr e) - { - if (e) - try - { - std::rethrow_exception(e); - } - catch(std::exception & ex) - { - std::cerr << "Error: " << ex.what() << "\n"; - } - }); - - // Run the I/O service. The call will return when - // the get operation is complete. - ioc.run(); - return EXIT_SUCCESS; } #else -int main(int, char * []) +int +main(int, char*[]) { std::printf("awaitables require C++20\n"); - return 1; + return EXIT_FAILURE; } #endif diff --git a/example/http/server/awaitable/http_server_awaitable.cpp b/example/http/server/awaitable/http_server_awaitable.cpp index 47ae01dc..0933bd54 100644 --- a/example/http/server/awaitable/http_server_awaitable.cpp +++ b/example/http/server/awaitable/http_server_awaitable.cpp @@ -13,14 +13,15 @@ // //------------------------------------------------------------------------------ +#include +#include +#include +#include #include #include #include -#include -#include -#include -#include #include + #include #include #include @@ -31,13 +32,9 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace net = boost::asio; // from -using tcp = boost::asio::ip::tcp; // from - -using tcp_stream = typename beast::tcp_stream::rebind_executor< - net::use_awaitable_t<>::executor_with_default>::other; +namespace beast = boost::beast; +namespace http = beast::http; +namespace net = boost::asio; // Return a reasonable mime type based on the extension of a file. beast::string_view @@ -206,146 +203,123 @@ handle_request( return res; } -//------------------------------------------------------------------------------ - - // Handles an HTTP server connection net::awaitable do_session( - tcp_stream stream, + beast::tcp_stream stream, std::shared_ptr doc_root) { // This buffer is required to persist across reads beast::flat_buffer buffer; - // This lambda is used to send messages - try + for(;;) { - for(;;) + // Set the timeout. + stream.expires_after(std::chrono::seconds(30)); + + // Read a request + http::request req; + co_await http::async_read(stream, buffer, req); + + // Handle the request + http::message_generator msg = handle_request(*doc_root, std::move(req)); + + // Determine if we should close the connection + bool keep_alive = msg.keep_alive(); + + // Send the response + co_await beast::async_write(stream, std::move(msg)); + + if(!keep_alive) { - // Set the timeout. - stream.expires_after(std::chrono::seconds(30)); - - // Read a request - http::request req; - co_await http::async_read(stream, buffer, req); - - // Handle the request - http::message_generator msg = - handle_request(*doc_root, std::move(req)); - - // Determine if we should close the connection - bool keep_alive = msg.keep_alive(); - - // Send the response - co_await beast::async_write(stream, std::move(msg), net::use_awaitable); - - if(! keep_alive) - { - // This means we should close the connection, usually because - // the response indicated the "Connection: close" semantic. - break; - } + // This means we should close the connection, usually because + // the response indicated the "Connection: close" semantic. + break; } } - catch (boost::system::system_error & se) - { - if (se.code() != http::error::end_of_stream ) - throw ; - } // Send a TCP shutdown - beast::error_code ec; - stream.socket().shutdown(tcp::socket::shutdown_send, ec); + stream.socket().shutdown(net::ip::tcp::socket::shutdown_send); // At this point the connection is closed gracefully // we ignore the error because the client might have // dropped the connection already. } -//------------------------------------------------------------------------------ - // Accepts incoming connections and launches the sessions net::awaitable -do_listen( - tcp::endpoint endpoint, - std::shared_ptr doc_root) +do_listen(net::ip::tcp::endpoint endpoint, std::shared_ptr doc_root) { - // Open the acceptor - auto acceptor = net::use_awaitable.as_default_on(tcp::acceptor(co_await net::this_coro::executor)); - acceptor.open(endpoint.protocol()); - - // Allow address reuse - acceptor.set_option(net::socket_base::reuse_address(true)); - - // Bind to the server address - acceptor.bind(endpoint); - - // Start listening for connections - acceptor.listen(net::socket_base::max_listen_connections); + auto executor = co_await net::this_coro::executor; + auto acceptor = net::ip::tcp::acceptor{ executor, endpoint }; for(;;) - boost::asio::co_spawn( - acceptor.get_executor(), - do_session(tcp_stream(co_await acceptor.async_accept()), doc_root), - [](std::exception_ptr e) + { + net::co_spawn( + executor, + do_session( + beast::tcp_stream{ co_await acceptor.async_accept() }, + doc_root), + [](std::exception_ptr e) + { + if(e) { - if (e) - try - { - std::rethrow_exception(e); - } - catch (std::exception &e) { - std::cerr << "Error in session: " << e.what() << "\n"; - } - }); - + try + { + std::rethrow_exception(e); + } + catch(std::exception const& e) + { + std::cerr << "Error in session: " << e.what() << "\n"; + } + } + }); + } } -int main(int argc, char* argv[]) +int +main(int argc, char* argv[]) { // Check command line arguments. - if (argc != 5) + if(argc != 5) { - std::cerr << - "Usage: http-server-awaitable
\n" << - "Example:\n" << - " http-server-awaitable 0.0.0.0 8080 . 1\n"; + std::cerr << "Usage: http-server-awaitable
\n" + << "Example:\n" + << " http-server-awaitable 0.0.0.0 8080 . 1\n"; return EXIT_FAILURE; } - auto const address = net::ip::make_address(argv[1]); - auto const port = static_cast(std::atoi(argv[2])); + auto const address = net::ip::make_address(argv[1]); + auto const port = static_cast(std::atoi(argv[2])); auto const doc_root = std::make_shared(argv[3]); - auto const threads = std::max(1, std::atoi(argv[4])); + auto const threads = std::max(1, std::atoi(argv[4])); // The io_context is required for all I/O - net::io_context ioc{threads}; + net::io_context ioc{ threads }; // Spawn a listening port - boost::asio::co_spawn(ioc, - do_listen(tcp::endpoint{address, port}, doc_root), - [](std::exception_ptr e) - { - if (e) - try - { - std::rethrow_exception(e); - } - catch(std::exception & e) - { - std::cerr << "Error in acceptor: " << e.what() << "\n"; - } - }); + net::co_spawn( + ioc, + do_listen(net::ip::tcp::endpoint{ address, port }, doc_root), + [](std::exception_ptr e) + { + if(e) + { + try + { + std::rethrow_exception(e); + } + catch(std::exception const& e) + { + std::cerr << "Error: " << e.what() << std::endl; + } + } + }); // Run the I/O service on the requested number of threads std::vector v; v.reserve(threads - 1); for(auto i = threads - 1; i > 0; --i) - v.emplace_back( - [&ioc] - { - ioc.run(); - }); + v.emplace_back([&ioc] { ioc.run(); }); ioc.run(); return EXIT_SUCCESS; @@ -353,10 +327,11 @@ int main(int argc, char* argv[]) #else -int main(int, char * []) +int +main(int, char*[]) { std::printf("awaitables require C++20\n"); - return 1; + return EXIT_FAILURE; } -#endif +#endif \ No newline at end of file diff --git a/example/websocket/client/awaitable/websocket_client_awaitable.cpp b/example/websocket/client/awaitable/websocket_client_awaitable.cpp index 211e7663..684d2978 100644 --- a/example/websocket/client/awaitable/websocket_client_awaitable.cpp +++ b/example/websocket/client/awaitable/websocket_client_awaitable.cpp @@ -13,49 +13,39 @@ // //------------------------------------------------------------------------------ -#include -#include -#include -#include -#include -#include #include #include -#include -#include +#include +#include +#include + +#include +#include +#include #if defined(BOOST_ASIO_HAS_CO_AWAIT) - -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace websocket = beast::websocket; // from -namespace net = boost::asio; // from -using tcp = boost::asio::ip::tcp; // from - -//------------------------------------------------------------------------------ +namespace beast = boost::beast; +namespace http = beast::http; +namespace websocket = beast::websocket; +namespace net = boost::asio; // Sends a WebSocket message and prints the response net::awaitable -do_session( - std::string host, - std::string port, - std::string text) +do_session(std::string host, std::string port, std::string text) { - // These objects perform our I/O - auto resolver = net::use_awaitable.as_default_on( - tcp::resolver(co_await net::this_coro::executor)); - auto ws = net::use_awaitable.as_default_on( - websocket::stream(co_await net::this_coro::executor)); + auto executor = co_await net::this_coro::executor; + auto resolver = net::ip::tcp::resolver{ executor }; + auto stream = websocket::stream{ executor }; // Look up the domain name auto const results = co_await resolver.async_resolve(host, port); // Set a timeout on the operation - beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30)); + beast::get_lowest_layer(stream).expires_after(std::chrono::seconds(30)); // Make the connection on the IP address we get from a lookup - auto ep = co_await beast::get_lowest_layer(ws).async_connect(results); + auto ep = co_await beast::get_lowest_layer(stream).async_connect(results); // Update the host_ string. This will provide the value of the // Host HTTP header during the WebSocket handshake. @@ -64,36 +54,36 @@ do_session( // Turn off the timeout on the tcp_stream, because // the websocket stream has its own timeout system. - beast::get_lowest_layer(ws).expires_never(); + beast::get_lowest_layer(stream).expires_never(); // Set suggested timeout settings for the websocket - ws.set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::client)); + stream.set_option( + websocket::stream_base::timeout::suggested(beast::role_type::client)); // Set a decorator to change the User-Agent of the handshake - ws.set_option(websocket::stream_base::decorator( + stream.set_option(websocket::stream_base::decorator( [](websocket::request_type& req) { - req.set(http::field::user_agent, + req.set( + http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro"); })); // Perform the websocket handshake - co_await ws.async_handshake(host, "/"); + co_await stream.async_handshake(host, "/"); // Send the message - co_await ws.async_write(net::buffer(std::string(text))); + co_await stream.async_write(net::buffer(text)); // This buffer will hold the incoming message beast::flat_buffer buffer; // Read a message into our buffer - co_await ws.async_read(buffer); + co_await stream.async_read(buffer); // Close the WebSocket connection - co_await ws.async_close(websocket::close_code::normal); + co_await stream.async_close(websocket::close_code::normal); // If we get here then the connection is closed gracefully @@ -103,53 +93,56 @@ do_session( //------------------------------------------------------------------------------ -int main(int argc, char** argv) +int +main(int argc, char** argv) { - // Check command line arguments. - if(argc != 4) + try { - std::cerr << - "Usage: websocket-client-awaitable \n" << - "Example:\n" << - " websocket-client-awaitable echo.websocket.org 80 \"Hello, world!\"\n"; + // Check command line arguments. + if(argc != 4) + { + std::cerr + << "Usage: websocket-client-awaitable \n" + << "Example:\n" + << " websocket-client-awaitable echo.websocket.org 80 \"Hello, world!\"\n"; + return EXIT_FAILURE; + } + auto const host = argv[1]; + auto const port = argv[2]; + auto const text = argv[3]; + + // The io_context is required for all I/O + net::io_context ioc; + + // Launch the asynchronous operation + net::co_spawn( + ioc, + do_session(host, port, text), + [](std::exception_ptr e) + { + if(e) + std::rethrow_exception(e); + }); + + // Run the I/O service. The call will return when + // the socket is closed. + ioc.run(); + } + catch(std::exception const& e) + { + std::cerr << "Error: " << e.what() << std::endl; return EXIT_FAILURE; } - auto const host = argv[1]; - auto const port = argv[2]; - auto const text = argv[3]; - - // The io_context is required for all I/O - net::io_context ioc; - - // Launch the asynchronous operation - net::co_spawn(ioc, - do_session(host, port, text), - [](std::exception_ptr e) - { - if (e) - try - { - std::rethrow_exception(e); - } - catch(std::exception & e) - { - std::cerr << "Error: " << e.what() << "\n"; - } - }); - - // Run the I/O service. The call will return when - // the socket is closed. - ioc.run(); - return EXIT_SUCCESS; } #else -int main(int, char * []) +int +main(int, char*[]) { std::printf("awaitables require C++20\n"); - return 1; + return EXIT_FAILURE; } #endif diff --git a/example/websocket/server/awaitable/websocket_server_awaitable.cpp b/example/websocket/server/awaitable/websocket_server_awaitable.cpp index 3baa2056..9d021f26 100644 --- a/example/websocket/server/awaitable/websocket_server_awaitable.cpp +++ b/example/websocket/server/awaitable/websocket_server_awaitable.cpp @@ -13,162 +13,141 @@ // //------------------------------------------------------------------------------ -#include -#include +#include #include #include -#include -#include +#include +#include +#include + #include #include -#include #include -#include #include #include #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace websocket = beast::websocket; // from -namespace net = boost::asio; // from -using tcp = boost::asio::ip::tcp; // from - -using stream = websocket::stream< - typename beast::tcp_stream::rebind_executor< - typename net::use_awaitable_t<>::executor_with_default>::other>; - -//------------------------------------------------------------------------------ +namespace beast = boost::beast; +namespace http = beast::http; +namespace websocket = beast::websocket; +namespace net = boost::asio; // Echoes back all received WebSocket messages net::awaitable -do_session(stream ws) +do_session(websocket::stream stream) { // Set suggested timeout settings for the websocket - ws.set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::server)); + stream.set_option( + websocket::stream_base::timeout::suggested(beast::role_type::server)); // Set a decorator to change the Server of the handshake - ws.set_option(websocket::stream_base::decorator( + stream.set_option(websocket::stream_base::decorator( [](websocket::response_type& res) { - res.set(http::field::server, + res.set( + http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-coro"); })); // Accept the websocket handshake - co_await ws.async_accept(); + co_await stream.async_accept(); - try + for(;;) { - for(;;) - { - // This buffer will hold the incoming message - beast::flat_buffer buffer; + // This buffer will hold the incoming message + beast::flat_buffer buffer; - // Read a message - co_await ws.async_read(buffer); + // Read a message + auto [ec, _] = co_await stream.async_read(buffer, net::as_tuple); - // Echo the message back - ws.text(ws.got_text()); - co_await ws.async_write(buffer.data()); - } - } - catch(const boost::system::system_error & se) - { - if (se.code() != websocket::error::closed) - throw; + if(ec == websocket::error::closed) + co_return; + + if(ec) + throw boost::system::system_error{ ec }; + + // Echo the message back + stream.text(stream.got_text()); + co_await stream.async_write(buffer.data()); } } -//------------------------------------------------------------------------------ - // Accepts incoming connections and launches the sessions net::awaitable -do_listen( - tcp::endpoint endpoint) +do_listen(net::ip::tcp::endpoint endpoint) { - - // Open the acceptor - auto acceptor = net::use_awaitable.as_default_on(tcp::acceptor(co_await net::this_coro::executor)); - acceptor.open(endpoint.protocol()); - - // Allow address reuse - acceptor.set_option(net::socket_base::reuse_address(true)); - - // Bind to the server address - acceptor.bind(endpoint); - - // Start listening for connections - acceptor.listen(net::socket_base::max_listen_connections); + auto executor = co_await net::this_coro::executor; + auto acceptor = net::ip::tcp::acceptor{ executor, endpoint }; for(;;) - boost::asio::co_spawn( - acceptor.get_executor(), - do_session(stream(co_await acceptor.async_accept())), - [](std::exception_ptr e) + { + net::co_spawn( + executor, + do_session(websocket::stream{ + co_await acceptor.async_accept() }), + [](std::exception_ptr e) + { + if(e) { - if (e) + try { - try - { - std::rethrow_exception(e); - } - catch (std::exception &e) { - std::cerr << "Error in session: " << e.what() << "\n"; - } + std::rethrow_exception(e); } - }); + catch(std::exception& e) + { + std::cerr << "Error in session: " << e.what() << "\n"; + } + } + }); + } } -int main(int argc, char* argv[]) +int +main(int argc, char* argv[]) { // Check command line arguments. - if (argc != 4) + if(argc != 4) { - std::cerr << - "Usage: websocket-server-awaitable
\n" << - "Example:\n" << - " websocket-server-awaitable 0.0.0.0 8080 1\n"; + std::cerr + << "Usage: websocket-server-awaitable
\n" + << "Example:\n" + << " websocket-server-awaitable 0.0.0.0 8080 1\n"; return EXIT_FAILURE; } auto const address = net::ip::make_address(argv[1]); - auto const port = static_cast(std::atoi(argv[2])); + auto const port = static_cast(std::atoi(argv[2])); auto const threads = std::max(1, std::atoi(argv[3])); // The io_context is required for all I/O net::io_context ioc(threads); // Spawn a listening port - boost::asio::co_spawn( - ioc, - do_listen(tcp::endpoint{address, port}), - [](std::exception_ptr e) - { - if (e) - try - { - std::rethrow_exception(e); - } - catch(std::exception & e) - { - std::cerr << "Error: " << e.what() << "\n"; - } - }); + net::co_spawn( + ioc, + do_listen(net::ip::tcp::endpoint{ address, port }), + [](std::exception_ptr e) + { + if(e) + { + try + { + std::rethrow_exception(e); + } + catch(std::exception const& e) + { + std::cerr << "Error: " << e.what() << std::endl; + } + } + }); // Run the I/O service on the requested number of threads std::vector v; v.reserve(threads - 1); for(auto i = threads - 1; i > 0; --i) - v.emplace_back( - [&ioc] - { - ioc.run(); - }); + v.emplace_back([&ioc] { ioc.run(); }); ioc.run(); return EXIT_SUCCESS; @@ -176,10 +155,11 @@ int main(int argc, char* argv[]) #else -int main(int, char * []) +int +main(int, char*[]) { std::printf("awaitables require C++20\n"); - return 1; + return EXIT_FAILURE; } -#endif \ No newline at end of file +#endif