Examples rework

Summary:
related to T15261, T15262
- all examples are added to CI checks
	- as a result, CMakeLists.txt in example folder is reworked to build all examples instead of one
	- also removed chapter Building with CMake temporarily (it will be back)
- all examples accept runtime arguments (brokers, port, clientid) (default: hivemq public broker)
- implemented example suggestions by R.Perez, most notable:
	- all examples include mqtt headers they use (no more unified headers)
	- multithreaded examples use thread_pool instead of asio::io_context
- all examples use logger with log_level::info by default

Reviewers: ivica

Reviewed By: ivica

Subscribers: iljazovic, miljen

Differential Revision: https://repo.mireo.local/D32602
This commit is contained in:
Korina Šimičević
2024-12-03 15:26:05 +01:00
parent d19f466e3e
commit 9a6788c913
20 changed files with 556 additions and 295 deletions

View File

@ -101,6 +101,14 @@ jobs:
cmake -S . -B build -DBoost_INCLUDE_DIR="${{ github.workspace }}\\boost_${{ env.BOOST_DIR_VER_NAME }}" cmake -S . -B build -DBoost_INCLUDE_DIR="${{ github.workspace }}\\boost_${{ env.BOOST_DIR_VER_NAME }}"
cmake --install build cmake --install build
- name: Build examples
run: |
cmake -S example -B example\\build -A ${{ matrix.architecture }} ^
-DCMAKE_CXX_FLAGS="${{ env.CXXFLAGS }}" -DCMAKE_EXE_LINKER_FLAGS="${{ env.LDFLAGS }}" ^
-DCMAKE_CXX_STANDARD="${{ matrix.cxxstd }}" -DCMAKE_BUILD_TYPE="${{ matrix.build-type }}" ^
-DBoost_INCLUDE_DIR="${{ github.workspace }}\\boost_${{ env.BOOST_DIR_VER_NAME }}"
cmake --build example\\build -j 4
- name: Build tests - name: Build tests
run: | run: |
cmake -S test -B test\\build -A ${{ matrix.architecture }} ^ cmake -S test -B test\\build -A ${{ matrix.architecture }} ^
@ -278,6 +286,14 @@ jobs:
-DBoost_INCLUDE_DIR="/usr/local/boost_${{ env.BOOST_DIR_VER_NAME }}" -DBoost_INCLUDE_DIR="/usr/local/boost_${{ env.BOOST_DIR_VER_NAME }}"
sudo cmake --install build sudo cmake --install build
- name: Build examples
run: |
cmake -S example -B example/build \
-DCMAKE_CXX_COMPILER="${{ matrix.compiler }}" -DCMAKE_CXX_FLAGS="${{ env.CXXFLAGS }}" \
-DCMAKE_CXX_STANDARD="${{ matrix.cxxstd }}" -DCMAKE_EXE_LINKER_FLAGS="${{ env.LDFLAGS }}" -DCMAKE_BUILD_TYPE="${{ matrix.build-type }}" \
-DBoost_INCLUDE_DIR="/usr/local/boost_${{ env.BOOST_DIR_VER_NAME }}"
cmake --build example/build -j 4
- name: Build tests - name: Build tests
run: | run: |
cmake -S test -B test/build \ cmake -S test -B test/build \

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15) cmake_minimum_required(VERSION 3.15)
project(async-mqtt5 VERSION 1.0.1 LANGUAGES CXX) project(async-mqtt5 VERSION 1.0.2 LANGUAGES CXX)
include(cmake/project-is-top-level.cmake) include(cmake/project-is-top-level.cmake)
include(cmake/variables.cmake) include(cmake/variables.cmake)

View File

