publisher & receiver examples, better completion token examples

Reviewers: iljazovic, ivica

Reviewed By: ivica

Subscribers: miljen

Differential Revision: https://repo.mireo.local/D26582
This commit is contained in:
Korina Šimičević
2023-11-17 09:46:07 +01:00
parent 376393fbfa
commit 73620e5653
12 changed files with 372 additions and 149 deletions

View File

@@ -103,9 +103,9 @@
[def __REASON_CODES__ [reflink2 Reason_codes `Reason Codes`]]
[def __ERROR_HANDLING__ [reflink2 Error_handling `Error handling`]]
[def __EXAMPLE_CALLBACK__ [link async_mqtt5.examples.asio.callbacks Async functions with callbacks]]
[def __EXAMPLE_COROUTINE__ [link async_mqtt5.examples.asio.cpp20_coroutines Async functions with C++20 coroutines]]
[def __EXAMPLE_FUTURE__ [link async_mqtt5.examples.asio.futures Async functions with futures]]
[def __EXAMPLE_CALLBACK__ [link async_mqtt5.examples.callbacks Async functions with callbacks]]
[def __EXAMPLE_COROUTINE__ [link async_mqtt5.examples.cpp20_coroutines Async functions with C++20 coroutines]]
[def __EXAMPLE_FUTURE__ [link async_mqtt5.examples.futures Async functions with futures]]
[include 01_intro.qbk]
[include 02_examples.qbk]
@@ -115,12 +115,14 @@
[h3 Table of Contents]
* [link async_mqtt5.intro Introduction]
* [link async_mqtt5.examples Examples]
* [link async_mqtt5.examples.publisher The publisher]
* [link async_mqtt5.examples.receiver The receiver]
* [link async_mqtt5.examples.network_connection Establishing a network connection with different protocols]
* [link async_mqtt5.examples.network_connection.tcp_ip_connection TCP/IP connection]
* [link async_mqtt5.examples.network_connection.tls_ssl_connection TLS/SSL connection]
* [link async_mqtt5.examples.network_connection.websocket_connection WebSocket connection]
* [link async_mqtt5.examples.asio Compatibility with Boost.Asio]
* [link async_mqtt5.examples.asio.callbacks Async functions with callbacks]
* [link async_mqtt5.examples.asio.cpp20_coroutines Async functions with C++20 coroutines]
* [link async_mqtt5.examples.asio.futures Async functions with futures]
* [link async_mqtt5.examples.completion_tokens Completion tokens]
* [link async_mqtt5.examples.callbacks Async functions with callbacks]
* [link async_mqtt5.examples.cpp20_coroutines Async functions with C++20 coroutines]
* [link async_mqtt5.examples.futures Async functions with futures]
* [link async_mqtt5.ref Reference]

View File

@@ -1,7 +1,7 @@
[section:intro Introduction]
[nochunk]
__Self__ is a C++20 client built on Boost.Asio.
__Self__ is a C++20 client built on __Asio__.
This client is designed for publishing or receiving messages from an MQTT 5.0 compatible broker.
__Self__ represents a comprehensive implementation of the MQTT 5.0 protocol standard,
offering full support for publishing or receiving messages with QoS 0, 1, and 2.

View File

