Update documentation and examples for clarity

Summary:
related to T13767

- Rewrite chaining of configuration functions
- Move examples not related to documentation to dev folder
- Rewrite links for clarity
- Update CMakeLists.txt to run one of the examples
- Rewrite auto reconnect and auto retry mechanism for clarity

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Differential Revision: https://repo.mireo.local/D29044
This commit is contained in:
Korina Šimičević
2024-04-24 14:03:18 +02:00
parent bfae05f7a9
commit 9b7f852342
14 changed files with 31 additions and 1254 deletions

View File

@ -60,6 +60,7 @@
[def __SSL_CONTEXT__ [asioreflink ssl__context ssl::context]]
[def __SSL_STREAM__ [asioreflink ssl__stream ssl::stream<__TCP_SOCKET__>]]
[def __WEBSOCKET_STREAM__ [beastreflink boost__beast__websocket__stream websocket::stream<NextLayer>]]
[def __BEAST_ASYNC_TEARDOWN__ [beastreflink boost__beast__websocket__async_teardown boost::beast::websocket::async_teardown]]
[/ MQTT ]
[def __MQTT__ [@https://mqtt.org/ MQTT]]

View File

@ -49,21 +49,22 @@ The following example illustrates a simple scenario of configuring a Client and
[!c++]
#include <iostream>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
int main() {
boost::asio::io_context ioc;
using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>;
client_type c(ioc);
c.credentials("<your-client-id>", "<client-username>", "<client-pwd>")
.brokers("<your-mqtt-broker>", 1883)
.async_run(asio::detached);
.async_run(boost::asio::detached);
c.async_publish<async_mqtt5::qos_e::at_most_once>(
"<topic>", "Hello world!",

View File

@ -19,12 +19,18 @@ Note that the same principle applies to all other asynchronous functions within
and [refmem mqtt_client async_disconnect]).
// Publishing with QoS 1 involves a two-step process: sending a PUBLISH message to the Broker and awaiting a PUBACK (acknowledgement) response.
// The scenarios that might unfold include:
// a) The Client sends the PUBLISH message immediately.
// b) If the Client is offline when attempting to publish, it queues the PUBLISH message and sends it
// as soon as the connection is re-established.
// c) Should the Client lose connection after sending the PUBLISH message but before receiving a PUBACK,
// it will automatically retransmit the PUBLISH message once connectivity is restored.
// The scenarios that might happen are:
// 1) The Client will immediately send the PUBLISH message and start to wait for the PUBACK reply.
// The user's completion handler will not be invoked yet, regardless of whether the Client successfully sent the message.
// 2) When the Client fails to send the message, it will try to reconnect to the Broker automatically.
// If it succeeds, it will resend the message. If reconnect fails, the Client will try to connect again until it succeeds.
// Meanwhile, the user's completion handler will NOT be invoked except when the Client detects an unrecoverable error (for example, when the list of configured brokers is empty).
// Note that the Client will try to connect indefinitely, meaning the user's completion handler may never be invoked.
// 3) When the Client successfully sends the PUBLISH message, the Broker is required to respond with a PUBACK message.
// The reply message may indicate success or an MQTT error (for example, signalling that the message is not accepted due to implementation or administrative limits).
// The Client will read the PUBACK message and call the user's completion handler with the appropriate arguments.
// 4) The PUBACK message should arrive within 20 seconds of the PUBLISH message being successfully sent.
// If it does not, the PUBLISH message will be sent again. In the meantime, the user's callback will not be invoked.
client.async_publish<async_mqtt5::qos_e::at_least_once>(
"my-topic", "Hello world!",
@ -90,8 +96,6 @@ The __Client__ is designed to automatically buffer requests that are initiated w
During extended downtime or when a high volume of requests accumulates, this can lead to an increase in memory usage.
This aspect is significant for devices with limited resources, as the growing memory consumption can impact their performance and functionality.
[/ TODO: link to the debugging the client chapter ]
[endsect] [/cons]
[endsect] [/auto_reconnect]

View File

@ -8,24 +8,10 @@ if(PROJECT_IS_TOP_LEVEL)
find_package(async-mqtt5 REQUIRED)
endif()
function(add_example name)
add_executable("${name}" ${ARGN})
target_compile_features("${name}" PRIVATE cxx_std_20) # for coroutines
target_link_libraries("${name}" PRIVATE Async::MQTT5)
endfunction()
# set(EXAMPLE <your-source-file>.cpp)
set(EXAMPLE hello_world_over_tcp.cpp)
foreach(f publisher receiver)
add_example("${f}" "${f}.cpp")
endforeach()
set(EXAMPLES
tcp.cpp
openssl_tls.cpp
websocket_tcp.cpp
websocket_tls.cpp
)
find_package(OpenSSL REQUIRED)
add_executable(examples src/run_examples.cpp ${EXAMPLES})
target_compile_features(examples PRIVATE cxx_std_17)
target_link_libraries(examples PRIVATE Async::MQTT5 OpenSSL::SSL)
find_package(OpenSSL REQUIRED) # if you require SSL connection
add_executable(example ${EXAMPLE})
target_compile_features(example PRIVATE cxx_std_17) # or cxx_std_20
target_link_libraries(example PRIVATE Async::MQTT5 OpenSSL::SSL)

View File

@ -1,115 +0,0 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
#include <iostream>
namespace asio = boost::asio;
using stream_type = asio::ip::tcp::socket;
using client_type = async_mqtt5::mqtt_client<stream_type>;
void run_with_callbacks(client_type& client) {
//[publish_callback
// Publish an Application Message with QoS 0.
client.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
// Callback with signature void (error_code)
[](async_mqtt5::error_code ec) {
std::cout << "error_code: " << ec.message() << std::endl;
}
);
// Publish an Application Message with QoS 1.
client.async_publish<async_mqtt5::qos_e::at_least_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
// Callback with signature void (error_code, reason_code, puback_props)
[](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::puback_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
}
);
// Publish an Application Message with QoS 2.
client.async_publish<async_mqtt5::qos_e::exactly_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
// Callback with signature (error_code, reason_code, pubcomp_props)
[](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::pubcomp_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
}
);
//]
//[subscribe_callback
// Subscribe to a single Topic.
client.async_subscribe(
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } }, async_mqtt5::subscribe_props {},
// Callback with signature void (error_code, std::vector<reason_code>, suback_props)
[](async_mqtt5::error_code ec,
std::vector<async_mqtt5::reason_code> codes, async_mqtt5::suback_props
) {
std::cout << "subscribe error_code: " << ec.message() << std::endl;
std::cout << "subscribe reason_code: " << codes[0].message() << std::endl;
}
);
//]
//[receive_callback
// Receive an Application Message.
client.async_receive(
// Callback with signature void (error_code, std::string, std::string, publish_props)
[] (
async_mqtt5::error_code ec, std::string topic,
std::string payload, async_mqtt5::publish_props
) {
std::cout << "topic: " << topic << std::endl;
std::cout << "payload: " << payload << std::endl;
}
);
//]
//[unsubscribe_callback
// Unsubscribe from the Topic.
client.async_unsubscribe("test/mqtt-test", async_mqtt5::unsubscribe_props {},
//Callback with signature void (error_code, std::vector<reason_code>, unsuback_props)
[](async_mqtt5::error_code ec,
std::vector<async_mqtt5::reason_code> codes, async_mqtt5::unsuback_props
) {
std::cout << "unsubscribe error_code: " << ec.message() << std::endl;
std::cout << "unsubscribe reason_code: " << codes[0].message() << std::endl;
}
);
//]
//[disconnect_callback
// Disconnect the Client.
client.async_disconnect(
async_mqtt5::disconnect_rc_e::disconnect_with_will_message,
async_mqtt5::disconnect_props {},
// Callback with signature void (error_code)
[](async_mqtt5::error_code) {}
);
//]
}
int main(int argc, char** argv) {
asio::io_context ioc;
// Make an instance of mqtt_client. Establish a TCP connection with the Broker.
client_type c(ioc.get_executor());
c.credentials("test-client", "username", "password")
.brokers("mqtt.broker", 1883)
.async_run(asio::detached);
run_with_callbacks(c);
ioc.run();
}