@ -89,15 +89,16 @@ The following example illustrates a scenario of configuring a Client and publish
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/types.hpp>
int main() { int main() {
boost::asio::io_context ioc; boost::asio::io_context ioc;
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> c(ioc); async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> c(ioc);
c.credentials("<your-client-id>", "<client-username>", "<client-pwd>") c.brokers("<your-mqtt-broker>", 1883)
.brokers("<your-mqtt-broker>", 1883) .credentials("<your-client-id>", "<client-username>", "<client-pwd>")
.async_run(boost::asio::detached); .async_run(boost::asio::detached);
c.async_publish<async_mqtt5::qos_e::at_most_once>( c.async_publish<async_mqtt5::qos_e::at_most_once>(
@ -114,27 +115,6 @@ int main() {
``` ```
To see more examples, visit [Examples](https://github.com/mireo/async-mqtt5/tree/master/example). To see more examples, visit [Examples](https://github.com/mireo/async-mqtt5/tree/master/example).
Building with CMake
---------
You can use the `CMakeLists.txt` provided in our repository to compile and run any of the [examples](https://github.com/mireo/async-mqtt5/tree/master/example) or your own source files.
The following commands demonstrate compiling and running the previous code using CMake on Linux.
The source file is located at [example/hello_world_over_tcp.cpp](https://github.com/mireo/async-mqtt5/blob/master/example/hello_world_over_tcp.cpp).
```bash
# navigate to the root folder of Async.MQTT5
# compile the example
cmake -S . -B {build-folder} -DBUILD_EXAMPLES=ON -DCMAKE_EXE_LINKER_FLAGS="-pthread"
cmake --build {build-folder}
# run the example
./{build-folder}/example/example
```
You can edit the [example/CMakeLists.txt](https://github.com/mireo/async-mqtt5/blob/master/example/CMakeLists.txt) file to compile the source file of your choice.
By default, it will compile [example/hello_world_over_tcp.cpp](https://github.com/mireo/async-mqtt5/blob/master/example/hello_world_over_tcp.cpp).
Contributing Contributing
--------- ---------
When contributing to this repository, please first discuss the change you wish to make via issue, email, or any other method with the owners of this repository before making a change. When contributing to this repository, please first discuss the change you wish to make via issue, email, or any other method with the owners of this repository before making a change.

View File

@ -208,23 +208,6 @@ The following example illustrates a scenario of configuring a Client and publish
To see more examples, visit [link async_mqtt5.examples Examples]. To see more examples, visit [link async_mqtt5.examples Examples].
[heading Building with CMake]
You can use the `CMakeLists.txt` provided in our repository to compile and run any of the [link async_mqtt5.examples examples] or your own source files.
The following commands demonstrate compiling and running the previous code using CMake on Linux.
The source file is located at [ghreflink example/hello_world_over_tcp.cpp example/hello_world_over_tcp.cpp].
# navigate to the root folder of Async.MQTT5
# compile the example
cmake -S . -B {build-folder} -DBUILD_EXAMPLES=ON -DCMAKE_EXE_LINKER_FLAGS="-pthread"
cmake --build {build-folder}
# run the example
./{build-folder}/example/example
You can edit the [ghreflink example/CMakeLists.txt example/CMakeLists.txt] file to compile the source file of your choice.
By default, it will compile [ghreflink example/hello_world_over_tcp.cpp example/hello_world_over_tcp.cpp].
[heading Acknowledgements] [heading Acknowledgements]
We thank [@https://github.com/chriskohlhoff Christopher Kohlhoff] for his outstanding __Asio__ library, We thank [@https://github.com/chriskohlhoff Christopher Kohlhoff] for his outstanding __Asio__ library,
which inspired the design of all interfaces and implementation strategies. which inspired the design of all interfaces and implementation strategies.

View File

@ -30,7 +30,12 @@ which might be required for establishing a secure connection.
In this example, we choose TCP/IP as the underlying protocol to initialize the __Client__. In this example, we choose TCP/IP as the underlying protocol to initialize the __Client__.
[init_tcp_client] [!c++]
// Initialize the execution context required to run I/O operations.
boost::asio::io_context ioc;
// Construct the Client with ``__TCP_SOCKET__`` as the underlying stream.
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc);
[endsect] [/transport_protocol] [endsect] [/transport_protocol]
@ -48,9 +53,9 @@ Listing Brokers from different clusters may lead to inconsistencies between MQTT
* *Assign a custom user-implemented authenticator:* The custom authentication will be used for __ENHANCED_AUTH__ ([refmem mqtt_client authenticator]). * *Assign a custom user-implemented authenticator:* The custom authentication will be used for __ENHANCED_AUTH__ ([refmem mqtt_client authenticator]).
* *Defining CONNECT Packet Properties:* Specify properties that will be included in the __CONNECT__ packet sent during connection initiation (see [refmem mqtt_client connect_property] and [refmem mqtt_client connect_properties]). * *Defining CONNECT Packet Properties:* Specify properties that will be included in the __CONNECT__ packet sent during connection initiation (see [refmem mqtt_client connect_property] and [refmem mqtt_client connect_properties]).
We will connect to the __HIVEMQ__'s public Broker at `broker.hivemq.com`, which listens for MQTT connections on the default TCP/IP port 1883. Firstly, we will specify the Broker we want to connect to using the [refmem mqtt_client brokers] function.
Additionally, we will set the Client Identifier to `async_mqtt5_tester` using the [refmem mqtt_client credentials] function. This can be __HIVEMQ__'s public Broker available at `broker.hivemq.com`:1883.
This is not strictly mandatory, as some Brokers allow anonymous connections. Additionally, we can set the Client Identifier using the [refmem mqtt_client credentials] function. This is not mandatory, as some Brokers allow anonymous connections.
After configuring the `mqtt_client`, we call the [refmem mqtt_client async_run] function. After configuring the `mqtt_client`, we call the [refmem mqtt_client async_run] function.
This starts the process of establishing a connection with the Broker. This starts the process of establishing a connection with the Broker.
@ -120,12 +125,7 @@ The __Self__ library provides a built-in [ghreflink include/async_mqtt5/logger.h
This logger outputs detailed information about each step in the connection process, including DNS resolution, TCP connection, TLS handshake, WebSocket handshake, and MQTT handshake. This logger outputs detailed information about each step in the connection process, including DNS resolution, TCP connection, TLS handshake, WebSocket handshake, and MQTT handshake.
To enable this functionality, construct the __Client__ with an instance of this logger class: To enable this functionality, construct the __Client__ with an instance of this logger class:
[!c++] [init_tcp_client_with_logger]
// Since we are not establishing a secure connection, set the TlsContext template parameter to std::monostate.
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger> client(
ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::debug)
);
[endsect] [/debugging] [endsect] [/debugging]

View File

@ -28,7 +28,6 @@ This example illustrates the process of setting up the Client to connect to the
[section:publisher The publisher] [section:publisher The publisher]
This example shows how to use __Client__ as a publisher that publishes sensor readings every `5` seconds. This example shows how to use __Client__ as a publisher that publishes sensor readings every `5` seconds.
The __Client__ uses TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token.
[publisher] [publisher]
[endsect] [endsect]
@ -36,7 +35,6 @@ The __Client__ uses TCP to connect to the Broker and modified __USE_AWAITABLE__
[section:receiver The receiver] [section:receiver The receiver]
This example shows how to use __Client__ as a receiver. This example shows how to use __Client__ as a receiver.
The __Client__ subscribes and indefinitely receives Application Messages from the Broker. The __Client__ subscribes and indefinitely receives Application Messages from the Broker.
The __Client__ uses TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token.
[receiver] [receiver]
[endsect] [endsect]

View File

@ -8,10 +8,20 @@ if(PROJECT_IS_TOP_LEVEL)
find_package(async-mqtt5 REQUIRED) find_package(async-mqtt5 REQUIRED)
endif() endif()
# set(EXAMPLE <your-source-file>.cpp) function(add_example name)
set(EXAMPLE hello_world_over_tcp.cpp) add_executable("${name}" ${ARGN})
target_link_libraries("${name}" PRIVATE Async::MQTT5)
string(FIND "${example_name}" "tls" found_tls)
if(found_tls GREATER -1)
target_link_libraries("${name}" PRIVATE OpenSSL::SSL)
endif()
endfunction()
find_package(OpenSSL REQUIRED) # if you require SSL connection file(GLOB examples "*.cpp")
add_executable(example ${EXAMPLE})
target_compile_features(example PRIVATE cxx_std_17) # or cxx_std_20 foreach(file_path ${examples})
target_link_libraries(example PRIVATE Async::MQTT5 OpenSSL::SSL) get_filename_component(example_name "${file_path}" NAME_WE)
add_example("${example_name}" "${file_path}")
endforeach()
find_package(OpenSSL REQUIRED)

View File

@ -5,31 +5,47 @@
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
// //
//[hello_world_in_coro_multithreaded_env
#include <vector>
#include <thread>
#include <boost/asio/use_awaitable.hpp> #include <boost/asio/use_awaitable.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT #ifdef BOOST_ASIO_HAS_CO_AWAIT
//[hello_world_in_coro_multithreaded_env
#include <vector>
#include <string>
#include <thread>
#include <boost/asio/as_tuple.hpp> #include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp> #include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/deferred.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/types.hpp>
using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>; struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 1883;
std::string client_id = "async_mqtt5_tester";
};
// client_type with logging enabled
using client_type = async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
>;
// client_type without logging
//using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>;
// Modified completion token that will prevent co_await from throwing exceptions. // Modified completion token that will prevent co_await from throwing exceptions.
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::deferred);
boost::asio::awaitable<void> publish_hello_world( boost::asio::awaitable<void> publish_hello_world(
client_type& client, const config& cfg, client_type& client,
const boost::asio::strand<boost::asio::io_context::executor_type>& strand const boost::asio::strand<boost::asio::thread_pool::executor_type>& strand
) { ) {
// Confirmation that the coroutine running in the strand. // Confirmation that the coroutine running in the strand.
assert(strand.running_in_this_thread()); assert(strand.running_in_this_thread());
@ -37,55 +53,63 @@ boost::asio::awaitable<void> publish_hello_world(
// All these function calls will be executed by the strand that is executing the coroutine. // All these function calls will be executed by the strand that is executing the coroutine.
// All the completion handler's associated executors will be that same strand // All the completion handler's associated executors will be that same strand
// because the Client was constructed with it as the default associated executor. // because the Client was constructed with it as the default associated executor.
client.brokers("<your-mqtt-broker>", 1883) client.brokers(cfg.brokers, cfg.port) // Set the Broker to connect to.
.async_run(boost::asio::detached); .credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the Client.
auto&& [ec, rc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>( auto&& [ec, rc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"<your-mqtt-topic>", "Hello world!", async_mqtt5::retain_e::no, "async-mqtt5/test" /* topic */, "Hello world!" /* payload*/, async_mqtt5::retain_e::no,
async_mqtt5::publish_props {}, use_nothrow_awaitable); async_mqtt5::publish_props {}, use_nothrow_awaitable);
co_await client.async_disconnect(use_nothrow_awaitable); co_await client.async_disconnect(use_nothrow_awaitable);
co_return; co_return;
} }
int main() { int main(int argc, char** argv) {
// Create a multithreaded environment where 4 threads config cfg;
// will be calling ioc.run().
// Number of threads that will call io_context::run(). if (argc == 4) {
int thread_num = 4; cfg.brokers = argv[1];
boost::asio::io_context ioc(4); cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
// Create the remaining threads (aside of this one). // Create a thread pool with 4 threads.
std::vector<std::thread> threads; boost::asio::thread_pool tp(4);
threads.reserve(thread_num - 1);
// Create an explicit strand from io_context's executor. // Create an explicit strand from io_context's executor.
// The strand guarantees a serialised handler execution regardless of the // The strand guarantees a serialised handler execution regardless of the
// number of threads running in the io_context. // number of threads running in the io_context.
boost::asio::strand strand = boost::asio::make_strand(ioc.get_executor()); boost::asio::strand strand = boost::asio::make_strand(tp.get_executor());
// Create the Client with the explicit strand as the default associated executor. // Create the Client with the explicit strand as the default associated executor.
client_type client(strand); client_type client(strand, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));
// Spawn the coroutine. // Spawn the coroutine.
// The executor that executes the coroutine must be the same executor // The executor that executes the coroutine must be the same executor
// that is the Client's default associated executor. // that is the Client's default associated executor.
co_spawn(strand, publish_hello_world(client, strand), boost::asio::detached); co_spawn(
strand,
publish_hello_world(cfg, client, strand),
[](std::exception_ptr e) {
if (e)
std::rethrow_exception(e);
}
);
// Call ioc.run() in the other threads. tp.join();
for (int i = 0; i < thread_num - 1; ++i)
threads.emplace_back([&ioc] { ioc.run(); });
// Call ioc.run() on this thread.
ioc.run();
for (auto& t : threads)
if (t.joinable()) t.join();
return 0; return 0;
} }
#endif
//] //]
#else
#include <iostream>
int main() {
std::cout << "This example requires C++20 standard to compile and run" << std::endl;
}
#endif

View File

@ -7,89 +7,98 @@
//[hello_world_in_multithreaded_env //[hello_world_in_multithreaded_env
#include <iostream> #include <iostream>
#include <vector> #include <string>
#include <thread> #include <thread>
#include <vector>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/dispatch.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/thread_pool.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>
int main() { struct config {
// Create a multithreaded environment where 4 threads std::string brokers = "broker.hivemq.com";
// will be calling ioc.run(). uint16_t port = 1883;
std::string client_id = "async_mqtt5_tester";
};
// Number of threads that will call io_context::run(). int main(int argc, char** argv) {
int thread_num = 4; config cfg;
boost::asio::io_context ioc(4);
// Create the remaining threads (aside of this one). if (argc == 4) {
std::vector<std::thread> threads; cfg.brokers = argv[1];
threads.reserve(thread_num - 1); cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
// Create a thread pool with 4 threads.
boost::asio::thread_pool tp(4);
// Create an explicit strand from io_context's executor. // Create an explicit strand from thread_pool's executor.
// The strand guarantees a serialised handler execution regardless of the // The strand guarantees a serialised handler execution regardless of the
// number of threads running in the io_context. // number of threads running in the thread_pool.
boost::asio::strand strand = boost::asio::make_strand(ioc.get_executor()); boost::asio::strand strand = boost::asio::make_strand(tp.get_executor());
// Create the Client with the explicit strand as the default associated executor. // Create the Client with the explicit strand as the default associated executor.
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(strand); async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
> client(strand, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));
// Configure the client. // Configure the client.
client.brokers("<your-mqtt-broker>", 1883); client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.credentials(cfg.client_id); // Set the Client Identifier. (optional)
// Start the Client. // Start the Client.
// The async_run function call must be posted/dispatched to the strand. // The async_run function call must be posted/dispatched to the strand.
boost::asio::post( boost::asio::dispatch(
strand, boost::asio::bind_executor(
[&client, &strand] { strand,
// Considering that the default associated executor of all completion handlers is the strand, [&client, &strand, &cfg] {
// it is not necessary to explicitly bind it to async_run or other async_xxx's handlers. // Considering that the default associated executor of all completion handlers is the strand,
client.async_run(boost::asio::detached); // it is not necessary to explicitly bind it to async_run or other async_xxx's handlers.
} client.async_run(boost::asio::detached); // Start the Client.
}
)
); );
// The async_publish function call must be posted/dispatched to the strand. // The async_publish function call must be posted/dispatched to the strand.
// The associated executor of async_publish's completion handler must be the same strand. // The associated executor of async_publish's completion handler must be the same strand.
boost::asio::post( boost::asio::dispatch(
strand, boost::asio::bind_executor(
[&client, &strand] { strand,
assert(strand.running_in_this_thread()); [&client, &strand, &cfg] {
assert(strand.running_in_this_thread());
client.async_publish<async_mqtt5::qos_e::at_least_once>( client.async_publish<async_mqtt5::qos_e::at_least_once>(
"<your-mqtt-topic>", "Hello world!", async_mqtt5::retain_e::no, "async-mqtt5/test", "Hello World!", async_mqtt5::retain_e::no,
async_mqtt5::publish_props {}, async_mqtt5::publish_props {},
// You may bind the strand to this handler, but it is not necessary // You may bind the strand to this handler, but it is not necessary
// as the strand is already the default associated handler. // as the strand is already the default associated handler.
// However, you must not bind it to any other executor! [&client, &strand](
[&client, &strand]( async_mqtt5::error_code ec, async_mqtt5::reason_code rc,
async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::puback_props props
async_mqtt5::puback_props props ) {
) { assert(strand.running_in_this_thread());
assert(strand.running_in_this_thread());
std::cout << ec.message() << std::endl; std::cout << ec.message() << std::endl;
std::cout << rc.message() << std::endl; std::cout << rc.message() << std::endl;
// Stop the Client. This will cause ioc.run() to return. // Stop the Client.
client.cancel(); client.cancel();
} }
); );
} }
)
); );
// Call ioc.run() on the other threads. tp.join();
for (int i = 0; i < thread_num - 1; ++i)
threads.emplace_back([&ioc] { ioc.run(); });
// Call ioc.run() on this thread.
ioc.run();
for (auto& t : threads)
if (t.joinable()) t.join();
return 0; return 0;
} }

View File

@ -7,27 +7,48 @@
//[hello_world_over_tcp //[hello_world_over_tcp
#include <iostream> #include <iostream>
#include <string>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/types.hpp>
struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 1883; // 1883 is the default TCP MQTT port.
std::string client_id = "async_mqtt5_tester";
};
int main(int argc, char** argv) {
config cfg;
if (argc == 4) {
cfg.brokers = argv[1];
cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
int main() {
//[init_tcp_client
// Initialize the execution context required to run I/O operations.
boost::asio::io_context ioc; boost::asio::io_context ioc;
// Construct the Client with ``__TCP_SOCKET__`` as the underlying stream. //[init_tcp_client_with_logger
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc); // Construct the Client with ``__TCP_SOCKET__`` as the underlying stream and enabled logging.
// Since we are not establishing a secure connection, set the TlsContext template parameter to std::monostate.
async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
> client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));
//] //]
// If you want to use the Client without logging, initialise it with the following line instead.
//async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc);
//[configure_tcp_client //[configure_tcp_client
// 1883 is the default TCP MQTT port. client.brokers(cfg.brokers, cfg.port) // Set the Broker to connect to.
client.brokers("broker.hivemq.com", 1883) .credentials(cfg.client_id) // Set the Client Identifier. (optional)
.credentials("async_mqtt5_tester") .async_run(boost::asio::detached); // Start the Client.
.async_run(boost::asio::detached);
//] //]
//[publish_hello_world //[publish_hello_world

View File

@ -7,21 +7,30 @@
//[hello_world_over_tls //[hello_world_over_tls
#include <iostream> #include <iostream>
#include <string>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/ssl.hpp> #include <boost/asio/ssl.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/types.hpp>
struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 8883; // 8883 is the default TLS MQTT port.
std::string client_id = "async_mqtt5_tester";
};
// External customization point. // External customization point.
namespace async_mqtt5 { namespace async_mqtt5 {
// Specify that the TLS handshake will be performed as a client.
template <typename StreamBase> template <typename StreamBase>
struct tls_handshake_type<boost::asio::ssl::stream<StreamBase>> { struct tls_handshake_type<boost::asio::ssl::stream<StreamBase>> {
static constexpr auto client = boost::asio::ssl::stream_base::client; static constexpr auto client = boost::asio::ssl::stream_base::client;
static constexpr auto server = boost::asio::ssl::stream_base::server;
}; };
// This client uses this function to indicate which hostname it is // This client uses this function to indicate which hostname it is
@ -37,49 +46,49 @@ void assign_tls_sni(
} // end namespace async_mqtt5 } // end namespace async_mqtt5
// The certificate file in the PEM format. int main(int argc, char** argv) {
constexpr char ca_cert[] = config cfg;
"-----BEGIN CERTIFICATE-----\n"
"...........................\n" if (argc == 4) {
"-----END CERTIFICATE-----\n" cfg.brokers = argv[1];
; cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
int main() {
boost::asio::io_context ioc; boost::asio::io_context ioc;
// Context satisfying ``__TlsContext__`` requirements that the underlying SSL stream will use. // TLS context that the underlying SSL stream will use.
// The purpose of the context is to allow us to set up TLS/SSL-related options. // The purpose of the context is to allow us to set up TLS/SSL-related options.
// See ``__SSL__`` for more information and options. // See ``__SSL__`` for more information and options.
boost::asio::ssl::context context(boost::asio::ssl::context::tls_client); boost::asio::ssl::context context(boost::asio::ssl::context::tls_client);
async_mqtt5::error_code ec; // Set up the TLS context.
// This step is highly dependent on the specific requirements of the Broker you are connecting to.
// Add the trusted certificate authority for performing verification. // Each broker may have its own standards and expectations for establishing a secure TLS/SSL connection.
context.add_certificate_authority(boost::asio::buffer(ca_cert), ec); // This can include verifying certificates, setting up private keys, PSK authentication, and others.
if (ec)
std::cout << "Failed to add certificate authority!" << std::endl;
ec.clear();
// Set peer verification mode used by the context.
// This will verify that the server's certificate is valid and signed by a trusted certificate authority.
context.set_verify_mode(boost::asio::ssl::verify_peer, ec);
if (ec)
std::cout << "Failed to set peer verification mode!" << std::endl;
ec.clear();
// Construct the Client with ``__SSL_STREAM__`` as the underlying stream // Construct the Client with ``__SSL_STREAM__`` as the underlying stream
// with ``__SSL_CONTEXT__`` as the ``__TlsContext__`` type. // with ``__SSL_CONTEXT__`` as the ``__TlsContext__`` type
// and logging enabled.
async_mqtt5::mqtt_client< async_mqtt5::mqtt_client<
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>, boost::asio::ssl::stream<boost::asio::ip::tcp::socket>,
boost::asio::ssl::context boost::asio::ssl::context,
> client(ioc, std::move(context)); async_mqtt5::logger
> client(ioc, std::move(context), async_mqtt5::logger(async_mqtt5::log_level::info));
// 8883 is the default TLS MQTT port.
client.brokers("<your-mqtt-broker>", 8883) // If you want to use the Client without logging, initialise it with the following line instead.
.async_run(boost::asio::detached); //async_mqtt5::mqtt_client<
// boost::asio::ssl::stream<boost::asio::ip::tcp::socket>,
// boost::asio::ssl::context
//> client(ioc, std::move(context));
client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the Client.
client.async_publish<async_mqtt5::qos_e::at_most_once>( client.async_publish<async_mqtt5::qos_e::at_most_once>(
"<topic>", "Hello world!", "async-mqtt5/test", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props{}, async_mqtt5::retain_e::no, async_mqtt5::publish_props{},
[&client](async_mqtt5::error_code ec) { [&client](async_mqtt5::error_code ec) {
std::cout << ec.message() << std::endl; std::cout << ec.message() << std::endl;

View File

@ -7,6 +7,7 @@
//[hello_world_over_websocket_tcp //[hello_world_over_websocket_tcp
#include <iostream> #include <iostream>
#include <string>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
@ -14,24 +15,47 @@
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/types.hpp>
#include <async_mqtt5/websocket.hpp> // WebSocket traits #include <async_mqtt5/websocket.hpp> // WebSocket traits
int main() { struct config {
std::string brokers = "broker.hivemq.com/mqtt"; // Path example: localhost/mqtt
uint16_t port = 8000; // 8083 is the default Webscoket/TCP MQTT port. However, HiveMQ's public broker uses 8000 instead.
std::string client_id = "async_mqtt5_tester";
};
int main(int argc, char** argv) {
config cfg;
if (argc == 4) {
cfg.brokers = argv[1];
cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
boost::asio::io_context ioc; boost::asio::io_context ioc;
// Construct the Client with WebSocket/TCP as the underlying stream. // Construct the Client with WebSocket/TCP as the underlying stream and enabled logging.
// Since we are not establishing a secure connection, set the TlsContext template parameter to std::monostate.
async_mqtt5::mqtt_client< async_mqtt5::mqtt_client<
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> boost::beast::websocket::stream<boost::asio::ip::tcp::socket>,
> client(ioc); std::monostate,
async_mqtt5::logger
> client(ioc, {}, async_mqtt5::logger(async_mqtt5::log_level::info));
// 8083 is the default Webscoket/TCP MQTT port. // If you want to use the Client without logging, initialise it with the following line instead.
client.brokers("<your-mqtt-broker-path>", 8083) // Path example: localhost/mqtt //async_mqtt5::mqtt_client<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>> client(ioc);
.async_run(boost::asio::detached);
client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the Client.
client.async_publish<async_mqtt5::qos_e::at_most_once>( client.async_publish<async_mqtt5::qos_e::at_most_once>(
"<topic>", "Hello world!", "async-mqtt5/test", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props{}, async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
[&client](async_mqtt5::error_code ec) { [&client](async_mqtt5::error_code ec) {
std::cout << ec.message() << std::endl; std::cout << ec.message() << std::endl;
client.async_disconnect(boost::asio::detached); client.async_disconnect(boost::asio::detached);

View File

@ -7,6 +7,7 @@
//[hello_world_over_websocket_tls //[hello_world_over_websocket_tls
#include <iostream> #include <iostream>
#include <string>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
@ -14,18 +15,27 @@
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/beast/ssl/ssl_stream.hpp> // async_teardown specialization for websocket ssl stream #include <boost/beast/ssl/ssl_stream.hpp> // async_teardown specialization for WebSocket SSL stream
#include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/types.hpp>
#include <async_mqtt5.hpp>
#include <async_mqtt5/websocket.hpp> // WebSocket traits #include <async_mqtt5/websocket.hpp> // WebSocket traits
struct config {
std::string brokers = "broker.hivemq.com/mqtt"; // Path example: localhost/mqtt
uint16_t port = 8884; // 8884 is the default Websocket/TLS MQTT port.
std::string client_id = "async_mqtt5_tester";
};
// External customization point. // External customization point.
namespace async_mqtt5 { namespace async_mqtt5 {
// Specify that the TLS handshake will be performed as a client.
template <typename StreamBase> template <typename StreamBase>
struct tls_handshake_type<boost::asio::ssl::stream<StreamBase>> { struct tls_handshake_type<boost::asio::ssl::stream<StreamBase>> {
static constexpr auto client = boost::asio::ssl::stream_base::client; static constexpr auto client = boost::asio::ssl::stream_base::client;
static constexpr auto server = boost::asio::ssl::stream_base::server;
}; };
// This client uses this function to indicate which hostname it is // This client uses this function to indicate which hostname it is
@ -41,49 +51,47 @@ void assign_tls_sni(
} // end namespace async_mqtt5 } // end namespace async_mqtt5
// The certificate file in the PEM format. int main(int argc, char** argv) {
constexpr char ca_cert[] = config cfg;
"-----BEGIN CERTIFICATE-----\n"
"...........................\n" if (argc == 4) {
"-----END CERTIFICATE-----\n" cfg.brokers = argv[1];
; cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
int main() {
boost::asio::io_context ioc; boost::asio::io_context ioc;
// Context satisfying ``__TlsContext__`` requirements that the underlying SSL stream will use. // TLS context that the underlying SSL stream will use.
// The purpose of the context is to allow us to set up TLS/SSL-related options. // The purpose of the context is to allow us to set up TLS/SSL-related options.
// See ``__SSL__`` for more information and options. // See ``__SSL__`` for more information and options.
boost::asio::ssl::context context(boost::asio::ssl::context::tls_client); boost::asio::ssl::context context(boost::asio::ssl::context::tls_client);
async_mqtt5::error_code ec; // Set up the TLS context.
// This step is highly dependent on the specific requirements of the Broker you are connecting to.
// Add the trusted certificate authority for performing verification. // Each broker may have its own standards and expectations for establishing a secure TLS/SSL connection.
context.add_certificate_authority(boost::asio::buffer(ca_cert), ec); // This can include verifying certificates, setting up private keys, PSK authentication, and others.
if (ec)
std::cout << "Failed to add certificate authority!" << std::endl;
ec.clear();
// Set peer verification mode used by the context.
// This will verify that the server's certificate is valid and signed by a trusted certificate authority.
context.set_verify_mode(boost::asio::ssl::verify_peer, ec);
if (ec)
std::cout << "Failed to set peer verification mode!" << std::endl;
ec.clear();
// Construct the Client with WebSocket/SSL as the underlying stream // Construct the Client with WebSocket/SSL as the underlying stream
// with ``__SSL_CONTEXT__`` as the ``__TlsContext__`` type. // with ``__SSL_CONTEXT__`` as the ``__TlsContext__`` type and enabled logging.
async_mqtt5::mqtt_client< async_mqtt5::mqtt_client<
boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>, boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>,
boost::asio::ssl::context boost::asio::ssl::context,
> client(ioc, std::move(context)); async_mqtt5::logger
> client(ioc, std::move(context), async_mqtt5::logger(async_mqtt5::log_level::info));
// 8884 is the default Websocket/TLS MQTT port. // If you want to use the Client without logging, initialise it with the following line instead.
client.brokers("<your-mqtt-broker-path>", 8884) // Path example: localhost/mqtt //async_mqtt5::mqtt_client<
.async_run(boost::asio::detached); // boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>,
// boost::asio::ssl::context
//> client(ioc, std::move(context));
client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the Client.
client.async_publish<async_mqtt5::qos_e::at_most_once>( client.async_publish<async_mqtt5::qos_e::at_most_once>(
"<topic>", "Hello world!", "async-mqtt5/test", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props{}, async_mqtt5::retain_e::no, async_mqtt5::publish_props{},
[&client](async_mqtt5::error_code ec) { [&client](async_mqtt5::error_code ec) {
std::cout << ec.message() << std::endl; std::cout << ec.message() << std::endl;

View File

@ -7,27 +7,50 @@
//[multiflight_client //[multiflight_client
#include <iostream> #include <iostream>
#include <string>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp> #include <boost/asio/signal_set.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>
struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 1883;
std::string client_id = "async_mqtt5_tester";
};
int main(int argc, char** argv) {
config cfg;
if (argc == 4) {
cfg.brokers = argv[1];
cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
int main() {
boost::asio::io_context ioc; boost::asio::io_context ioc;
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc); // Construct the Client with ``__TCP_SOCKET__`` as the underlying stream and enabled logging.
// Since we are not establishing a secure connection, set the TlsContext template parameter to std::monostate.
async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
> client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));
client.brokers("<your-mqtt-broker>", 1883) client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.async_run(boost::asio::detached); .credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the client.
// Publish with QoS 2 five times in a row without waiting for the handler // Publish with QoS 2 five times in a row without waiting for the handler
// of the previous async_publish call to be invoked. // of the previous async_publish call to be invoked.
for (auto i = 1; i <= 5; ++i) for (auto i = 1; i <= 5; ++i)
client.async_publish<async_mqtt5::qos_e::exactly_once>( client.async_publish<async_mqtt5::qos_e::exactly_once>(
"<topic>", "Hello world!", "async-mqtt5/test", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
[i](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::pubcomp_props) { [i](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::pubcomp_props) {
std::cout << "Publish number " << i << " completed with: " << std::endl; std::cout << "Publish number " << i << " completed with: " << std::endl;

View File

@ -5,6 +5,9 @@
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
// //
#include <boost/asio/use_awaitable.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
//[publisher //[publisher
#include <chrono> #include <chrono>
#include <cstdlib> #include <cstdlib>
@ -13,22 +16,35 @@
#include <boost/asio/as_tuple.hpp> #include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp> #include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp> #include <boost/asio/signal_set.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 1883;
std::string client_id = "async_mqtt5_tester";
};
// Modified completion token that will prevent co_await from throwing exceptions. // Modified completion token that will prevent co_await from throwing exceptions.
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::deferred);
using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>; // client_type with logging enabled
using client_type = async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
>;
// client_type without logging
//using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>;
int next_sensor_reading() { int next_sensor_reading() {
srand(static_cast<unsigned int>(std::time(0))); srand(static_cast<unsigned int>(std::time(0)));
@ -36,12 +52,13 @@ int next_sensor_reading() {
} }
boost::asio::awaitable<void> publish_sensor_readings( boost::asio::awaitable<void> publish_sensor_readings(
client_type& client, boost::asio::steady_timer& timer const config& cfg, client_type& client, boost::asio::steady_timer& timer
) { ) {
// Configure the Client. // Configure the Client.
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
client.brokers("<your-mqtt-broker>", 1883) // Broker that we want to connect to. 1883 is the default TCP port. client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.async_run(boost::asio::detached); // Start the client. .credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the Client.
for (;;) { for (;;) {
// Get the next sensor reading. // Get the next sensor reading.
@ -49,7 +66,7 @@ boost::asio::awaitable<void> publish_sensor_readings(
// Publish the sensor reading with QoS 1. // Publish the sensor reading with QoS 1.
auto&& [ec, rc, props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>( auto&& [ec, rc, props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"<your-mqtt-topic>", reading, "async-mqtt5/test" /* topic */, reading /* payload */,
async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, use_nothrow_awaitable async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, use_nothrow_awaitable
); );
// An error can occur as a result of: // An error can occur as a result of:
@ -78,12 +95,20 @@ boost::asio::awaitable<void> publish_sensor_readings(
co_return; co_return;
} }
int main() { int main(int argc, char** argv) {
config cfg;
if (argc == 4) {
cfg.brokers = argv[1];
cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
// Initialise execution context. // Initialise execution context.
boost::asio::io_context ioc; boost::asio::io_context ioc;
// Initialise the Client to connect to the Broker over TCP. // Initialise the Client to connect to the Broker over TCP.
client_type client(ioc); client_type client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));
// Initialise the timer. // Initialise the timer.
boost::asio::steady_timer timer(ioc); boost::asio::steady_timer timer(ioc);
@ -98,12 +123,27 @@ int main() {
}); });
// Spawn the coroutine. // Spawn the coroutine.
co_spawn(ioc.get_executor(), publish_sensor_readings(client, timer), boost::asio::detached); co_spawn(
ioc.get_executor(),
publish_sensor_readings(cfg, client, timer),
[](std::exception_ptr e) {
if (e)
std::rethrow_exception(e);
}
);
// Start the execution. // Start the execution.
ioc.run(); ioc.run();
} }
#endif
//] //]
#else
#include <iostream>
int main() {
std::cout << "This example requires C++20 standard to compile and run" << std::endl;
}
#endif

View File

@ -5,31 +5,48 @@
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
// //
#include <boost/asio/use_awaitable.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
//[receiver //[receiver
#include <iostream> #include <iostream>
#include <string>
#include <boost/asio/as_tuple.hpp> #include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp> #include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp> #include <boost/asio/signal_set.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 1883;
std::string client_id = "async_mqtt5_tester";
};
// Modified completion token that will prevent co_await from throwing exceptions. // Modified completion token that will prevent co_await from throwing exceptions.
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::deferred);
using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>; // client_type with logging enabled
using client_type = async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
>;
// client_type without logging
//using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>;
boost::asio::awaitable<bool> subscribe(client_type& client) { boost::asio::awaitable<bool> subscribe(client_type& client) {
// Configure the request to subscribe to a Topic. // Configure the request to subscribe to a Topic.
async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{ async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{
"<your-mqtt-topic>", "test" /* topic */,
async_mqtt5::subscribe_options { async_mqtt5::subscribe_options {
async_mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2. async_mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2.
async_mqtt5::no_local_e::no, // Forward message from Clients with same ID. async_mqtt5::no_local_e::no, // Forward message from Clients with same ID.
@ -55,10 +72,13 @@ boost::asio::awaitable<bool> subscribe(client_type& client) {
co_return !ec && !sub_codes[0]; // True if the subscription was successfully established. co_return !ec && !sub_codes[0]; // True if the subscription was successfully established.
} }
boost::asio::awaitable<void> subscribe_and_receive(client_type& client) { boost::asio::awaitable<void> subscribe_and_receive(
const config& cfg, client_type& client
) {
// Configure the Client. // Configure the Client.
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
client.brokers("<your-mqtt-broker>", 1883) // Broker that we want to connect to. 1883 is the default TCP port. client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the client. .async_run(boost::asio::detached); // Start the client.
// Before attempting to receive an Application Message from the Topic we just subscribed to, // Before attempting to receive an Application Message from the Topic we just subscribed to,
@ -90,12 +110,20 @@ boost::asio::awaitable<void> subscribe_and_receive(client_type& client) {
co_return; co_return;
} }
int main() { int main(int argc, char** argv) {
config cfg;
if (argc == 4) {
cfg.brokers = argv[1];
cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
// Initialise execution context. // Initialise execution context.
boost::asio::io_context ioc; boost::asio::io_context ioc;
// Initialise the Client to connect to the Broker over TCP. // Initialise the Client to connect to the Broker over TCP.
client_type client(ioc); client_type client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));
// Set up signals to stop the program on demand. // Set up signals to stop the program on demand.
boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
@ -106,12 +134,27 @@ int main() {
}); });
// Spawn the coroutine. // Spawn the coroutine.
co_spawn(ioc, subscribe_and_receive(client), boost::asio::detached); co_spawn(
ioc,
subscribe_and_receive(cfg, client),
[](std::exception_ptr e) {
if (e)
std::rethrow_exception(e);
}
);
// Start the execution. // Start the execution.
ioc.run(); ioc.run();
} }
#endif
//] //]
#else
#include <iostream>
int main() {
std::cout << "This example requires C++20 standard to compile and run" << std::endl;
}
#endif

View File

@ -5,13 +5,18 @@
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
// //
#include <boost/asio/use_awaitable.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
//[timeout_with_awaitable_operators //[timeout_with_awaitable_operators
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <string>
#include <boost/asio/as_tuple.hpp> #include <boost/asio/as_tuple.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/co_spawn.hpp> #include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp> #include <boost/asio/detached.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp> #include <boost/asio/use_awaitable.hpp>
@ -19,42 +24,72 @@
#include <boost/asio/experimental/parallel_group.hpp> #include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 1883;
std::string client_id = "async_mqtt5_tester";
};
// Modified completion token that will prevent co_await from throwing exceptions. // Modified completion token that will prevent co_await from throwing exceptions.
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::deferred);
using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>; // client_type with logging enabled
using client_type = async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
>;
boost::asio::awaitable<void> send_over_mqtt(client_type& client, const std::string& message) { // client_type without logging
client.brokers("<your-mqtt-broker>", 1883) //using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>;
.async_run(boost::asio::detached);
boost::asio::awaitable<void> send_over_mqtt(
const config& cfg, client_type& client
) {
client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the Client.
auto&& [pec, prc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>( auto&& [pec, prc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"<your-mqtt-topic>", message, "async-mqtt5/test", "Hello World!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
use_nothrow_awaitable use_nothrow_awaitable
); );
co_await client.async_disconnect(use_nothrow_awaitable); co_await client.async_disconnect(use_nothrow_awaitable);
co_return;
} }
int main() { int main(int argc, char** argv) {
config cfg;
if (argc == 4) {
cfg.brokers = argv[1];
cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
boost::asio::io_context ioc; boost::asio::io_context ioc;
co_spawn(ioc, [&ioc]() -> boost::asio::awaitable<void> { co_spawn(
// Construct the Client. ioc,
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc); [&ioc, &cfg]() -> boost::asio::awaitable<void> {
// Initialise the Client to connect to the Broker over TCP.
client_type client(ioc);
// You can also initialise the Client and its logger with a specific log_level (default log_level::info).
//client_type client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::debug));
// Construct the timer. // Construct the timer.
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5)); boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
using namespace boost::asio::experimental::awaitable_operators; using namespace boost::asio::experimental::awaitable_operators;
auto res = co_await ( auto res = co_await (
send_over_mqtt(client, "Hello world!") || send_over_mqtt(cfg, client) ||
timer.async_wait(use_nothrow_awaitable) timer.async_wait(boost::asio::as_tuple(boost::asio::use_awaitable))
); );
// The timer expired first. The client is cancelled. // The timer expired first. The client is cancelled.
@ -64,11 +99,24 @@ int main() {
else else
std::cout << "Send over MQTT completed!" << std::endl; std::cout << "Send over MQTT completed!" << std::endl;
}, boost::asio::detached); },
[](std::exception_ptr e) {
if (e)
std::rethrow_exception(e);
}
);
ioc.run(); ioc.run();
} }
#endif
//] //]
#else
#include <iostream>
int main() {
std::cout << "This example requires C++20 standard to compile and run" << std::endl;
}
#endif

View File

@ -19,23 +19,47 @@
#include <boost/asio/experimental/parallel_group.hpp> #include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp> #include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>
struct config {
std::string brokers = "broker.hivemq.com";
uint16_t port = 1883;
std::string client_id = "async_mqtt5_tester";
};
int main(int argc, char** argv) {
config cfg;
if (argc == 4) {
cfg.brokers = argv[1];
cfg.port = uint16_t(std::stoi(argv[2]));
cfg.client_id = argv[3];
}
int main() {
boost::asio::io_context ioc; boost::asio::io_context ioc;
// Construct the Client. // Construct the Client with ``__TCP_SOCKET__`` as the underlying stream and enabled logging.
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc); // Since we are not establishing a secure connection, set the TlsContext template parameter to std::monostate.
async_mqtt5::mqtt_client<
boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
> client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));
// If you want to use the Client without logging, initialise it with the following line instead.
//async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc);
// Construct the timer. // Construct the timer.
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5)); boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
client.brokers("<your-mqtt-broker>", 1883) client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
.async_run(boost::asio::detached); .credentials(cfg.client_id) // Set the Client Identifier. (optional)
.async_run(boost::asio::detached); // Start the Client.
// Subscribe to a Topic. // Subscribe to a Topic.
client.async_subscribe( client.async_subscribe(
{ "<your-mqtt-topic>" }, async_mqtt5::subscribe_props {}, { "test" /* Topic */}, async_mqtt5::subscribe_props {},
[](async_mqtt5::error_code ec, std::vector<async_mqtt5::reason_code> rcs, async_mqtt5::suback_props) { [](async_mqtt5::error_code ec, std::vector<async_mqtt5::reason_code> rcs, async_mqtt5::suback_props) {
std::cout << "[subscribe ec]: " << ec.message() << std::endl; std::cout << "[subscribe ec]: " << ec.message() << std::endl;
std::cout << "[subscribe rc]: " << rcs[0].message() << std::endl; std::cout << "[subscribe rc]: " << rcs[0].message() << std::endl;

View File

@ -98,8 +98,9 @@ public:
if (++it != eps.end()) if (++it != eps.end())
std::clog << ","; std::clog << ",";
} }
std::clog << "]" << std::endl; std::clog << "]";
} }
std::clog << std::endl;
} }
/** /**
@ -114,7 +115,7 @@ public:
output_prefix(); output_prefix();
std::clog std::clog
<< "connect: " << "TCP connect: "
<< ep.address().to_string() << ":" << ep.port() << ep.address().to_string() << ":" << ep.port()
<< " - " << ec.message() << " - " << ec.message()
<< std::endl; << std::endl;

View File

@ -116,7 +116,7 @@ BOOST_AUTO_TEST_CASE(successful_connect_debug) {
std::string log = output.rdbuf()->str(); std::string log = output.rdbuf()->str();
BOOST_TEST_MESSAGE(log); BOOST_TEST_MESSAGE(log);
BOOST_TEST_WARN(contains(log, "resolve")); BOOST_TEST_WARN(contains(log, "resolve"));
BOOST_TEST_WARN(contains(log, "connect")); BOOST_TEST_WARN(contains(log, "TCP connect"));
BOOST_TEST_WARN(contains(log, "TLS handshake")); BOOST_TEST_WARN(contains(log, "TLS handshake"));
BOOST_TEST_WARN(contains(log, "WebSocket handshake")); BOOST_TEST_WARN(contains(log, "WebSocket handshake"));
BOOST_TEST_WARN(contains(log, "connack")); BOOST_TEST_WARN(contains(log, "connack"));