diff --git a/doc/qbk/00_main.qbk b/doc/qbk/00_main.qbk index 5f0ae6b..b6d0391 100644 --- a/doc/qbk/00_main.qbk +++ b/doc/qbk/00_main.qbk @@ -40,6 +40,7 @@ [def __POST__ [@boost:doc/html/boost_asio/reference/post.html `boost::asio::post`]] [def __CO_SPAWN__ [@boost:/doc/html/boost_asio/reference/co_spawn.html `boost::asio::co_spawn`]] [def __USE_AWAITABLE__ [@boost:/doc/html/boost_asio/reference/use_awaitable.html `boost::asio::use_awaitable`]] +[def __USE_FUTURE__ [@boost:/doc/html/boost_asio/reference/use_future.html `boost::asio::use_future`]] [def __Self__ [reflink2 mqtt_client `mqtt_client`]] @@ -92,6 +93,9 @@ [def __REASON_CODES__ [reflink2 Reason_codes `Reason Codes`]] [def __ERROR_HANDLING__ [reflink2 Error_handling `Error handling`]] +[def __EXAMPLE_CALLBACK__ [link async_mqtt5.async_examples.callbacks Async functions with callbacks]] +[def __EXAMPLE_COROUTINE__ [link async_mqtt5.async_examples.cpp20_coroutines Async functions with C++20 coroutines]] +[def __EXAMPLE_FUTURE__ [link async_mqtt5.async_examples.futures Async functions with futures]] [include 01_intro.qbk] [include 02_examples.qbk] diff --git a/doc/qbk/02_examples.qbk b/doc/qbk/02_examples.qbk index d95f403..992fe41 100644 --- a/doc/qbk/02_examples.qbk +++ b/doc/qbk/02_examples.qbk @@ -12,6 +12,7 @@ functions with different __CompletionToken__. # [link async_mqtt5.async_examples.callbacks Async functions with callbacks] # [link async_mqtt5.async_examples.cpp20_coroutines Async functions with C++20 coroutines] +# [link async_mqtt5.async_examples.futures Async functions with futures] [section:callbacks Async functions with callbacks] @@ -29,4 +30,11 @@ using __USE_AWAITABLE__ and __CO_SPAWN__. [cpp20_coroutines_examples] [endsect] +[section:futures Async functions with futures] +This example demonstrates how to use __Self__ asynchrous functions with __USE_FUTURE__ +completion token. +[import ../../example/futures.cpp] +[futures_examples] +[endsect] + [endsect] diff --git a/doc/reference.xsl b/doc/reference.xsl index 69a29c6..3ea5940 100644 --- a/doc/reference.xsl +++ b/doc/reference.xsl @@ -1349,7 +1349,7 @@ static - = + = ; diff --git a/example/cpp20_coroutines.cpp b/example/cpp20_coroutines.cpp index d21df63..24ca02a 100644 --- a/example/cpp20_coroutines.cpp +++ b/example/cpp20_coroutines.cpp @@ -22,7 +22,7 @@ using client_type = async_mqtt5::mqtt_client; * * In this example, each asynchronous function is invoked with boost::asio::use_awaitable completion token. * When using this completion token, co_await will throw exceptions instead of returning an error code. - * If you do not wish to throw exceptions, refer to the following nothrow_awaitable and nothrow_coroutine() example. + * If you do not wish to throw exceptions, refer to the following use_nothrow_awaitable and nothrow_coroutine() example. */ asio::awaitable coroutine(client_type& client) { // Publish an Application Message with QoS 0. @@ -75,14 +75,15 @@ asio::awaitable coroutine(client_type& client) { // Note: the coroutine will be suspended until an Application Message is ready to be received // or an error has occurred. In theory, the coroutine could be suspended indefinitely. // Avoid calling this if you have not successfully subscribed to a Topic. - auto [topic, payload, publish_props] = co_await client.async_receive(asio::use_awaitable); + if (!sub_codes[0]) + auto [topic, payload, publish_props] = co_await client.async_receive(asio::use_awaitable); // Unsubscribe from the Topic. // The handler signature for this function is void (error_code, std::vector, unsuback_props). // With asio::use_awaitable as a completion token, the co_await // will return std::vector and unsuback_props. auto [unsub_codes, unsub_props] = co_await client.async_unsubscribe( - std::vector{ "test/mqtt-test" }, async_mqtt5::unsubscribe_props {}, + "test/mqtt-test", async_mqtt5::unsubscribe_props {}, asio::use_awaitable ); @@ -103,13 +104,12 @@ asio::awaitable coroutine(client_type& client) { * will prevent co_await from throwing exceptions. Instead, co_await will return the error code * along with other values specified in the handler signature. */ -constexpr auto nothrow_awaitable = asio::as_tuple(asio::use_awaitable); +constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable); /** - * In this coroutine, each asynchronous function is called with nothrow_awaitable completion token. - * Unlike the asio::use_awaitable completion token, nothrow_awaitable does not throw an exception - * when an error has occurred. Instead, each co_await will return an error_code, similar to - * the behavior of using callbacks. + * In this coroutine, each asynchronous function is called with use_nothrow_awaitable completion token. + * Each co_await will return an error_code, similar to the behavior of using callbacks + * (see ``__EXAMPLE_CALLBACK__``). */ asio::awaitable nothrow_coroutine(client_type& client) { async_mqtt5::error_code ec; @@ -118,21 +118,21 @@ asio::awaitable nothrow_coroutine(client_type& client) { std::tie(ec) = co_await client.async_publish( "test/mqtt-test", "Hello world!", async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, - nothrow_awaitable + use_nothrow_awaitable ); async_mqtt5::puback_props puback_props; std::tie(ec, rc, puback_props) = co_await client.async_publish( "test/mqtt-test", "Hello world!", async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, - nothrow_awaitable + use_nothrow_awaitable ); async_mqtt5::pubcomp_props pubcomp_props; std::tie(ec, rc, pubcomp_props) = co_await client.async_publish( "test/mqtt-test", "Hello world!", async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, - nothrow_awaitable + use_nothrow_awaitable ); auto sub_topic = async_mqtt5::subscribe_topic{ @@ -147,23 +147,25 @@ asio::awaitable nothrow_coroutine(client_type& client) { std::vector rcs; async_mqtt5::suback_props suback_props; std::tie(ec, rcs, suback_props) = co_await client.async_subscribe( - sub_topic, async_mqtt5::subscribe_props {}, nothrow_awaitable + sub_topic, async_mqtt5::subscribe_props {}, use_nothrow_awaitable ); - std::string topic, payload; - async_mqtt5::publish_props publish_props; - std::tie(ec, topic, payload, publish_props) = co_await client.async_receive(nothrow_awaitable); + if (!rcs[0]) { + std::string topic, payload; + async_mqtt5::publish_props publish_props; + std::tie(ec, topic, payload, publish_props) = co_await client.async_receive(use_nothrow_awaitable); + } async_mqtt5::unsuback_props unsuback_props; std::tie(ec, rcs, unsuback_props) = co_await client.async_unsubscribe( std::vector{ "test/mqtt-test" }, async_mqtt5::unsubscribe_props {}, - nothrow_awaitable + use_nothrow_awaitable ); std::tie(ec) = co_await client.async_disconnect( async_mqtt5::disconnect_rc_e::disconnect_with_will_message, async_mqtt5::disconnect_props {}, - nothrow_awaitable + use_nothrow_awaitable ); co_return; diff --git a/example/futures.cpp b/example/futures.cpp new file mode 100644 index 0000000..0e0a8b2 --- /dev/null +++ b/example/futures.cpp @@ -0,0 +1,121 @@ +//[futures_examples + +#include +#include + +#include + +#include + +#include +#include + +namespace asio = boost::asio; + +using stream_type = asio::ip::tcp::socket; +using client_type = async_mqtt5::mqtt_client; + +/** + * This function is a reference on how to use the mqtt_client with ``__USE_FUTURE__`` + * as the completion token. Each get() call on std::future will block the current + * thread and wait until the future has a valid result. That is why it is essential + * to ensure that the execution context is running in more than one thread. + */ +void run_with_future(client_type& client) { + // Just like the asio::use_awaitable completion token + // (see ``__EXAMPLE_COROUTINE__``), the ``__USE_FUTURE__`` completion token + // will not return the error_code. Instead, it will throw an exception if an error has occurred. + std::future pub_qos0_fut = + client.async_publish( + "test/mqtt-test", "Hello world!", + async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, + asio::use_future + ); + pub_qos0_fut.get(); // Blocking call! + + using qos1_fut_type = std::tuple; + std::future pub_qos1_fut = + client.async_publish( + "test/mqtt-test", "Hello world!", + async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, + asio::use_future + ); + auto [qos1_rc, puback_props] = pub_qos1_fut.get(); + std::cout << "Publish QoS 1 Reason Code: " << qos1_rc.message() << std::endl; + + using qos2_fut_type = std::tuple; + std::future pub_qos2_fut = + client.async_publish( + "test/mqtt-test", "Hello world!", + async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, + asio::use_future + ); + auto [qos2_rc, pubcomp_props] = pub_qos2_fut.get(); + std::cout << "Publish QoS 2 Reason Code: " << qos2_rc.message() << std::endl; + + auto sub_topic = async_mqtt5::subscribe_topic{ + "test/mqtt-test", { + async_mqtt5::qos_e::exactly_once, + async_mqtt5::subscribe_options::no_local_e::no, + async_mqtt5::subscribe_options::retain_as_published_e::retain, + async_mqtt5::subscribe_options::retain_handling_e::send + } + }; + + using sub_fut_type = std::tuple, async_mqtt5::suback_props>; + std::future sub_fut = client.async_subscribe( + sub_topic, async_mqtt5::subscribe_props {}, asio::use_future + ); + auto [sub_rcs, suback_props] = sub_fut.get(); + std::cout << "Subscribe Reason Code: " << sub_rcs[0].message() << std::endl; + + // Note: the get() call on async_receive future could block indefinitely if the mqtt_client + // failed to subscribe or there are no Application Messages to be received from the subcribed Topic! + if (!sub_rcs[0]) { + using rec_fut_type = std::tuple; + std::future rec_fut = client.async_receive(asio::use_future); + auto [topic, payload, publish_props] = rec_fut.get(); + std::cout << "Received message from Topic: " << topic << ", " << payload << std::endl; + } + + using unsub_fut_type = std::tuple, async_mqtt5::unsuback_props>; + std::future unsub_fut = client.async_unsubscribe( + "test/mqtt-test", async_mqtt5::unsubscribe_props {}, asio::use_future + ); + auto [unsub_rcs, unsuback_props] = unsub_fut.get(); + std::cout << "Unubscribe Reason Code: " << unsub_rcs[0].message() << std::endl; + + std::future dc_fut = client.async_disconnect(asio::use_future); + dc_fut.get(); + + return; +} + +int main(int argc, char** argv) { + // asio::io_context must be running in more than one thread! + constexpr auto thread_num = 2; + asio::io_context ioc(thread_num); + + std::vector threads; + threads.reserve(thread_num - 1); + + // Make an instance of mqtt_client. Establish a TCP connection with the Broker. + client_type c(ioc.get_executor(), ""); + + c.credentials("test-client", "", "") + .brokers("mqtt.mireo.local", 1883) + .run(); + + for (int i = 0; i < thread_num - 1; ++i) + threads.emplace_back([&ioc] { ioc.run(); }); + + run_with_future(c); + + ioc.run(); + + for (auto& t : threads) + if (t.joinable()) t.join(); +} + + +//]