View File

@ -1,177 +0,0 @@
#include <boost/asio/use_awaitable.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
namespace asio = boost::asio;
using stream_type = asio::ip::tcp::socket;
using client_type = async_mqtt5::mqtt_client<stream_type>;
asio::awaitable<void> coroutine(client_type& client) {
//[publish_coro
// Publish an Application Message with QoS 0.
// The handler signature for this function is void (error_code).
// However, when using asio::use_awaitable as a completion token,
// the error_code is not returned but thrown as an exception if an error occurrs.
co_await client.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_awaitable
);
// Publish an Application Message with QoS 1.
// The handler signature for this function is void (error_code, reason_code, puback_props).
// With asio::use_awaitable as a completion token, the co_await will return reason_code and puback_props.
auto [puback_rc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_awaitable
);
// Publish an Application Message with QoS 2.
// The handler signature for this function is void (error_code, reason_code, pubcomp_props).
// With asio::use_awaitable as a completion token, the co_await will return reason_code and pubcomp_props.
auto [pubcomp_rc, pubcomp_props] = co_await client.async_publish<async_mqtt5::qos_e::exactly_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_awaitable
);
//]
//[subscribe_coro
// Subscribe to a single Topic.
// The handler signature for this function is void (error_code, std::vector<reason_code>, suback_props).
// With asio::use_awaitable as a completion token, the co_await
// will return std::vector<reason_code> and suback_props.
auto [sub_codes, sub_props] = co_await client.async_subscribe(
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } },
async_mqtt5::subscribe_props {}, asio::use_awaitable
);
//]
//[receive_coro
// Receive an Application Message.
// The co_await call will return std::string (topic), std::string (payload) and publish_props.
// Note: the coroutine will be suspended until an Application Message is ready to be received
// or an error has occurred. In theory, the coroutine could be suspended indefinitely.
// Avoid calling this if you have not successfully subscribed to a Topic.
auto [topic, payload, publish_props] = co_await client.async_receive(asio::use_awaitable);
//]
//[unsubscribe_coro
// Unsubscribe from the Topic.
// The handler signature for this function is void (error_code, std::vector<reason_code>, unsuback_props).
// With asio::use_awaitable as a completion token, the co_await
// will return std::vector<reason_code> and unsuback_props.
auto [unsub_codes, unsub_props] = co_await client.async_unsubscribe(
"test/mqtt-test", async_mqtt5::unsubscribe_props {},
asio::use_awaitable
);
//]
//[disconnect_coro
// Disconnect the Client.
// With asio::use_awaitable as a completion token and void (error_code) as the completion signature,
// the co_await has nothing to return.
co_await client.async_disconnect(
async_mqtt5::disconnect_rc_e::disconnect_with_will_message,
async_mqtt5::disconnect_props {},
asio::use_awaitable
);
//]
co_return;
}
//[no_throw_awaitable
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
//]
asio::awaitable<void> nothrow_coroutine(client_type& client) {
//[publish_coro_nothrow
async_mqtt5::error_code ec;
async_mqtt5::reason_code rc;
std::tie(ec) = co_await client.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
use_nothrow_awaitable
);
async_mqtt5::puback_props puback_props;
std::tie(ec, rc, puback_props) = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
use_nothrow_awaitable
);
async_mqtt5::pubcomp_props pubcomp_props;
std::tie(ec, rc, pubcomp_props) = co_await client.async_publish<async_mqtt5::qos_e::exactly_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
use_nothrow_awaitable
);
//]
//[subscribe_coro_nothrow
std::vector<async_mqtt5::reason_code> rcs;
async_mqtt5::suback_props suback_props;
std::tie(ec, rcs, suback_props) = co_await client.async_subscribe(
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } },
async_mqtt5::subscribe_props {}, use_nothrow_awaitable
);
//]
//[receive_coro_nothrow
std::string topic, payload;
async_mqtt5::publish_props publish_props;
std::tie(ec, topic, payload, publish_props) = co_await client.async_receive(use_nothrow_awaitable);
//]
//[unsubscribe_coro_nothrow
async_mqtt5::unsuback_props unsuback_props;
std::tie(ec, rcs, unsuback_props) = co_await client.async_unsubscribe(
std::vector<std::string>{ "test/mqtt-test" }, async_mqtt5::unsubscribe_props {},
use_nothrow_awaitable
);
//]
//[disconnect_coro_nothrow
std::tie(ec) = co_await client.async_disconnect(
async_mqtt5::disconnect_rc_e::disconnect_with_will_message,
async_mqtt5::disconnect_props {},
use_nothrow_awaitable
);
//]
co_return;
}
int main(int argc, char** argv) {
asio::io_context ioc;
// Make an instance of mqtt_client. Establish a TCP connection with the Broker.
client_type c(ioc.get_executor());
c.credentials("test-client", "username", "password")
.brokers("mqtt.broker", 1883)
.async_run(asio::detached);
co_spawn(ioc.get_executor(), coroutine(c), asio::detached);
// or...
co_spawn(ioc.get_executor(), nothrow_coroutine(c), asio::detached);
ioc.run();
}
#endif

