diff --git a/doc/qbk/00_main.qbk b/doc/qbk/00_main.qbk index a65351f..3d31a23 100644 --- a/doc/qbk/00_main.qbk +++ b/doc/qbk/00_main.qbk @@ -42,6 +42,7 @@ [def __Asio__ [@boost:/libs/asio/index.html Boost.Asio]] [def __Beast__ [@boost:/libs/beast/index.html Boost.Beast]] [def __ASIO_PER_OP_CANCELLATION__ [@boost:/doc/html/boost_asio/overview/core/cancellation.html Per-Operation Cancellation]] +[def __ASIO_PARALLEL_GROUP__ [@boost:/doc/html/boost_asio/overview/composition/parallel_group.html Co-ordinating Parallel Operations]] [def __IOC__ [@boost:doc/html/boost_asio/reference/io_context.html `boost::asio::io_context`]] [def __POST__ [@boost:doc/html/boost_asio/reference/post.html `boost::asio::post`]] [def __CO_SPAWN__ [@boost:/doc/html/boost_asio/reference/co_spawn.html `boost::asio::co_spawn`]] diff --git a/doc/qbk/07_asio_compliance.qbk b/doc/qbk/07_asio_compliance.qbk index 62e9156..1303c2a 100644 --- a/doc/qbk/07_asio_compliance.qbk +++ b/doc/qbk/07_asio_compliance.qbk @@ -6,9 +6,14 @@ Every asynchronous operation in __Asio__ has associated characteristics that spe * A *cancellation slot* determines how the asynchronous operations support cancellation. * An *executor* determines the queuing and execution strategy for completion handlers. -This section expands further into the roles of allocators, cancellation slots, and [link async_mqtt5.asio_compliance.executors Executors], -highlighting their integration and usage within the __Client__. +This section expands further into the roles of allocators, +cancellation slots, and executors, highlighting their integration and usage within the __Client__. + +* See [link async_mqtt5.asio_compliance.per_op_cancellation Per-Operation Cancellation] for more information about how +asynchronous operations within the __Client__ support cancellation. +* See [link async_mqtt5.asio_compliance.executors Executors] for more information about executors. [include 08_executors.qbk] +[include 09_per_op_cancellation.qbk] [endsect] [/asio_compliance] diff --git a/doc/qbk/09_per_op_cancellation.qbk b/doc/qbk/09_per_op_cancellation.qbk new file mode 100644 index 0000000..6db81fe --- /dev/null +++ b/doc/qbk/09_per_op_cancellation.qbk @@ -0,0 +1,89 @@ +[section:per_op_cancellation Per-Operation Cancellation] +[nochunk] + +In __Asio__, various objects such as sockets and timers offer the ability to terminate all ongoing asynchronous operations +globally through their `close` or `cancel` member functions. +Beyond this global cancellation capability, __Asio__ also provides mechanisms for cancelling specific asynchronous +operations on an individual basis. +This individual per-operation cancellation is enabled by specifying that a completion handler has an associated cancellation slot. +If the asynchronous operation supports cancellation, then it will install a cancellation handler into the associated slot. +The cancellation handler will be invoked when the user emits a cancellation signal into the cancellation slot. +See __ASIO_PER_OP_CANCELLATION__ for more information. + +The utility of per-operation cancellation becomes particularly evident when using `boost::asio::experimental::parallel_group` to launch +work performed in parallel and wait for one or all of the operations to complete. +For instance, in a parallel group awaiting the completion of any one operation among several, +completing one operation will trigger individual cancellation of all the remaining operations. +The same concept applies to the `awaitable operator ||`, which runs two awaitables in parallel, +waiting for one to complete and subsequently cancelling the remaining one. +See __ASIO_PARALLEL_GROUP__ for more information. + +Within the __Client__, every asynchronous function is designed to support individual per-operation cancellation. +This allows for associating of a cancellation slot with any `async_xxx` function call, enabling the emission of a cancellation signal as needed. +The impact of emitting a cancellation signal varies depending on the signal type (terminal, total, partial) and the operation being cancelled. +Detailed descriptions of how cancellation signals affect each `async_xxx` function +are provided in the ['Per-Operation Cancellation] paragraph in their respective sections of the __Client__ reference documentation. + +[heading Example: associating a cancellation slot with an asynchronous operation] + +This example illustrates associating a cancellation slot with a [refmem mqtt_client async_publish] operation +and emitting a terminal cancellation signal. +Executing this sequence effectively results in the immediate cancellation of the entire client operation, +mirroring the outcome of invoking [refmem mqtt_client cancel] directly. + +If a total or partial cancellation signal were issued instead of a terminal one, the implications would be less severe. +In such cases, the cancellation would specifically target resending the __PUBLISH__ packet, +preventing it from being retransmitted should the client reconnect during the ongoing operation. + +``` + async_mqtt5::mqtt_client client(ioc); + + client.brokers("", 1883) + .async_run(boost::asio::detached); + + boost::asio::cancellation_signal signal; + + client.async_publish( + "", "Hello world!", + async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, + boost::asio::bind_cancellation_slot( + signal.slot(), + [&client](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::puback_props props ) { + std::cout << ec.message() << std::endl; + } + ) + ); + + signal.emit(boost::asio::cancellation_type_t::terminal); +``` + +[section:parallel_group parallel_group/operator || and asynchronous functions in the mqtt_client] +As a result of supporting per-operation cancellation, +all the asynchronous functions with the __Client__ can be used in `parallel_group` or with `awaitable operator ||`. +This feature is especially beneficial for executing operations that require a timeout mechanism. + +Below are two examples illustrating how to implement a timeout: + +* [link async_mqtt5.timeout_with_parallel_group timeout_with_parallel_group.cpp] +* [link async_mqtt5.timeout_with_awaitable_operators timeout_with_awaitable_operators.cpp] + +[endsect] [/parallel_group] + +[section:protocol_level_cancellation About protocol-level cancellation] +In the context of __Client__, the handling of cancellation signals varies across different asynchronous operations. +Except for [refmem mqtt_client async_receive], all other `async_xxx` operations respond to a terminal cancellation signal by invoking [refmem mqtt_client cancel]. +These operations will halt the resending of certain packets for total and partial cancellation signals. + +It is worth noting that cancelling an `async_xxx` operation during an ongoing protocol exchange is not implemented because of a design decision +to prevent protocol breaches. +For example, if [refmem mqtt_client async_publish] with QoS 2 is in the middle of a communication with the Broker +[footnote Publishing with QoS 2 involves sending a __PUBLISH__ packet, receiving a __PUBREC__ acknowledgement from the Broker, transmitting a __PUBREL__ packet, and finally receiving a __PUBCOMP__ packet.] +and an attempt to cancel it is made, it could lead to a protocol violation. +For instance, if the operation is cancelled after a __PUBREC__ packet has been received from the Broker but before sending the __PUBREL__ packet, +that would breach the MQTT protocol by failing to send a necessary packet and leave the connection with the Broker in an invalid state. + +Therefore, the design of __Client__'s cancellation strategy carefully avoids these pitfalls to ensure continuous protocol compliance. + +[endsect] [/protocol_level_cancellation] + +[endsect] [/per_op_cancellation] diff --git a/doc/qbk/10_examples.qbk b/doc/qbk/10_examples.qbk index ece0e25..213f694 100644 --- a/doc/qbk/10_examples.qbk +++ b/doc/qbk/10_examples.qbk @@ -32,6 +32,20 @@ The following list contains all the examples that showcase how to use the __Clie [[link async_mqtt5.multiflight_client multiflight_client.cpp]] [Shows how to use the __Client__ to simultaneously dispatch multiple requests.] ] + [ + [[link async_mqtt5.timeout_with_parallel_group timeout_with_parallel_group.cpp]] + [ + Shows how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint + using parallel group. + ] + ] + [ + [[link async_mqtt5.timeout_with_awaitable_operators timeout_with_awaitable_operators.cpp]] + [ + Shows how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint + using awaitable operators. + ] + ] ] [endsect][/examples] diff --git a/doc/qbk/examples/Examples.qbk b/doc/qbk/examples/Examples.qbk index 71d7893..1108333 100644 --- a/doc/qbk/examples/Examples.qbk +++ b/doc/qbk/examples/Examples.qbk @@ -25,28 +25,46 @@ This example illustrates the process of setting up the Client to connect to the [endsect] [/hello_world_over_websocket_tls] [section:publisher The publisher] -This example will show how to use __Client__ as a publisher that publishes sensor readings every `5` seconds. -The __Client__ will use TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token. +This example shows how to use __Client__ as a publisher that publishes sensor readings every `5` seconds. +The __Client__ uses TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token. [import ../../../example/publisher.cpp] [publisher] [endsect] [section:receiver The receiver] -This example will show how to use __Client__ as a receiver. +This example shows how to use __Client__ as a receiver. The __Client__ subscribes and indefinitely receives Application Messages from the Broker. -The __Client__ will use TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token. +The __Client__ uses TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token. [import ../../../example/receiver.cpp] [receiver] [endsect] [section:multiflight_client The multiflight Client] -This example will show how to use __Client__ to simultaneously dispatch multiple -requests. +This example shows how to use __Client__ to simultaneously dispatch multiple requests. [import ../../../example/multiflight_client.cpp] [multiflight_client] [endsect] +[section:timeout_with_parallel_group Timed MQTT operations with parallel group] +This example demonstrates how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint +using parallel group. +Specifically, in this example, the __Client__ will subscribe to a Topic and try to receive a message from the Topic within `5 seconds`. + +[import ../../../example/timeout_with_parallel_group.cpp] +[timeout_with_parallel_group] +[endsect] + +[section:timeout_with_awaitable_operators Timed MQTT operations with awaitable operators] +This example demonstrates how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint +using awaitable operators. +Specifically, in this example, a call to [refmem mqtt_client async_publish] and [refmem mqtt_client async_disconnect] must complete +within `5 seconds`. Otherwise, they will be cancelled. + +[import ../../../example/timeout_with_awaitable_operators.cpp] +[timeout_with_awaitable_operators] +[endsect] + [block''''''] diff --git a/example/timeout_with_awaitable_operators.cpp b/example/timeout_with_awaitable_operators.cpp new file mode 100644 index 0000000..635bdf3 --- /dev/null +++ b/example/timeout_with_awaitable_operators.cpp @@ -0,0 +1,66 @@ +//[timeout_with_awaitable_operators +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#ifdef BOOST_ASIO_HAS_CO_AWAIT + +// Modified completion token that will prevent co_await from throwing exceptions. +constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); + +using client_type = async_mqtt5::mqtt_client; + +boost::asio::awaitable send_over_mqtt(client_type& client, const std::string& message) { + client.brokers("", 1883) + .async_run(boost::asio::detached); + + auto&& [pec, prc, puback_props] = co_await client.async_publish( + "", message, + async_mqtt5::retain_e::no, async_mqtt5::publish_props {}, + use_nothrow_awaitable + ); + + co_await client.async_disconnect(use_nothrow_awaitable); +} + +int main() { + boost::asio::io_context ioc; + + co_spawn(ioc, [&ioc]() -> boost::asio::awaitable { + // Construct the Client. + async_mqtt5::mqtt_client client(ioc); + + // Construct the timer. + boost::asio::steady_timer timer(ioc, std::chrono::seconds(5)); + + using namespace boost::asio::experimental::awaitable_operators; + auto res = co_await ( + send_over_mqtt(client, "Hello world!") || + timer.async_wait(use_nothrow_awaitable) + ); + + // The timer expired first. The client is cancelled. + if (res.index() == 1) + std::cout << "Send over MQTT timed out!" << std::endl; + // send_over_mqtt completed first. The timer is cancelled. + else + std::cout << "Send over MQTT completed!" << std::endl; + + }, boost::asio::detached); + + ioc.run(); +} + +#endif + +//] diff --git a/example/timeout_with_parallel_group.cpp b/example/timeout_with_parallel_group.cpp new file mode 100644 index 0000000..d5794b0 --- /dev/null +++ b/example/timeout_with_parallel_group.cpp @@ -0,0 +1,63 @@ +//[timeout_with_parallel_group +#include + +#include +#include +#include +#include +#include + +#include + +int main() { + boost::asio::io_context ioc; + + // Construct the Client. + async_mqtt5::mqtt_client client(ioc); + + // Construct the timer. + boost::asio::steady_timer timer(ioc, std::chrono::seconds(5)); + + client.brokers("", 1883) + .async_run(boost::asio::detached); + + // Subscribe to a Topic. + client.async_subscribe( + { "" }, async_mqtt5::subscribe_props {}, + [](async_mqtt5::error_code ec, std::vector rcs, async_mqtt5::suback_props) { + std::cout << "[subscribe ec]: " << ec.message() << std::endl; + std::cout << "[subscribe rc]: " << rcs[0].message() << std::endl; + } + ); + + // Create a parallel group to wait up to 5 seconds to receive a message + // using client.async_receive(...). + boost::asio::experimental::make_parallel_group( + timer.async_wait(boost::asio::deferred), + client.async_receive(boost::asio::deferred) + ).async_wait( + boost::asio::experimental::wait_for_one(), + [&client]( + std::array ord, // Completion order + async_mqtt5::error_code /* timer_ec */, // timer.async_wait(...) handler signature + // client.async_receive(...) handler signature + async_mqtt5::error_code receive_ec, + std::string topic, std::string payload, async_mqtt5::publish_props /* props */ + ) { + if (ord[0] == 1) { + std::cout << "Received a message!" << std::endl; + std::cout << "[receive ec]: " << receive_ec.message() << std::endl; + std::cout << "[receive topic]: " << topic << std::endl; + std::cout << "[receive payload]: " << payload << std::endl; + } + else + std::cout << "Timed out! Did not receive a message within 5 seconds." << std::endl; + + client.cancel(); + } + ); + + ioc.run(); +} + +//]