@@ -6,22 +6,28 @@
]
[section:examples Examples]
The following examples demonstrate how to use __Client__ in different scenarios.
The main class in __Self__ is __Client__, and the upcoming examples will briefly explain how to use it.
The first section will show how to use different underlying transport protocols (such as TCP, SSL and WebSocket)
The first examples will show two common uses of an MQTT client: as the publisher and as the receiver.
* [link async_mqtt5.examples.publisher The publisher]
* [link async_mqtt5.examples.receiver The receiver]
The following section will show how to use different underlying transport protocols (such as TCP, SSL and WebSocket)
to establish a connection to a MQTT Broker.
* [link async_mqtt5.examples.network_connection Establishing a network connection with different protocols]
The second section will showcase how to use asynchronous functions in __Client__
The final section will showcase how to use asynchronous functions in __Client__
with different __CompletionToken__.
* [link async_mqtt5.examples.asio Compatibility with Boost.Asio]
* [link async_mqtt5.examples.asio.callbacks Async functions with callbacks]
* [link async_mqtt5.examples.asio.cpp20_coroutines Async functions with C++20 coroutines]
* [link async_mqtt5.examples.asio.futures Async functions with futures]
* [link async_mqtt5.examples.completion_tokens Completion tokens]
* [link async_mqtt5.examples.callbacks Async functions with callbacks]
* [link async_mqtt5.examples.cpp20_coroutines Async functions with C++20 coroutines]
* [link async_mqtt5.examples.futures Async functions with futures]
[include examples/Tutorial.qbk]
[include examples/Network_connection.qbk]
[include examples/Asio_compatibility.qbk]
[include examples/Completion_tokens.qbk]
[endsect]

View File

@@ -1,40 +0,0 @@
[/
Copyright (c) 2023 Mireo
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
]
[section:asio Compatibility with Boost.Asio]
The __Client__ is built upon __Asio__ and thus follows the same principles.
This section illustrates the usage of __Client__ async
functions with different __CompletionToken__.
# [link async_mqtt5.examples.asio.callbacks Async functions with callbacks]
# [link async_mqtt5.examples.asio.cpp20_coroutines Async functions with C++20 coroutines]
# [link async_mqtt5.examples.asio.futures Async functions with futures]
[section:callbacks Async functions with callbacks]
This example demonstrates how to use __Client__ asynchrous functions with callbacks.
[import ../../../example/callbacks.cpp]
[callbacks_examples]
[endsect]
[section:cpp20_coroutines Async functions with C++20 coroutines]
This example demonstrates how to use __Client__ asynchrous functions with C++20 coroutines
using __USE_AWAITABLE__ and __CO_SPAWN__.
[import ../../../example/cpp20_coroutines.cpp]
[cpp20_coroutines_examples]
[endsect]
[section:futures Async functions with futures]
This example demonstrates how to use __Client__ asynchrous functions with __USE_FUTURE__
completion token.
[import ../../../example/futures.cpp]
[futures_examples]
[endsect]
[endsect]

View File

@@ -0,0 +1,118 @@
[/
Copyright (c) 2023 Mireo
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
]
[section:completion_tokens Completion tokens]
The __Client__ is built upon __Asio__ and thus follows the same principles.
This section illustrates the usage of __Client__ async
functions with different __CompletionToken__.
# [link async_mqtt5.examples.callbacks Async functions with callbacks]
# [link async_mqtt5.examples.cpp20_coroutines Async functions with C++20 coroutines]
# [link async_mqtt5.examples.futures Async functions with futures]
[endsect]
[section:callbacks Async functions with callbacks]
The following list is a reference on how to use asynchrous functions in __Client__ with callbacks.
[import ../../../example/callbacks.cpp]
[h4 Publish]
[publish_callback]
[h4 Subscribe]
[subscribe_callback]
[h4 Receive]
[receive_callback]
[h4 Unsubscribe]
[unsubscribe_callback]
[h4 Disconnect]
[disconnect_callback]
[endsect]
[section:cpp20_coroutines Async functions with C++20 coroutines]
This example demonstrates how to use __Client__ asynchrous functions with C++20 coroutines
using __USE_AWAITABLE__.
[import ../../../example/cpp20_coroutines.cpp]
[h2 use_awaitable]
In this section, each asynchronous function is invoked with __USE_AWAITABLE__ completion token.
When using this completion token, co_await will throw exceptions instead of returning an error code.
If you do not wish to throw exceptions, refer to the following use_nothrow_awaitable section.
[h4 Publish]
[publish_coro]
[h4 Subscribe]
[subscribe_coro]
[h4 Receive]
[receive_coro]
[h4 Unsubscribe]
[unsubscribe_coro]
[h4 Disconnect]
[disconnect_coro]
[h2 use_nothrow_awaitable]
The following examples will use a modified completion token.
Using this completion token instead of __USE_AWAITABLE__ will prevent co_await from throwing exceptions.
Instead, co_await will return the error code along with other values specified in the handler signature.
[no_throw_awaitable]
[h4 Publish]
[publish_coro_nothrow]
[h4 Subscribe]
[subscribe_coro_nothrow]
[h4 Receive]
[receive_coro_nothrow]
[h4 Unsubscribe]
[unsubscribe_coro_nothrow]
[h4 Disconnect]
[disconnect_coro_nothrow]
[endsect]
[section:futures Async functions with futures]
The following list is a reference on how to use the mqtt_client with __USE_FUTURE__ as the completion token.
Each get() call on std::future will block the current thread and wait until the future has a valid result.
That is why it is essential to ensure that the execution context is running in more than one thread.
[import ../../../example/futures.cpp]
[h4 Publish]
[publish_future]
[h4 Subscribe]
[subscribe_future]
[h4 Receive]
[receive_future]
[h4 Unsubscribe]
[unsubscribe_future]
[h4 Disconnect]
[disconnect_future]
[endsect]