View File

@ -1,110 +0,0 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
#include <iostream>
#include <thread>
namespace asio = boost::asio;
using stream_type = asio::ip::tcp::socket;
using client_type = async_mqtt5::mqtt_client<stream_type>;
void run_with_future(client_type& client) {
//[publish_future
// Just like the asio::use_awaitable completion token
// (see ``__EXAMPLE_COROUTINE__``), the ``__USE_FUTURE__`` completion token
// will not return the error_code. Instead, it will throw an exception if an error has occurred.
std::future<void> pub_qos0_fut =
client.async_publish<async_mqtt5::qos_e::at_most_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_future
);
pub_qos0_fut.get(); // Blocking call!
using qos1_fut_type = std::tuple<async_mqtt5::reason_code, async_mqtt5::puback_props>;
std::future<qos1_fut_type> pub_qos1_fut =
client.async_publish<async_mqtt5::qos_e::at_least_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_future
);
auto [qos1_rc, puback_props] = pub_qos1_fut.get();
std::cout << "Publish QoS 1 Reason Code: " << qos1_rc.message() << std::endl;
using qos2_fut_type = std::tuple<async_mqtt5::reason_code, async_mqtt5::pubcomp_props>;
std::future<qos2_fut_type> pub_qos2_fut =
client.async_publish<async_mqtt5::qos_e::exactly_once>(
"test/mqtt-test", "Hello world!",
async_mqtt5::retain_e::yes, async_mqtt5::publish_props {},
asio::use_future
);
auto [qos2_rc, pubcomp_props] = pub_qos2_fut.get();
std::cout << "Publish QoS 2 Reason Code: " << qos2_rc.message() << std::endl;
//]
//[subscribe_future
using sub_fut_type = std::tuple<std::vector<async_mqtt5::reason_code>, async_mqtt5::suback_props>;
std::future<sub_fut_type> sub_fut = client.async_subscribe(
{ "test/mqtt-test", { async_mqtt5::qos_e::exactly_once } },
async_mqtt5::subscribe_props {}, asio::use_future
);
auto [sub_rcs, suback_props] = sub_fut.get();
std::cout << "Subscribe Reason Code: " << sub_rcs[0].message() << std::endl;
//]
//[receive_future
// Note: the get() call on async_receive future could block indefinitely if the ``__Client__``
// failed to subscribe or there are no Application Messages to be received from the subscribed Topic!
using rec_fut_type = std::tuple<std::string, std::string, async_mqtt5::publish_props>;
std::future<rec_fut_type> rec_fut = client.async_receive(asio::use_future);
auto [topic, payload, publish_props] = rec_fut.get();
std::cout << "Received message from Topic: " << topic << ", " << payload << std::endl;
//]
//[unsubscribe_future
using unsub_fut_type = std::tuple<std::vector<async_mqtt5::reason_code>, async_mqtt5::unsuback_props>;
std::future<unsub_fut_type> unsub_fut = client.async_unsubscribe(
"test/mqtt-test", async_mqtt5::unsubscribe_props {}, asio::use_future
);
auto [unsub_rcs, unsuback_props] = unsub_fut.get();
std::cout << "Unubscribe Reason Code: " << unsub_rcs[0].message() << std::endl;
//]
//[disconnect_future
std::future<void> dc_fut = client.async_disconnect(asio::use_future);
dc_fut.get();
//]
return;
}
int main(int argc, char** argv) {
// asio::io_context must be running in more than one thread!
constexpr auto thread_num = 2;
asio::io_context ioc(thread_num);
std::vector<std::thread> threads;
threads.reserve(thread_num - 1);
// Make an instance of mqtt_client. Establish a TCP connection with the Broker.
client_type c(ioc.get_executor());
c.credentials("test-client", "", "")
.brokers("mqtt.broker", 1883)
.async_run(asio::detached);
for (int i = 0; i < thread_num - 1; ++i)
threads.emplace_back([&ioc] { ioc.run(); });
run_with_future(c);
ioc.run();
for (auto& t : threads)
if (t.joinable()) t.join();
}

