diff --git a/include/async_mqtt5/detail/cancellable_handler.hpp b/include/async_mqtt5/detail/cancellable_handler.hpp index efa09ef..cbfee05 100644 --- a/include/async_mqtt5/detail/cancellable_handler.hpp +++ b/include/async_mqtt5/detail/cancellable_handler.hpp @@ -56,7 +56,7 @@ public: return _cancellation_state.slot(); } - asio::cancellation_type_t cancelled() const { + asio::cancellation_type_t cancelled() const noexcept { return _cancellation_state.cancelled(); } diff --git a/include/async_mqtt5/detail/rebind_executor.hpp b/include/async_mqtt5/detail/rebind_executor.hpp new file mode 100644 index 0000000..9210120 --- /dev/null +++ b/include/async_mqtt5/detail/rebind_executor.hpp @@ -0,0 +1,40 @@ +// +// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef ASYNC_MQTT5_REBIND_EXECUTOR_HPP +#define ASYNC_MQTT5_REBIND_EXECUTOR_HPP + +#include + +#include + +namespace async_mqtt5::detail { + +namespace asio = boost::asio; + +template +struct rebind_executor { + using other = typename Stream::template rebind_executor::other; +}; + +// asio::ssl::stream does not define a rebind_executor member type +template +struct rebind_executor, Executor> { + using other = typename asio::ssl::stream::other>; +}; + +template +struct rebind_executor>, Executor> { + using other = typename boost::beast::websocket::stream< + asio::ssl::stream::other>, + boost::beast::websocket::stream>::is_deflate_supported::value + >; +}; + +} // end namespace async_mqtt5::detail + +#endif // !ASYNC_MQTT5_REBIND_EXECUTOR_HPP diff --git a/include/async_mqtt5/impl/connect_op.hpp b/include/async_mqtt5/impl/connect_op.hpp index 6ca8071..36e9787 100644 --- a/include/async_mqtt5/impl/connect_op.hpp +++ b/include/async_mqtt5/impl/connect_op.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -248,7 +249,7 @@ public: auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz); asio::async_read( - _stream, buff, + _stream, buff, asio::transfer_all(), asio::prepend(std::move(*this), on_fixed_header {}) ); } @@ -287,7 +288,7 @@ public: auto last = first + *varlen; asio::async_read( - _stream, buff, + _stream, buff, asio::transfer_all(), asio::prepend( asio::append(std::move(*this), code, first, last), on_read_packet {} @@ -407,7 +408,7 @@ public: auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz); asio::async_read( - _stream, buff, + _stream, buff, asio::transfer_all(), asio::prepend(std::move(*this), on_fixed_header {}) ); } diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index 6c1c172..ea31817 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -8,11 +8,14 @@ #ifndef ASYNC_MQTT5_MQTT_CLIENT_HPP #define ASYNC_MQTT5_MQTT_CLIENT_HPP +#include #include #include #include +#include + #include #include #include @@ -46,6 +49,17 @@ class mqtt_client { public: /// The executor type associated with the client. using executor_type = typename StreamType::executor_type; + + /// Rebinds the client type to another executor. + template + struct rebind_executor { + /// The client type when rebound to the specified executor. + using other = mqtt_client< + typename detail::rebind_executor::other, + TlsContext + >; + }; + private: using stream_type = StreamType; using tls_context_type = TlsContext; @@ -182,9 +196,12 @@ public: * This asynchronous operation supports cancellation for the following \__CANCELLATION_TYPE\__ values:\n * - `cancellation_type::terminal` - invokes \ref mqtt_client::cancel \n */ - template - decltype(auto) async_run(CompletionToken&& token) { - using Signature = void(error_code); + template < + typename CompletionToken = + typename asio::default_completion_token::type + > + decltype(auto) async_run(CompletionToken&& token = {}) { + using Signature = void (error_code); auto initiation = [] (auto handler, const impl_type& impl) { auto ex = asio::get_associated_executor(handler, impl->get_executor()); @@ -484,11 +501,14 @@ public: * - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__PUBLISH\__ packet \n * */ - template + template ::type + > decltype(auto) async_publish( std::string topic, std::string payload, retain_e retain, const publish_props& props, - CompletionToken&& token + CompletionToken&& token = {} ) { using Signature = detail::on_publish_signature; @@ -567,11 +587,14 @@ public: * - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__SUBSCRIBE\__ packet \n * */ - template + template < + typename CompletionToken = + typename asio::default_completion_token::type + > decltype(auto) async_subscribe( const std::vector& topics, const subscribe_props& props, - CompletionToken&& token + CompletionToken&& token = {} ) { using Signature = void ( error_code, std::vector, suback_props @@ -592,7 +615,7 @@ public: /** * \brief Send a \__SUBSCRIBE\__ packet to Broker to create a subscription - * to one Topics of interest. + * to one Topic of interest. * * \details After the subscription has been established, the Broker will send * \__PUBLISH\__ packets to the Client to forward Application Messages that were published @@ -645,10 +668,13 @@ public: * - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__SUBSCRIBE\__ packet \n * */ - template + template < + typename CompletionToken = + typename asio::default_completion_token::type + > decltype(auto) async_subscribe( const subscribe_topic& topic, const subscribe_props& props, - CompletionToken&& token + CompletionToken&& token = {} ) { return async_subscribe( std::vector { topic }, props, @@ -708,10 +734,13 @@ public: * - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__UNSUBSCRIBE\__ packet \n * */ - template + template < + typename CompletionToken = + typename asio::default_completion_token::type + > decltype(auto) async_unsubscribe( const std::vector& topics, const unsubscribe_props& props, - CompletionToken&& token + CompletionToken&& token = {} ) { using Signature = void ( error_code, std::vector, unsuback_props @@ -782,10 +811,13 @@ public: * - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__UNSUBSCRIBE\__ packet \n * */ - template + template < + typename CompletionToken = + typename asio::default_completion_token::type + > decltype(auto) async_unsubscribe( const std::string& topic, const unsubscribe_props& props, - CompletionToken&& token + CompletionToken&& token = {} ) { return async_unsubscribe( std::vector { topic }, props, @@ -842,8 +874,11 @@ public: * - `cancellation_type::partial` \n * - `cancellation_type::total` \n */ - template - decltype(auto) async_receive(CompletionToken&& token) { + template < + typename CompletionToken = + typename asio::default_completion_token::type + > + decltype(auto) async_receive(CompletionToken&& token = {}) { // Sig = void (error_code, std::string, std::string, publish_props) return _impl->async_channel_receive( std::forward(token) @@ -900,10 +935,13 @@ public: * - `cancellation_type::terminal` - invokes \ref mqtt_client::cancel \n * */ - template + template < + typename CompletionToken = + typename asio::default_completion_token::type + > decltype(auto) async_disconnect( disconnect_rc_e reason_code, const disconnect_props& props, - CompletionToken&& token + CompletionToken&& token = {} ) { auto impl = _impl; _impl = impl->dup(); @@ -960,8 +998,11 @@ public: * This asynchronous operation supports cancellation for the following \__CANCELLATION_TYPE\__ values:\n * - `cancellation_type::terminal` - invokes \ref mqtt_client::cancel \n */ - template - decltype(auto) async_disconnect(CompletionToken&& token) { + template < + typename CompletionToken = + typename asio::default_completion_token::type + > + decltype(auto) async_disconnect(CompletionToken&& token = {}) { return async_disconnect( disconnect_rc_e::normal_disconnection, disconnect_props {}, std::forward(token) diff --git a/test/src/run_tests.cpp b/test/src/run_tests.cpp index 221c03b..6971e19 100644 --- a/test/src/run_tests.cpp +++ b/test/src/run_tests.cpp @@ -5,6 +5,8 @@ // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) // +#define BOOST_TEST_MODULE async_mqtt5_tests + #include #include diff --git a/test/unit/default_completion_tokens.cpp b/test/unit/default_completion_tokens.cpp new file mode 100644 index 0000000..b2bd1d1 --- /dev/null +++ b/test/unit/default_completion_tokens.cpp @@ -0,0 +1,105 @@ +// +// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#ifdef BOOST_ASIO_HAS_CO_AWAIT + +#include +#include // std::monostate +#include + +#include + +#include +#include +#include +#include // async_teardown for asio::ssl::socket + +#include + +namespace async_mqtt5 { + +namespace asio = boost::asio; + +template +struct tls_handshake_type> { + static constexpr auto client = asio::ssl::stream_base::client; + static constexpr auto server = asio::ssl::stream_base::server; +}; + +template +void assign_tls_sni( + const authority_path& /* ap */, + asio::ssl::context& /* ctx */, + asio::ssl::stream& /* stream */ +) {} + +namespace test { + +// the following code needs to compile + +template +asio::awaitable test_default_completion_tokens_impl( + TlsContextType tls_context = {} +) { + asio::io_context ioc; + + using client_type = asio::use_awaitable_t<>::as_default_on_t< + mqtt_client + >; + client_type c(ioc, std::move(tls_context)); + + co_await c.async_run(); + + auto pub_props = publish_props {}; + co_await c.template async_publish( + "topic", "payload", retain_e::no, pub_props + ); + + auto sub_topic = subscribe_topic {}; + auto sub_topics = std::vector { sub_topic }; + auto sub_props = subscribe_props {}; + co_await c.async_subscribe(sub_topics, sub_props); + co_await c.async_subscribe(sub_topic, sub_props); + + auto unsub_topics = std::vector {}; + auto unsub_props = unsubscribe_props {}; + co_await c.async_unsubscribe(unsub_topics, unsub_props); + co_await c.async_unsubscribe("topic", unsub_props); + + co_await c.async_receive(); + + auto dc_props = disconnect_props {}; + co_await c.async_disconnect(); + co_await c.async_disconnect(disconnect_rc_e::normal_disconnection, dc_props); +} + +asio::awaitable test_default_completion_tokens() { + co_await test_default_completion_tokens_impl(); + + co_await test_default_completion_tokens_impl< + asio::ssl::stream, + asio::ssl::context + >(asio::ssl::context(asio::ssl::context::tls_client)); + + co_await test_default_completion_tokens_impl< + boost::beast::websocket::stream + >(); + + co_await test_default_completion_tokens_impl< + boost::beast::websocket::stream>, + asio::ssl::context + >(asio::ssl::context(asio::ssl::context::tls_client)); +} + +} // end namespace test + +} // end namespace async_mqtt5 + +#endif // BOOST_ASIO_HAS_CO_AWAIT