View File

@@ -15,12 +15,12 @@ transport protocols such as TCP/IP, TLS/SSL, and WebSocket.
[import ../../../example/network_connection.cpp]
[h3 TCP/IP connection]
To create a TCP/IP connection with a Broker, initialize __Client__ with __TCP_SOCKET__ as the __StreamType__.
To create a TCP/IP connection with a Broker, initialise __Client__ with __TCP_SOCKET__ as the __StreamType__.
[tcp]
[h3 TLS/SSL connection]
To establish a secure and encrypted connection using the TLS/SSL protocol, supply a context object that meets the __TlsContext__ requirements.
Additionally, initialize __Client__ with an underlying stream that implements TLS/SSL protocol as the __StreamType__.
Additionally, initialise __Client__ with an underlying stream that implements TLS/SSL protocol as the __StreamType__.
This example will demonstrate how to set up an SSL connection using __SSL_CONTEXT__ and __SSL_STREAM__.
To use SSL support in __Asio__, __OPENSSL__ is required.
@@ -38,6 +38,6 @@ __WEBSOCKET_STREAM__.
[h4 WebSocket over TLS/SSL]
[websocket_tls]
Once the __Client__ has been initialized with a suitable __StreamType__, it is prepared for configuration and utilization.
Once the __Client__ has been initialised with a suitable __StreamType__, it is prepared for configuration and utilisation.
[endsect]

View File