View File

@ -12,7 +12,7 @@
int main() {
boost::asio::io_context ioc;
// Construct the Client with ``[beastreflink boost__beast__websocket__stream websocket::stream<__TCP_SOCKET__>]`` as the underlying stream.
// Construct the Client with WebSocket/TCP as the underlying stream.
async_mqtt5::mqtt_client<
boost::beast::websocket::stream<boost::asio::ip::tcp::socket>
> client(ioc);

View File

@ -12,12 +12,12 @@
namespace boost::beast::websocket {
// ``[beastreflink boost__beast__websocket__async_teardown boost::beast::websocket::async_teardown]`` is a free function
// designed to initiate the asynchronous teardown of a connection.
// boost::beast::websocket::async_teardown is a free function designed to initiate the asynchronous teardown of a connection.
// The specific behaviour of this function is based on the NextLayer type (Socket type) used to create the ``__WEBSOCKET_STREAM__``.
// ``__Beast__`` library includes an implementation of this function for ``__TCP_SOCKET__``.
// However, the callers are responsible for providing a suitable overload of this function for any other type,
// such as ``__SSL_STREAM__`` as shown in this example.
// See ``__BEAST_ASYNC_TEARDOWN__`` for more information.
template <typename TeardownHandler>
void async_teardown(
boost::beast::role_type role,
@ -82,8 +82,8 @@ int main() {
std::cout << "Failed to set peer verification mode!" << std::endl;
ec.clear();
// Construct the Client with ``[beastreflink boost__beast__websocket__stream websocket::stream<__SSL_STREAM__>]``
// as the underlying stream with ``__SSL_CONTEXT__`` as the ``__TlsContext__`` type.
// Construct the Client with WebSocket/SSL as the underlying stream
// with ``__SSL_CONTEXT__`` as the ``__TlsContext__`` type.
async_mqtt5::mqtt_client<
boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>,
boost::asio::ssl::context
@ -104,4 +104,4 @@ int main() {
ioc.run();
}
//]
//]

View File

@ -1,228 +0,0 @@
#include <iostream>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
namespace asio = boost::asio;
namespace async_mqtt5 {
template <typename StreamBase>
struct tls_handshake_type<asio::ssl::stream<StreamBase>> {
static constexpr auto client = asio::ssl::stream_base::client;
static constexpr auto server = asio::ssl::stream_base::server;
};
template <typename StreamBase>
void assign_tls_sni(
const authority_path& ap,
asio::ssl::context& /* ctx */,
asio::ssl::stream<StreamBase>& stream
) {
SSL_set_tlsext_host_name(stream.native_handle(), ap.host.c_str());
}
} // end namespace async_mqtt5
constexpr char mireo_ca[] =
"-----BEGIN CERTIFICATE-----\n"
"MIIDUTCCAjmgAwIBAgIUAzV59EhZA5MXluHNqRi9cBP0x9swDQYJKoZIhvcNAQEL\n"
"BQAwGDEWMBQGA1UEAwwNTWlyZW8gUm9vdCBDQTAeFw0yMjA0MDcxMzM1MjlaFw0z\n"
"MjA0MDQxMzM1MjlaMBgxFjAUBgNVBAMMDU1pcmVvIFJvb3QgQ0EwggEiMA0GCSqG\n"
"SIb3DQEBAQUAA4IBDwAwggEKAoIBAQCin/qsHpdxT3iW0SEHhAcTfESyQcfwGtJE\n"
"jcRrGEj36X6eahyY4AF+4Mlz2vWFeW52ayGXpQKn/z4tChdN80txdY77YmEX7XE0\n"
"HHZYY6toNq/+mNX9h2HvB0GW+8+E0YfNN/HloTxDo3RT8+IovY9OSXt44vY4YtQK\n"
"JbvZIm2Q8Iuv3vfNR05uFa4HcNqFhELh10jss0xG/54Y2NvB6xdKOZ8LRQuIX+Fu\n"
"QRzMiqRFQPUJzWxbKF5I/MFiKWmAG0QNPDnlb8XtPmFTFCWY9X96wOpQOczrxT2+\n"
"+vnTxPA3aTAkz7M4yUuocZQqTlbdfdGOSAENXavewdMCyy5bQsSLAgMBAAGjgZIw\n"
"gY8wHQYDVR0OBBYEFLdUGYfJRf9mbM/fTav9U2vFI+TRMFMGA1UdIwRMMEqAFLdU\n"
"GYfJRf9mbM/fTav9U2vFI+TRoRykGjAYMRYwFAYDVQQDDA1NaXJlbyBSb290IENB\n"
"ghQDNXn0SFkDkxeW4c2pGL1wE/TH2zAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIB\n"
"BjANBgkqhkiG9w0BAQsFAAOCAQEAHm5d4YUP8BYcks10UCdswLtxbMUN99fNbnYo\n"
"RMxx4EapwhEZFSNbIZvf1INJd5Po+hH5jteBeFVP+4zKqrhg3I8pjdC4josHmrhS\n"
"28OjOFWp6xNJC43BHnLpc84bH0+XIEBbk7YA6H3GjpsZ7aJkhj/JPjjNq7bmyYN7\n"
"1I9RK4PtIrNtUFbSsHZCZhf8Amtl8PrpktITECjfqCq+8uOAqP4McTIQ1JKwYy6f\n"
"O6iu0eybJCFhWYENTUQyPi1VtEwOpWNLzaXBYdj69Xg8wA/J9RZIoqXWvtHv4rPF\n"
"HGudMEIVB3y2vVLmujvQCqYPZWwbgpy5mN3F4uBNuZhTIwWRFg==\n"
"-----END CERTIFICATE-----\n"
;
void publish_qos0_openssl_tls() {
std::cout << "[Test-publish-qos0-openssl-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ssl::stream<asio::ip::tcp::socket>;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
error_code ec;
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-qos0-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
retain_e::no, publish_props{},
[&c](error_code ec) {
std::cout << "error_code: " << ec.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void publish_qos1_openssl_tls() {
std::cout << "[Test-publish-qos1-openssl-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ssl::stream<asio::ip::tcp::socket>;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
error_code ec;
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-qos1-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
retain_e::no, publish_props{},
[&c](error_code ec, reason_code rc, puback_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void publish_qos2_openssl_tls() {
std::cout << "[Test-publish-qos2-openssl-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ssl::stream<asio::ip::tcp::socket>;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
error_code ec;
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-qos2-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
retain_e::no, publish_props{},
[&c](error_code ec, reason_code rc, pubcomp_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void subscribe_and_receive_openssl_tls(int num_receive) {
std::cout << "[Test-subscribe-and-receive-openssl-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ssl::stream<asio::ip::tcp::socket>;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
error_code ec;
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-subscriber-openssl-tls", "", "")
.brokers("emqtt.mireo.local", 8883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
std::vector<subscribe_topic> topics;
topics.push_back(subscribe_topic {
"test/mqtt-test", {
qos_e::exactly_once,
no_local_e::no,
retain_as_published_e::retain,
retain_handling_e::send
}
});
c.async_subscribe(
topics, subscribe_props {},
[](error_code ec, std::vector<reason_code> codes, suback_props) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "subscribe error_code: " << ec.message() << std::endl;
std::cout << "subscribe reason_code: " << codes[0].message() << std::endl;
}
);
for (auto i = 0; i < num_receive; i++) {
c.async_receive(
[&c, i, num_receive] (
error_code ec, std::string topic,
std::string payload, publish_props
) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "message " << i + 1 << "/" << num_receive << std::endl;
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "topic: " << topic << std::endl;
std::cout << "payload: " << payload << std::endl;
if (i == num_receive - 1)
c.async_disconnect(asio::detached);
}
);
}
ioc.run();
return;
}
void run_openssl_tls_examples() {
publish_qos0_openssl_tls();
publish_qos1_openssl_tls();
publish_qos2_openssl_tls();
subscribe_and_receive_openssl_tls(1);
}

View File

@ -1,15 +0,0 @@
void run_tcp_examples();
void run_openssl_tls_examples();
void run_websocket_tcp_examples();
void run_websocket_tls_examples();
int main() {
run_tcp_examples();
run_openssl_tls_examples();
run_websocket_tcp_examples();
run_websocket_tls_examples();
return 0;
}

View File

@ -1,151 +0,0 @@
#include <iostream>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5.hpp>
namespace asio = boost::asio;
void publish_qos0_tcp() {
std::cout << "[Test-publish-qos0-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ip::tcp::socket;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
connect_props props;
props[prop::maximum_packet_size] = 1024;
c.credentials("test-qos0-tcp", "", "")
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.connect_properties(std::move(props))
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
retain_e::no, publish_props{},
[&c](error_code ec) {
std::cout << "error_code: " << ec.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
}
void publish_qos1_tcp() {
std::cout << "[Test-publish-qos1-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ip::tcp::socket;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
c.credentials("test-qos1-tcp", "", "")
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
retain_e::no, publish_props {},
[&c](error_code ec, reason_code rc, puback_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
}
void publish_qos2_tcp() {
std::cout << "[Test-publish-qos2-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ip::tcp::socket;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
c.credentials("test-qos2-tcp", "", "")
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
retain_e::no, publish_props {},
[&c](error_code ec, reason_code rc, pubcomp_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
}
void subscribe_and_receive_tcp(int num_receive) {
std::cout << "[Test-subscribe-and-receive-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = asio::ip::tcp::socket;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
c.credentials("test-subscriber-tcp", "", "")
.brokers("emqtt.mireo.local", 1883)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_subscribe(
{ "test/mqtt-test", { qos_e::exactly_once } }, subscribe_props {},
[](error_code ec, std::vector<reason_code> codes, suback_props) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "subscribe error_code: " << ec.message() << std::endl;
std::cout << "subscribe reason_code: " << codes[0].message() << std::endl;
}
);
for (auto i = 0; i < num_receive; i++) {
c.async_receive(
[&c, i, num_receive] (
error_code ec, std::string topic,
std::string payload, publish_props
) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "message " << i + 1 << "/" << num_receive << std::endl;
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "topic: " << topic << std::endl;
std::cout << "payload: " << payload << std::endl;
if (i == num_receive - 1)
c.async_disconnect(asio::detached);
}
);
}
ioc.run();
}
void run_tcp_examples() {
publish_qos0_tcp();
publish_qos1_tcp();
publish_qos2_tcp();
subscribe_and_receive_tcp(1);
}

View File

@ -1,172 +0,0 @@
#include <iostream>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/websocket.hpp>
#include <async_mqtt5.hpp>
namespace asio = boost::asio;
void publish_qos0_websocket_tcp() {
std::cout << "[Test-publish-qos0-websocket-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ip::tcp::socket
>;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
c.credentials("test-qos0-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
retain_e::no, publish_props{},
[&c](error_code ec) {
std::cout << "error_code: " << ec.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void publish_qos1_websocket_tcp() {
std::cout << "[Test-publish-qos1-websocket-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ip::tcp::socket
>;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
c.credentials("test-qos1-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
async_mqtt5::retain_e::no, publish_props{},
[&c](error_code ec, reason_code rc, puback_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void publish_qos2_websocket_tcp() {
std::cout << "[Test-publish-qos2-websocket-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ip::tcp::socket
>;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
c.credentials("test-qos2-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
retain_e::no, publish_props{},
[&c](error_code ec, reason_code rc, pubcomp_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void subscribe_and_receive_websocket_tcp(int num_receive) {
std::cout << "[Test-subscribe-and-receive-websocket-tcp]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ip::tcp::socket
>;
using client_type = mqtt_client<stream_type>;
client_type c(ioc);
c.credentials("test-subscriber-websocket-tcp", "", "")
.brokers("emqtt.mireo.local/mqtt", 8083)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
std::vector<subscribe_topic> topics;
topics.push_back(subscribe_topic{
"test/mqtt-test", {
qos_e::exactly_once,
no_local_e::no,
retain_as_published_e::retain,
retain_handling_e::send
}
});
c.async_subscribe(
topics, subscribe_props{},
[](error_code ec, std::vector<reason_code> codes, suback_props) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "subscribe error_code: " << ec.message() << std::endl;
std::cout << "subscribe reason_code: " << codes[0].message() << std::endl;
}
);
for (auto i = 0; i < num_receive; i++) {
c.async_receive(
[&c, i, num_receive] (
error_code ec, std::string topic,
std::string payload, publish_props
) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "message " << i + 1 << "/" << num_receive << std::endl;
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "topic: " << topic << std::endl;
std::cout << "payload: " << payload << std::endl;
if (i == num_receive - 1)
c.async_disconnect(asio::detached);
}
);
}
ioc.run();
return;
}
void run_websocket_tcp_examples() {
publish_qos0_websocket_tcp();
publish_qos1_websocket_tcp();
publish_qos2_websocket_tcp();
subscribe_and_receive_websocket_tcp(1);
}

View File

@ -1,247 +0,0 @@
#include <iostream>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/websocket.hpp>
#include <async_mqtt5.hpp>
namespace asio = boost::asio;
namespace boost::beast::websocket {
template <typename TeardownHandler>
void async_teardown(
boost::beast::role_type /* role */,
asio::ssl::stream<asio::ip::tcp::socket>& stream,
TeardownHandler&& handler
) {
return stream.async_shutdown(std::forward<TeardownHandler>(handler));
}
} // end namespace boost::beast::websocket
namespace async_mqtt5 {
template <typename StreamBase>
struct tls_handshake_type<asio::ssl::stream<StreamBase>> {
static constexpr auto client = asio::ssl::stream_base::client;
static constexpr auto server = asio::ssl::stream_base::server;
};
template <typename StreamBase>
void assign_tls_sni(
const authority_path& ap,
asio::ssl::context& /* ctx */,
asio::ssl::stream<StreamBase>& stream
) {
SSL_set_tlsext_host_name(stream.native_handle(), ap.host.c_str());
}
} // end namespace async_mqtt5
constexpr char mireo_ca[] =
"-----BEGIN CERTIFICATE-----\n"
"MIIDUTCCAjmgAwIBAgIUAzV59EhZA5MXluHNqRi9cBP0x9swDQYJKoZIhvcNAQEL\n"
"BQAwGDEWMBQGA1UEAwwNTWlyZW8gUm9vdCBDQTAeFw0yMjA0MDcxMzM1MjlaFw0z\n"
"MjA0MDQxMzM1MjlaMBgxFjAUBgNVBAMMDU1pcmVvIFJvb3QgQ0EwggEiMA0GCSqG\n"
"SIb3DQEBAQUAA4IBDwAwggEKAoIBAQCin/qsHpdxT3iW0SEHhAcTfESyQcfwGtJE\n"
"jcRrGEj36X6eahyY4AF+4Mlz2vWFeW52ayGXpQKn/z4tChdN80txdY77YmEX7XE0\n"
"HHZYY6toNq/+mNX9h2HvB0GW+8+E0YfNN/HloTxDo3RT8+IovY9OSXt44vY4YtQK\n"
"JbvZIm2Q8Iuv3vfNR05uFa4HcNqFhELh10jss0xG/54Y2NvB6xdKOZ8LRQuIX+Fu\n"
"QRzMiqRFQPUJzWxbKF5I/MFiKWmAG0QNPDnlb8XtPmFTFCWY9X96wOpQOczrxT2+\n"
"+vnTxPA3aTAkz7M4yUuocZQqTlbdfdGOSAENXavewdMCyy5bQsSLAgMBAAGjgZIw\n"
"gY8wHQYDVR0OBBYEFLdUGYfJRf9mbM/fTav9U2vFI+TRMFMGA1UdIwRMMEqAFLdU\n"
"GYfJRf9mbM/fTav9U2vFI+TRoRykGjAYMRYwFAYDVQQDDA1NaXJlbyBSb290IENB\n"
"ghQDNXn0SFkDkxeW4c2pGL1wE/TH2zAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIB\n"
"BjANBgkqhkiG9w0BAQsFAAOCAQEAHm5d4YUP8BYcks10UCdswLtxbMUN99fNbnYo\n"
"RMxx4EapwhEZFSNbIZvf1INJd5Po+hH5jteBeFVP+4zKqrhg3I8pjdC4josHmrhS\n"
"28OjOFWp6xNJC43BHnLpc84bH0+XIEBbk7YA6H3GjpsZ7aJkhj/JPjjNq7bmyYN7\n"
"1I9RK4PtIrNtUFbSsHZCZhf8Amtl8PrpktITECjfqCq+8uOAqP4McTIQ1JKwYy6f\n"
"O6iu0eybJCFhWYENTUQyPi1VtEwOpWNLzaXBYdj69Xg8wA/J9RZIoqXWvtHv4rPF\n"
"HGudMEIVB3y2vVLmujvQCqYPZWwbgpy5mN3F4uBNuZhTIwWRFg==\n"
"-----END CERTIFICATE-----\n"
;
void publish_qos0_websocket_tls() {
std::cout << "[Test-publish-qos0-websocket-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ssl::stream<asio::ip::tcp::socket>
>;
error_code ec;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-qos0-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", async_mqtt5::qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!",
retain_e::no, publish_props{},
[&c](error_code ec) {
std::cout << "error_code: " << ec.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void publish_qos1_websocket_tls() {
std::cout << "[Test-publish-qos1-websocket-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ssl::stream<asio::ip::tcp::socket>
>;
error_code ec;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-qos1-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
retain_e::no, publish_props{},
[&c](error_code ec, reason_code rc, puback_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void publish_qos2_websocket_tls() {
std::cout << "[Test-publish-qos2-websocket-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ssl::stream<asio::ip::tcp::socket>
>;
error_code ec;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-qos2-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
c.async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
retain_e::no, publish_props{},
[&c](error_code ec, reason_code rc, pubcomp_props) {
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "reason_code: " << rc.message() << std::endl;
c.async_disconnect(asio::detached);
}
);
ioc.run();
return;
}
void subscribe_and_receive_websocket_tls(int num_receive) {
std::cout << "[Test-subscribe-and-receive-websocket-tls]" << std::endl;
using namespace async_mqtt5;
asio::io_context ioc;
using stream_type = boost::beast::websocket::stream<
asio::ssl::stream<asio::ip::tcp::socket>
>;
error_code ec;
asio::ssl::context tls_context(asio::ssl::context::tls_client);
tls_context.add_certificate_authority(asio::buffer(mireo_ca), ec);
tls_context.set_verify_mode(asio::ssl::verify_peer);
using client_type = mqtt_client<stream_type, decltype(tls_context)>;
client_type c(ioc, std::move(tls_context));
c.credentials("test-subscriber-websocket-tls", "", "")
.brokers("emqtt.mireo.local/mqtt", 8884)
.will({ "test/mqtt-test", "Client disconnected!", qos_e::at_least_once })
.async_run(asio::detached);
std::vector<subscribe_topic> topics;
topics.push_back(subscribe_topic{
"test/mqtt-test", {
qos_e::exactly_once,
no_local_e::no,
retain_as_published_e::retain,
retain_handling_e::send
}
});
c.async_subscribe(
topics, subscribe_props{},
[](error_code ec, std::vector<reason_code> codes, suback_props) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "subscribe error_code: " << ec.message() << std::endl;
std::cout << "subscribe reason_code: " << codes[0].message() << std::endl;
}
);
for (auto i = 0; i < num_receive; i++) {
c.async_receive(
[&c, i, num_receive] (
error_code ec, std::string topic,
std::string payload, publish_props
) {
if (ec == asio::error::operation_aborted)
return;
std::cout << "message " << i + 1 << "/" << num_receive << std::endl;
std::cout << "error_code: " << ec.message() << std::endl;
std::cout << "topic: " << topic << std::endl;
std::cout << "payload: " << payload << std::endl;
if (i == num_receive - 1)
c.async_disconnect(asio::detached);
}
);
}
ioc.run();
return;
}
void run_websocket_tls_examples() {
publish_qos0_websocket_tls();
publish_qos1_websocket_tls();
publish_qos2_websocket_tls();
subscribe_and_receive_websocket_tls(1);
}