[mqtt-client] mqtt_client with asio::use_future documented

Summary: related to T12804

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Differential Revision: https://repo.mireo.local/D26370
This commit is contained in:
Korina Šimičević
2023-11-02 12:50:08 +01:00
parent e7227d70ec
commit ed0e838d87
5 changed files with 153 additions and 18 deletions

View File

@ -40,6 +40,7 @@
[def __POST__ [@boost:doc/html/boost_asio/reference/post.html `boost::asio::post`]]
[def __CO_SPAWN__ [@boost:/doc/html/boost_asio/reference/co_spawn.html `boost::asio::co_spawn`]]
[def __USE_AWAITABLE__ [@boost:/doc/html/boost_asio/reference/use_awaitable.html `boost::asio::use_awaitable`]]
[def __USE_FUTURE__ [@boost:/doc/html/boost_asio/reference/use_future.html `boost::asio::use_future`]]
[def __Self__ [reflink2 mqtt_client `mqtt_client`]]
@ -92,6 +93,9 @@
[def __REASON_CODES__ [reflink2 Reason_codes `Reason Codes`]]
[def __ERROR_HANDLING__ [reflink2 Error_handling `Error handling`]]
[def __EXAMPLE_CALLBACK__ [link async_mqtt5.async_examples.callbacks Async functions with callbacks]]
[def __EXAMPLE_COROUTINE__ [link async_mqtt5.async_examples.cpp20_coroutines Async functions with C++20 coroutines]]
[def __EXAMPLE_FUTURE__ [link async_mqtt5.async_examples.futures Async functions with futures]]
[include 01_intro.qbk]
[include 02_examples.qbk]

View File

@ -12,6 +12,7 @@ functions with different __CompletionToken__.
# [link async_mqtt5.async_examples.callbacks Async functions with callbacks]
# [link async_mqtt5.async_examples.cpp20_coroutines Async functions with C++20 coroutines]
# [link async_mqtt5.async_examples.futures Async functions with futures]
[section:callbacks Async functions with callbacks]
@ -29,4 +30,11 @@ using __USE_AWAITABLE__ and __CO_SPAWN__.
[cpp20_coroutines_examples]
[endsect]
[section:futures Async functions with futures]
This example demonstrates how to use __Self__ asynchrous functions with __USE_FUTURE__
completion token.
[import ../../example/futures.cpp]
[futures_examples]
[endsect]
[endsect]

View File

@ -1349,7 +1349,7 @@
<xsl:text>
</xsl:text><xsl:if test="@static='yes'">static </xsl:if><xsl:value-of
select="type"/><xsl:text> </xsl:text><xsl:value-of select="name"/>
<xsl:if test="count(initializer) = 1"><xsl:text> =</xsl:text>
<xsl:if test="count(initializer) = 1"><xsl:text> = </xsl:text>
<xsl:value-of select="initializer"/></xsl:if>;
</xsl:template>

View File