@@ -0,0 +1,30 @@
[/
Copyright (c) 2023 Mireo
Distributed under the Boost Software License, Version 1.0. (See accompanying
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
]
[section:publisher The publisher]
This example will show how to use __Client__ as a publisher.
The __Client__ will use TCP to connect to the Broker, and __USE_AWAITABLE__ as the completion token.
To modify the connection type, refer to [link async_mqtt5.examples.network_connection Establishing a network connection with different protocols].
To use different completion token, refer to [link async_mqtt5.examples.completion_tokens Completion tokens].
[import ../../../example/publisher.cpp]
[publisher]
[endsect]
[section:receiver The receiver]
This example will show how to use __Client__ as a receiver.
The __Client__ will use TCP to connect to the Broker, and __USE_AWAITABLE__ as the completion token.
To modify the connection type, refer to [link async_mqtt5.examples.network_connection Establishing a network connection with different protocols].
To use different completion token, refer to [link async_mqtt5.examples.completion_tokens Completion tokens].
[import ../../../example/receiver.cpp]
[receiver]
[endsect]

View File

@@ -1,5 +1,3 @@
//[callbacks_examples
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
@@ -12,13 +10,10 @@ namespace asio = boost::asio;
using stream_type = asio::ip::tcp::socket;
using client_type = async_mqtt5::mqtt_client<stream_type>;
/**
* This function showcases how to call each asynchronous function
* in mqtt_client using callbacks. Note that this example is not
* intended for direct execution, as the async_disconnect call
* will promptly close the client.
*/
void run_with_callbacks(client_type& client) {
//[publish_callback
// Publish an Application Message with QoS 0.
client.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "Hello world!",
@@ -50,7 +45,9 @@ void run_with_callbacks(client_type& client) {
std::cout << "reason_code: " << rc.message() << std::endl;
}
);
//]
//[subscribe_callback
// Subscribe to a single Topic.
client.async_subscribe(
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } }, async_mqtt5::subscribe_props {},
@@ -62,7 +59,9 @@ void run_with_callbacks(client_type& client) {
std::cout << "subscribe reason_code: " << codes[0].message() << std::endl;
}
);
//]
//[receive_callback
// Receive an Application Message.
client.async_receive(
// Callback with signature void (error_code, std::string, std::string, publish_props)
@@ -74,7 +73,9 @@ void run_with_callbacks(client_type& client) {
std::cout << "payload: " << payload << std::endl;
}
);
//]
//[unsubscribe_callback
// Unsubscribe from the Topic.
client.async_unsubscribe("test/mqtt-test", async_mqtt5::unsubscribe_props {},
//Callback with signature void (error_code, std::vector<reason_code>, unsuback_props)
@@ -85,7 +86,9 @@ void run_with_callbacks(client_type& client) {
std::cout << "unsubscribe reason_code: " << codes[0].message() << std::endl;
}
);
//]
//[disconnect_callback
// Disconnect the Client.
client.async_disconnect(
async_mqtt5::disconnect_rc_e::disconnect_with_will_message,
@@ -93,7 +96,7 @@ void run_with_callbacks(client_type& client) {
// Callback with signature void (error_code)
[](async_mqtt5::error_code) {}
);
//]
}
int main(int argc, char** argv) {
@@ -110,5 +113,3 @@ int main(int argc, char** argv) {
ioc.run();
}
//]

View File

@@ -1,5 +1,3 @@
//[cpp20_coroutines_examples
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
@@ -15,16 +13,8 @@ namespace asio = boost::asio;
using stream_type = asio::ip::tcp::socket;
using client_type = async_mqtt5::mqtt_client<stream_type>;
/**
* An example of a coroutine. It must have a return type of boost::asio::awaitable<T>.
* When an asynchronous function is called, the coroutine is suspended.
* After the asynchronous operation finishes, the coroutine resumes from the point it was suspended.
*
* In this example, each asynchronous function is invoked with ``__USE_AWAITABLE__`` completion token.
* When using this completion token, co_await will throw exceptions instead of returning an error code.
* If you do not wish to throw exceptions, refer to the following use_nothrow_awaitable and nothrow_coroutine() example.
*/
asio::awaitable<void> coroutine(client_type& client) {
//[publish_coro
// Publish an Application Message with QoS 0.
// The handler signature for this function is void (error_code).
// However, when using asio::use_awaitable as a completion token,
@@ -52,32 +42,29 @@ asio::awaitable<void> coroutine(client_type& client) {
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_awaitable
);
//]
auto sub_topic = async_mqtt5::subscribe_topic {
"test/mqtt-test", {
async_mqtt5::qos_e::exactly_once,
async_mqtt5::subscribe_options::no_local_e::no,
async_mqtt5::subscribe_options::retain_as_published_e::retain,
async_mqtt5::subscribe_options::retain_handling_e::send
}
};
//[subscribe_coro
// Subscribe to a single Topic.
// The handler signature for this function is void (error_code, std::vector<reason_code>, suback_props).
// With asio::use_awaitable as a completion token, the co_await
// will return std::vector<reason_code> and suback_props.
auto [sub_codes, sub_props] = co_await client.async_subscribe(
sub_topic, async_mqtt5::subscribe_props {}, asio::use_awaitable
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } },
async_mqtt5::subscribe_props {}, asio::use_awaitable
);
//]
//[receive_coro
// Receive an Application Message.
// The co_await call will return std::string (topic), std::string (payload) and publish_props.
// Note: the coroutine will be suspended until an Application Message is ready to be received
// or an error has occurred. In theory, the coroutine could be suspended indefinitely.
// Avoid calling this if you have not successfully subscribed to a Topic.
if (!sub_codes[0])
auto [topic, payload, publish_props] = co_await client.async_receive(asio::use_awaitable);
//]
//[unsubscribe_coro
// Unsubscribe from the Topic.
// The handler signature for this function is void (error_code, std::vector<reason_code>, unsuback_props).
// With asio::use_awaitable as a completion token, the co_await
@@ -86,7 +73,9 @@ asio::awaitable<void> coroutine(client_type& client) {
"test/mqtt-test", async_mqtt5::unsubscribe_props {},
asio::use_awaitable
);
//]
//[disconnect_coro
// Disconnect the Client.
// With asio::use_awaitable as a completion token and void (error_code) as the completion signature,
// the co_await has nothing to return.
@@ -95,23 +84,18 @@ asio::awaitable<void> coroutine(client_type& client) {
async_mqtt5::disconnect_props {},
asio::use_awaitable
);
//]
co_return;
}
/**
* A modified completion token. Using this completion token instead of ``__USE_AWAITABLE__``
* will prevent co_await from throwing exceptions. Instead, co_await will return the error code
* along with other values specified in the handler signature.
*/
//[no_throw_awaitable
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
//]
/**
* In this coroutine, each asynchronous function is called with use_nothrow_awaitable completion token.
* Each co_await will return an error_code, similar to the behavior of using callbacks
* (see ``__EXAMPLE_CALLBACK__``).
*/
asio::awaitable<void> nothrow_coroutine(client_type& client) {
//[publish_coro_nothrow
async_mqtt5::error_code ec;
async_mqtt5::reason_code rc;
@@ -134,39 +118,38 @@ asio::awaitable<void> nothrow_coroutine(client_type& client) {
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
use_nothrow_awaitable
);
//]
auto sub_topic = async_mqtt5::subscribe_topic{
"test/mqtt-test", {
async_mqtt5::qos_e::exactly_once,
async_mqtt5::subscribe_options::no_local_e::no,
async_mqtt5::subscribe_options::retain_as_published_e::retain,
async_mqtt5::subscribe_options::retain_handling_e::send
}
};
//[subscribe_coro_nothrow
std::vector<async_mqtt5::reason_code> rcs;
async_mqtt5::suback_props suback_props;
std::tie(ec, rcs, suback_props) = co_await client.async_subscribe(
sub_topic, async_mqtt5::subscribe_props {}, use_nothrow_awaitable
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } },
async_mqtt5::subscribe_props {}, use_nothrow_awaitable
);
//]
if (!rcs[0]) {
//[receive_coro_nothrow
std::string topic, payload;
async_mqtt5::publish_props publish_props;
std::tie(ec, topic, payload, publish_props) = co_await client.async_receive(use_nothrow_awaitable);
}
//]
//[unsubscribe_coro_nothrow
async_mqtt5::unsuback_props unsuback_props;
std::tie(ec, rcs, unsuback_props) = co_await client.async_unsubscribe(
std::vector<std::string>{ "test/mqtt-test" }, async_mqtt5::unsubscribe_props {},
use_nothrow_awaitable
);
//]
//[disconnect_coro_nothrow
std::tie(ec) = co_await client.async_disconnect(
async_mqtt5::disconnect_rc_e::disconnect_with_will_message,
async_mqtt5::disconnect_props {},
use_nothrow_awaitable
);
//]
co_return;
}
@@ -188,5 +171,3 @@ int main(int argc, char** argv) {
ioc.run();
}
//]

