diff --git a/doc/qbk/10_examples.qbk b/doc/qbk/10_examples.qbk index ca1dabc..ece0e25 100644 --- a/doc/qbk/10_examples.qbk +++ b/doc/qbk/10_examples.qbk @@ -6,11 +6,11 @@ The following list contains all the examples that showcase how to use the __Clie [variablelist [ [[link async_mqtt5.publisher publisher.cpp]] - [Shows how to use the __Client__ as a publisher.] + [Shows how to use the __Client__ as a publisher. The __Client__ publishes sensor readings every `5 seconds`.] ] [ [[link async_mqtt5.receiver receiver.cpp]] - [Shows how to use the __Client__ as a receiver.] + [Shows how to use the __Client__ as a receiver. The __Client__ subscribes and indefinitely receives Application Messages from the Broker.] ] [ [[link async_mqtt5.hello_world_over_tcp hello_world_over_tcp.cpp]] diff --git a/doc/qbk/examples/Examples.qbk b/doc/qbk/examples/Examples.qbk index c9978b5..71d7893 100644 --- a/doc/qbk/examples/Examples.qbk +++ b/doc/qbk/examples/Examples.qbk @@ -25,16 +25,17 @@ This example illustrates the process of setting up the Client to connect to the [endsect] [/hello_world_over_websocket_tls] [section:publisher The publisher] -This example will show how to use __Client__ as a publisher. -The __Client__ will use TCP to connect to the Broker and __USE_AWAITABLE__ as the completion token. +This example will show how to use __Client__ as a publisher that publishes sensor readings every `5` seconds. +The __Client__ will use TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token. [import ../../../example/publisher.cpp] [publisher] [endsect] [section:receiver The receiver] -This example will show how to use __Client__ as a receiver. -The __Client__ will use TCP to connect to the Broker and __USE_AWAITABLE__ as the completion token. +This example will show how to use __Client__ as a receiver. +The __Client__ subscribes and indefinitely receives Application Messages from the Broker. +The __Client__ will use TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token. [import ../../../example/receiver.cpp] [receiver] diff --git a/example/publisher.cpp b/example/publisher.cpp index e0e4a93..377aaa8 100644 --- a/example/publisher.cpp +++ b/example/publisher.cpp @@ -1,9 +1,13 @@ //[publisher +#include #include +#include #include #include #include +#include +#include #include #include @@ -12,43 +16,80 @@ #ifdef BOOST_ASIO_HAS_CO_AWAIT -namespace asio = boost::asio; +// Modified completion token that will prevent co_await from throwing exceptions. +constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); -asio::awaitable client_publisher(asio::io_context& ioc) { - // Initialise the Client, establish connection to the Broker over TCP. - async_mqtt5::mqtt_client client(ioc); +using client_type = async_mqtt5::mqtt_client; +int next_sensor_reading() { + srand(static_cast(std::time(0))); + return rand() % 100; +} + +boost::asio::awaitable publish_sensor_readings( + client_type& client, boost::asio::steady_timer& timer +) { // Configure the Client. // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. - client.brokers("mqtt.broker", 1883) // Broker that we want to connect to. 1883 is the default TCP port. - .async_run(asio::detached); // Start the client. + client.brokers("", 1883) // Broker that we want to connect to. 1883 is the default TCP port. + .async_run(boost::asio::detached); // Start the client. - // Publish an Application Message with QoS 1. - auto [rc, props] = co_await client.async_publish( - "test/mqtt-test", "my application message", - async_mqtt5::retain_e::yes, async_mqtt5::publish_props {}, asio::use_awaitable - ); - if (rc) - std::cout << "MQTT protocol error occurred: " << rc.message() << std::endl; + for (;;) { + // Get the next sensor reading. + auto reading = std::to_string(next_sensor_reading()); - // Publish some more messages... + // Publish the sensor reading with QoS 1. + auto&& [ec, rc, props] = co_await client.async_publish( + "", reading, + async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, use_nothrow_awaitable + ); + // An error can occur as a result of: + // a) wrong publish parameters + // b) mqtt_client::cancel is called while the Client is publishing the message + // resulting in cancellation. + if (ec) { + std::cout << "Publish error occurred: " << ec.message() << std::endl; + break; + } - // After we are done with publishing all the messages, disconnect the Client. - // Alternatively, you can also use mqtt_client::cancel. - // Regardless, you should ensure all the operations are completed before disconnecting the Client. - co_await client.async_disconnect( - async_mqtt5::disconnect_rc_e::normal_disconnection, async_mqtt5::disconnect_props {}, asio::use_awaitable - ); + // Reason code is the reply from the server presenting the result of the publish operation. + std::cout << "Result of publish request: " << rc.message() << std::endl; + if (!rc) + std::cout << "Published sensor reading: " << reading << std::endl; + + // Wait 5 seconds before publishing the next reading. + timer.expires_after(std::chrono::seconds(5)); + auto&& [tec] = co_await timer.async_wait(use_nothrow_awaitable); + + // An error occurred if we cancelled the timer. + if (tec) + break; + } co_return; } int main() { // Initialise execution context. - asio::io_context ioc; + boost::asio::io_context ioc; + + // Initialise the Client to connect to the Broker over TCP. + client_type client(ioc); + + // Initialise the timer. + boost::asio::steady_timer timer(ioc); + + // Set up signals to stop the program on demand. + boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); + signals.async_wait([&client, &timer](async_mqtt5::error_code /* ec */, int /* signal */) { + // After we are done with publishing all the messages, cancel the timer and the Client. + // Alternatively, use mqtt_client::async_disconnect. + timer.cancel(); + client.cancel(); + }); // Spawn the coroutine. - co_spawn(ioc.get_executor(), client_publisher(ioc), asio::detached); + co_spawn(ioc.get_executor(), publish_sensor_readings(client, timer), boost::asio::detached); // Start the execution. ioc.run(); diff --git a/example/receiver.cpp b/example/receiver.cpp index 938880f..b392221 100644 --- a/example/receiver.cpp +++ b/example/receiver.cpp @@ -1,9 +1,11 @@ //[receiver #include +#include #include #include #include +#include #include #include @@ -12,20 +14,15 @@ #ifdef BOOST_ASIO_HAS_CO_AWAIT -namespace asio = boost::asio; +// Modified completion token that will prevent co_await from throwing exceptions. +constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); -asio::awaitable client_receiver(asio::io_context& ioc) { - // Initialise the Client, establish connection to the Broker over TCP. - async_mqtt5::mqtt_client client(ioc); - - // Configure the Client. - // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. - client.brokers("mqtt.broker", 1883) // Broker that we want to connect to. 1883 is the default TCP port. - .async_run(asio::detached); // Start the client. +using client_type = async_mqtt5::mqtt_client; +boost::asio::awaitable subscribe(client_type& client) { // Configure the request to subscribe to a Topic. - async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic { - "test/mqtt-test", + async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{ + "", async_mqtt5::subscribe_options { async_mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2. async_mqtt5::no_local_e::no, // Forward message from Clients with same ID. @@ -35,45 +32,74 @@ asio::awaitable client_receiver(asio::io_context& ioc) { }; // Subscribe to a single Topic. - auto [sub_codes, sub_props] = co_await client.async_subscribe( - sub_topic, async_mqtt5::subscribe_props {}, asio::use_awaitable + auto&& [ec, sub_codes, sub_props] = co_await client.async_subscribe( + sub_topic, async_mqtt5::subscribe_props {}, use_nothrow_awaitable ); // Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call. - // std::vector sub_codes contain the result of the subscribe action for every Topic. + // An error can occur as a result of: + // a) wrong subscribe parameters + // b) mqtt_client::cancel is called while the Client is in the process of subscribing + if (ec) + std::cout << "Subscribe error occurred: " << ec.message() << std::endl; + else + std::cout << "Result of subscribe request: " << sub_codes[0].message() << std::endl; + + co_return !ec && !sub_codes[0]; // True if the subscription was successfully established. +} + +boost::asio::awaitable subscribe_and_receive(client_type& client) { + // Configure the Client. + // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. + client.brokers("", 1883) // Broker that we want to connect to. 1883 is the default TCP port. + .async_run(boost::asio::detached); // Start the client. + // Before attempting to receive an Application Message from the Topic we just subscribed to, // it is advisable to verify that the subscription succeeded. // It is not recommended to call mqtt_client::async_receive if you do not have any // subscription established as the corresponding handler will never be invoked. - if (!sub_codes[0]) - auto [topic, payload, publish_props] = co_await client.async_receive(asio::use_awaitable); - // Receive more messages... + if (!(co_await subscribe(client))) + co_return; - // Unsubscribe from the Topic. - // Similar to mqtt_client::async_subscribe call, std::vector unsub_codes contain - // the result of the unsubscribe action for every Topic. - auto [unsub_codes, unsub_props] = co_await client.async_unsubscribe( - "test/mqtt-test", async_mqtt5::unsubscribe_props {}, - asio::use_awaitable - ); - // Note: you can unsubscribe from multiple Topics in one mqtt_client::async_unsubscribe call. + for (;;) { + // Receive an Appplication Message from the subscribed Topic(s). + auto&& [ec, topic, payload, publish_props] = co_await client.async_receive(use_nothrow_awaitable); - // Disconnect the Client. - co_await client.async_disconnect( - async_mqtt5::disconnect_rc_e::disconnect_with_will_message, - async_mqtt5::disconnect_props {}, - asio::use_awaitable - ); + if (ec == async_mqtt5::client::error::session_expired) { + // The Client has reconnected, and the prior session has expired. + // As a result, any previous subscriptions have been lost and must be reinstated. + if (co_await subscribe(client)) + continue; + else + break; + } else if (ec) + break; + + std::cout << "Received message from the Broker" << std::endl; + std::cout << "\t topic: " << topic << std::endl; + std::cout << "\t payload: " << payload << std::endl; + } co_return; } int main() { // Initialise execution context. - asio::io_context ioc; + boost::asio::io_context ioc; + + // Initialise the Client to connect to the Broker over TCP. + client_type client(ioc); + + // Set up signals to stop the program on demand. + boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); + signals.async_wait([&client](async_mqtt5::error_code /* ec */, int /* signal */) { + // After we are done with publishing all the messages, cancel the timer and the Client. + // Alternatively, use mqtt_client::async_disconnect. + client.cancel(); + }); // Spawn the coroutine. - co_spawn(ioc, client_receiver(ioc), asio::detached); + co_spawn(ioc, subscribe_and_receive(client), boost::asio::detached); // Start the execution. ioc.run();