@ -22,7 +22,7 @@ using client_type = async_mqtt5::mqtt_client<stream_type>;
*
* In this example, each asynchronous function is invoked with boost::asio::use_awaitable completion token.
* When using this completion token, co_await will throw exceptions instead of returning an error code.
* If you do not wish to throw exceptions, refer to the following nothrow_awaitable and nothrow_coroutine() example.
* If you do not wish to throw exceptions, refer to the following use_nothrow_awaitable and nothrow_coroutine() example.
*/
asio::awaitable<void> coroutine(client_type& client) {
// Publish an Application Message with QoS 0.
@ -75,6 +75,7 @@ asio::awaitable<void> coroutine(client_type& client) {
// Note: the coroutine will be suspended until an Application Message is ready to be received
// or an error has occurred. In theory, the coroutine could be suspended indefinitely.
// Avoid calling this if you have not successfully subscribed to a Topic.
if (!sub_codes[0])
auto [topic, payload, publish_props] = co_await client.async_receive(asio::use_awaitable);
// Unsubscribe from the Topic.
@ -82,7 +83,7 @@ asio::awaitable<void> coroutine(client_type& client) {
// With asio::use_awaitable as a completion token, the co_await
// will return std::vector<reason_code> and unsuback_props.
auto [unsub_codes, unsub_props] = co_await client.async_unsubscribe(
std::vector<std::string>{ "test/mqtt-test" }, async_mqtt5::unsubscribe_props {},
"test/mqtt-test", async_mqtt5::unsubscribe_props {},
asio::use_awaitable
);
@ -103,13 +104,12 @@ asio::awaitable<void> coroutine(client_type& client) {
* will prevent co_await from throwing exceptions. Instead, co_await will return the error code
* along with other values specified in the handler signature.
*/
constexpr auto nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
/**
* In this coroutine, each asynchronous function is called with nothrow_awaitable completion token.
* Unlike the asio::use_awaitable completion token, nothrow_awaitable does not throw an exception
* when an error has occurred. Instead, each co_await will return an error_code, similar to
* the behavior of using callbacks.
* In this coroutine, each asynchronous function is called with use_nothrow_awaitable completion token.
* Each co_await will return an error_code, similar to the behavior of using callbacks
* (see ``__EXAMPLE_CALLBACK__``).
*/
asio::awaitable<void> nothrow_coroutine(client_type& client) {
async_mqtt5::error_code ec;
@ -118,21 +118,21 @@ asio::awaitable<void> nothrow_coroutine(client_type& client) {
std::tie(ec) = co_await client.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
nothrow_awaitable
use_nothrow_awaitable
);
async_mqtt5::puback_props puback_props;
std::tie(ec, rc, puback_props) = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
nothrow_awaitable
use_nothrow_awaitable
);
async_mqtt5::pubcomp_props pubcomp_props;
std::tie(ec, rc, pubcomp_props) = co_await client.async_publish<async_mqtt5::qos_e::exactly_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
nothrow_awaitable
use_nothrow_awaitable
);
auto sub_topic = async_mqtt5::subscribe_topic{
@ -147,23 +147,25 @@ asio::awaitable<void> nothrow_coroutine(client_type& client) {
std::vector<async_mqtt5::reason_code> rcs;
async_mqtt5::suback_props suback_props;
std::tie(ec, rcs, suback_props) = co_await client.async_subscribe(
sub_topic, async_mqtt5::subscribe_props {}, nothrow_awaitable
sub_topic, async_mqtt5::subscribe_props {}, use_nothrow_awaitable
);
if (!rcs[0]) {
std::string topic, payload;
async_mqtt5::publish_props publish_props;
std::tie(ec, topic, payload, publish_props) = co_await client.async_receive(nothrow_awaitable);
std::tie(ec, topic, payload, publish_props) = co_await client.async_receive(use_nothrow_awaitable);
}
async_mqtt5::unsuback_props unsuback_props;
std::tie(ec, rcs, unsuback_props) = co_await client.async_unsubscribe(
std::vector<std::string>{ "test/mqtt-test" }, async_mqtt5::unsubscribe_props {},
nothrow_awaitable
use_nothrow_awaitable
);
std::tie(ec) = co_await client.async_disconnect(
async_mqtt5::disconnect_rc_e::disconnect_with_will_message,
async_mqtt5::disconnect_props {},
nothrow_awaitable
use_nothrow_awaitable
);
co_return;

121
example/futures.cpp Normal file
View File

@ -0,0 +1,121 @@
//[futures_examples
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
#include <iostream>
#include <thread>
namespace asio = boost::asio;
using stream_type = asio::ip::tcp::socket;
using client_type = async_mqtt5::mqtt_client<stream_type>;
/**
* This function is a reference on how to use the mqtt_client with ``__USE_FUTURE__``
* as the completion token. Each get() call on std::future will block the current
* thread and wait until the future has a valid result. That is why it is essential
* to ensure that the execution context is running in more than one thread.
*/
void run_with_future(client_type& client) {
// Just like the asio::use_awaitable completion token
// (see ``__EXAMPLE_COROUTINE__``), the ``__USE_FUTURE__`` completion token
// will not return the error_code. Instead, it will throw an exception if an error has occurred.
std::future<void> pub_qos0_fut =
client.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_future
);
pub_qos0_fut.get(); // Blocking call!
using qos1_fut_type = std::tuple<async_mqtt5::reason_code, async_mqtt5::puback_props>;
std::future<qos1_fut_type> pub_qos1_fut =
client.async_publish<async_mqtt5::qos_e::at_least_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_future
);
auto [qos1_rc, puback_props] = pub_qos1_fut.get();
std::cout << "Publish QoS 1 Reason Code: " << qos1_rc.message() << std::endl;
using qos2_fut_type = std::tuple<async_mqtt5::reason_code, async_mqtt5::pubcomp_props>;
std::future<qos2_fut_type> pub_qos2_fut =
client.async_publish<async_mqtt5::qos_e::exactly_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_future
);
auto [qos2_rc, pubcomp_props] = pub_qos2_fut.get();
std::cout << "Publish QoS 2 Reason Code: " << qos2_rc.message() << std::endl;
auto sub_topic = async_mqtt5::subscribe_topic{
"test/mqtt-test", {
async_mqtt5::qos_e::exactly_once,
async_mqtt5::subscribe_options::no_local_e::no,
async_mqtt5::subscribe_options::retain_as_published_e::retain,
async_mqtt5::subscribe_options::retain_handling_e::send
}
};
using sub_fut_type = std::tuple<std::vector<async_mqtt5::reason_code>, async_mqtt5::suback_props>;
std::future<sub_fut_type> sub_fut = client.async_subscribe(
sub_topic, async_mqtt5::subscribe_props {}, asio::use_future
);
auto [sub_rcs, suback_props] = sub_fut.get();
std::cout << "Subscribe Reason Code: " << sub_rcs[0].message() << std::endl;
// Note: the get() call on async_receive future could block indefinitely if the mqtt_client
// failed to subscribe or there are no Application Messages to be received from the subcribed Topic!
if (!sub_rcs[0]) {
using rec_fut_type = std::tuple<std::string, std::string, async_mqtt5::publish_props>;
std::future<rec_fut_type> rec_fut = client.async_receive(asio::use_future);
auto [topic, payload, publish_props] = rec_fut.get();
std::cout << "Received message from Topic: " << topic << ", " << payload << std::endl;
}
using unsub_fut_type = std::tuple<std::vector<async_mqtt5::reason_code>, async_mqtt5::unsuback_props>;
std::future<unsub_fut_type> unsub_fut = client.async_unsubscribe(
"test/mqtt-test", async_mqtt5::unsubscribe_props {}, asio::use_future
);
auto [unsub_rcs, unsuback_props] = unsub_fut.get();
std::cout << "Unubscribe Reason Code: " << unsub_rcs[0].message() << std::endl;
std::future<void> dc_fut = client.async_disconnect(asio::use_future);
dc_fut.get();
return;
}
int main(int argc, char** argv) {
// asio::io_context must be running in more than one thread!
constexpr auto thread_num = 2;
asio::io_context ioc(thread_num);
std::vector<std::thread> threads;
threads.reserve(thread_num - 1);
// Make an instance of mqtt_client. Establish a TCP connection with the Broker.
client_type c(ioc.get_executor(), "");
c.credentials("test-client", "", "")
.brokers("mqtt.mireo.local", 1883)
.run();
for (int i = 0; i < thread_num - 1; ++i)
threads.emplace_back([&ioc] { ioc.run(); });
run_with_future(c);
ioc.run();
for (auto& t : threads)
if (t.joinable()) t.join();
}
//]