View File

@@ -1,5 +1,3 @@
//[futures_examples
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_future.hpp>
@@ -15,13 +13,9 @@ namespace asio = boost::asio;
using stream_type = asio::ip::tcp::socket;
using client_type = async_mqtt5::mqtt_client<stream_type>;
/**
* This function is a reference on how to use the mqtt_client with ``__USE_FUTURE__``
* as the completion token. Each get() call on std::future will block the current
* thread and wait until the future has a valid result. That is why it is essential
* to ensure that the execution context is running in more than one thread.
*/
void run_with_future(client_type& client) {
//[publish_future
// Just like the asio::use_awaitable completion token
// (see ``__EXAMPLE_COROUTINE__``), the ``__USE_FUTURE__`` completion token
// will not return the error_code. Instead, it will throw an exception if an error has occurred.
@@ -52,42 +46,40 @@ void run_with_future(client_type& client) {
);
auto [qos2_rc, pubcomp_props] = pub_qos2_fut.get();
std::cout << "Publish QoS 2 Reason Code: " << qos2_rc.message() << std::endl;
//]
auto sub_topic = async_mqtt5::subscribe_topic{
"test/mqtt-test", {
async_mqtt5::qos_e::exactly_once,
async_mqtt5::subscribe_options::no_local_e::no,
async_mqtt5::subscribe_options::retain_as_published_e::retain,
async_mqtt5::subscribe_options::retain_handling_e::send
}
};
//[subscribe_future
using sub_fut_type = std::tuple<std::vector<async_mqtt5::reason_code>, async_mqtt5::suback_props>;
std::future<sub_fut_type> sub_fut = client.async_subscribe(
sub_topic, async_mqtt5::subscribe_props {}, asio::use_future
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } },
async_mqtt5::subscribe_props {}, asio::use_future
);
auto [sub_rcs, suback_props] = sub_fut.get();
std::cout << "Subscribe Reason Code: " << sub_rcs[0].message() << std::endl;
//]
// Note: the get() call on async_receive future could block indefinitely if the mqtt_client
//[receive_future
// Note: the get() call on async_receive future could block indefinitely if the ``__Client__``
// failed to subscribe or there are no Application Messages to be received from the subscribed Topic!
if (!sub_rcs[0]) {
using rec_fut_type = std::tuple<std::string, std::string, async_mqtt5::publish_props>;
std::future<rec_fut_type> rec_fut = client.async_receive(asio::use_future);
auto [topic, payload, publish_props] = rec_fut.get();
std::cout << "Received message from Topic: " << topic << ", " << payload << std::endl;
}
//]
//[unsubscribe_future
using unsub_fut_type = std::tuple<std::vector<async_mqtt5::reason_code>, async_mqtt5::unsuback_props>;
std::future<unsub_fut_type> unsub_fut = client.async_unsubscribe(
"test/mqtt-test", async_mqtt5::unsubscribe_props {}, asio::use_future
);
auto [unsub_rcs, unsuback_props] = unsub_fut.get();
std::cout << "Unubscribe Reason Code: " << unsub_rcs[0].message() << std::endl;
//]
//[disconnect_future
std::future<void> dc_fut = client.async_disconnect(asio::use_future);
dc_fut.get();
//]
return;
}
@@ -117,5 +109,3 @@ int main(int argc, char** argv) {
if (t.joinable()) t.join();
}
//]

55
example/publisher.cpp Normal file
View File

@@ -0,0 +1,55 @@
//[publisher
#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
namespace asio = boost::asio;
asio::awaitable<void> client_publisher(asio::io_context& ioc) {
// Initialise the ``__Client__``, establish connection to the Broker over TCP.
async_mqtt5::mqtt_client<asio::ip::tcp::socket> client(ioc, "");
// Configure the ``__Client__``.
// It is mandatory to call brokers() and run() to configure the Brokers to connect to and start the Client.
client.brokers("mqtt.mireo.local", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
.run(); // Start the client.
// Publish an Application Message with QoS 1.
auto [rc, props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"test/mqtt-test", "my application message",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, asio::use_awaitable
);
if (rc)
std::cout << "MQTT protocol error occurred: " << rc.message() << std::endl;
// Publish some more messages...
// After we are done with publishing all the messages, disconnect the Client.
// Alternatively, you can also use mqtt_client::cancel.
// Regardless, you should ensure all the operations are completed before disconnecting the Client.
co_await client.async_disconnect(
async_mqtt5::disconnect_rc_e::normal_disconnection, async_mqtt5::disconnect_props {}, asio::use_awaitable
);
co_return;
}
int main() {
// Initialise execution context.
asio::io_context ioc;
// Spawn the coroutine.
asio::co_spawn(ioc.get_executor(), client_publisher(ioc), asio::detached);
// Start the execution.
ioc.run();
}
//]

80
example/receiver.cpp Normal file
View File

@@ -0,0 +1,80 @@
//[receiver
#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
namespace asio = boost::asio;
asio::awaitable<void> client_receiver(asio::io_context& ioc) {
// Initialise the ``__Client__``, establish connection to the Broker over TCP.
async_mqtt5::mqtt_client<asio::ip::tcp::socket> client(ioc, "");
// Configure the``__Client__``.
// It is mandatory to call brokers() and run() to configure the Brokers to connect to and start the Client.
client.brokers("mqtt.broker", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
.run(); // Start the client.
// Configure the request to subscribe to a Topic.
async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic {
"test/mqtt-test",
async_mqtt5::subscribe_options {
async_mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2.
async_mqtt5::subscribe_options::no_local_e::no, // Forward message from Clients with same ID.
async_mqtt5::subscribe_options::retain_as_published_e::retain, // Keep the original RETAIN flag.
async_mqtt5::subscribe_options::retain_handling_e::send // Send retained messages when the subscription is established.
}
};
// Subscribe to a single Topic.
auto [sub_codes, sub_props] = co_await client.async_subscribe(
sub_topic, async_mqtt5::subscribe_props {}, asio::use_awaitable
);
// Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call.
// std::vector<async_mqtt5::reason_code> sub_codes contain the result of the subscribe action for every Topic.
// Before attempting to receive an Application Message from the Topic we just subscribed to,
// it is advisable to verify that the subscription succeeded.
// It is not recommended to call mqtt_client::async_receive if you do not have any
// Subscription established as the corresponding handler will never be invoked.
if (!sub_codes[0])
auto [topic, payload, publish_props] = co_await client.async_receive(asio::use_awaitable);
// Receive more messages...
// Unsubscribe from the Topic.
// Similar to mqtt_client::async_subscribe call, std::vector<async_mqtt5::reason_code> unsub_codes contain
// the result of the unsubscribe action for every Topic.
auto [unsub_codes, unsub_props] = co_await client.async_unsubscribe(
"test/mqtt-test", async_mqtt5::unsubscribe_props {},
asio::use_awaitable
);
// Note: you can unsubscribe from multiple Topics in one mqtt_client::async_unsubscribe call.
// Disconnect the Client.
co_await client.async_disconnect(
async_mqtt5::disconnect_rc_e::disconnect_with_will_message,
async_mqtt5::disconnect_props {},
asio::use_awaitable
);
co_return;
}
int main() {
// Initialise execution context.
asio::io_context ioc;
// Spawn the coroutine.
asio::co_spawn(ioc, client_receiver(ioc), asio::detached);
// Start the execution.
ioc.run();
}
//]