forked from boostorg/mqtt5
Add a chapter on asio compliance - per operation cancellation
Summary: related to T12804 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D28516
This commit is contained in:
@ -42,6 +42,7 @@
|
||||
[def __Asio__ [@boost:/libs/asio/index.html Boost.Asio]]
|
||||
[def __Beast__ [@boost:/libs/beast/index.html Boost.Beast]]
|
||||
[def __ASIO_PER_OP_CANCELLATION__ [@boost:/doc/html/boost_asio/overview/core/cancellation.html Per-Operation Cancellation]]
|
||||
[def __ASIO_PARALLEL_GROUP__ [@boost:/doc/html/boost_asio/overview/composition/parallel_group.html Co-ordinating Parallel Operations]]
|
||||
[def __IOC__ [@boost:doc/html/boost_asio/reference/io_context.html `boost::asio::io_context`]]
|
||||
[def __POST__ [@boost:doc/html/boost_asio/reference/post.html `boost::asio::post`]]
|
||||
[def __CO_SPAWN__ [@boost:/doc/html/boost_asio/reference/co_spawn.html `boost::asio::co_spawn`]]
|
||||
|
@ -6,9 +6,14 @@ Every asynchronous operation in __Asio__ has associated characteristics that spe
|
||||
* A *cancellation slot* determines how the asynchronous operations support cancellation.
|
||||
* An *executor* determines the queuing and execution strategy for completion handlers.
|
||||
|
||||
This section expands further into the roles of allocators, cancellation slots, and [link async_mqtt5.asio_compliance.executors Executors],
|
||||
highlighting their integration and usage within the __Client__.
|
||||
This section expands further into the roles of allocators,
|
||||
cancellation slots, and executors, highlighting their integration and usage within the __Client__.
|
||||
|
||||
* See [link async_mqtt5.asio_compliance.per_op_cancellation Per-Operation Cancellation] for more information about how
|
||||
asynchronous operations within the __Client__ support cancellation.
|
||||
* See [link async_mqtt5.asio_compliance.executors Executors] for more information about executors.
|
||||
|
||||
[include 08_executors.qbk]
|
||||
[include 09_per_op_cancellation.qbk]
|
||||
|
||||
[endsect] [/asio_compliance]
|
||||
|
89
doc/qbk/09_per_op_cancellation.qbk
Normal file
89
doc/qbk/09_per_op_cancellation.qbk
Normal file
@ -0,0 +1,89 @@
|
||||
[section:per_op_cancellation Per-Operation Cancellation]
|
||||
[nochunk]
|
||||
|
||||
In __Asio__, various objects such as sockets and timers offer the ability to terminate all ongoing asynchronous operations
|
||||
globally through their `close` or `cancel` member functions.
|
||||
Beyond this global cancellation capability, __Asio__ also provides mechanisms for cancelling specific asynchronous
|
||||
operations on an individual basis.
|
||||
This individual per-operation cancellation is enabled by specifying that a completion handler has an associated cancellation slot.
|
||||
If the asynchronous operation supports cancellation, then it will install a cancellation handler into the associated slot.
|
||||
The cancellation handler will be invoked when the user emits a cancellation signal into the cancellation slot.
|
||||
See __ASIO_PER_OP_CANCELLATION__ for more information.
|
||||
|
||||
The utility of per-operation cancellation becomes particularly evident when using `boost::asio::experimental::parallel_group` to launch
|
||||
work performed in parallel and wait for one or all of the operations to complete.
|
||||
For instance, in a parallel group awaiting the completion of any one operation among several,
|
||||
completing one operation will trigger individual cancellation of all the remaining operations.
|
||||
The same concept applies to the `awaitable operator ||`, which runs two awaitables in parallel,
|
||||
waiting for one to complete and subsequently cancelling the remaining one.
|
||||
See __ASIO_PARALLEL_GROUP__ for more information.
|
||||
|
||||
Within the __Client__, every asynchronous function is designed to support individual per-operation cancellation.
|
||||
This allows for associating of a cancellation slot with any `async_xxx` function call, enabling the emission of a cancellation signal as needed.
|
||||
The impact of emitting a cancellation signal varies depending on the signal type (terminal, total, partial) and the operation being cancelled.
|
||||
Detailed descriptions of how cancellation signals affect each `async_xxx` function
|
||||
are provided in the ['Per-Operation Cancellation] paragraph in their respective sections of the __Client__ reference documentation.
|
||||
|
||||
[heading Example: associating a cancellation slot with an asynchronous operation]
|
||||
|
||||
This example illustrates associating a cancellation slot with a [refmem mqtt_client async_publish] operation
|
||||
and emitting a terminal cancellation signal.
|
||||
Executing this sequence effectively results in the immediate cancellation of the entire client operation,
|
||||
mirroring the outcome of invoking [refmem mqtt_client cancel] directly.
|
||||
|
||||
If a total or partial cancellation signal were issued instead of a terminal one, the implications would be less severe.
|
||||
In such cases, the cancellation would specifically target resending the __PUBLISH__ packet,
|
||||
preventing it from being retransmitted should the client reconnect during the ongoing operation.
|
||||
|
||||
```
|
||||
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc);
|
||||
|
||||
client.brokers("<your-mqtt-broker>", 1883)
|
||||
.async_run(boost::asio::detached);
|
||||
|
||||
boost::asio::cancellation_signal signal;
|
||||
|
||||
client.async_publish<async_mqtt5::qos_e::at_least_once>(
|
||||
"<topic>", "Hello world!",
|
||||
async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
|
||||
boost::asio::bind_cancellation_slot(
|
||||
signal.slot(),
|
||||
[&client](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::puback_props props ) {
|
||||
std::cout << ec.message() << std::endl;
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
signal.emit(boost::asio::cancellation_type_t::terminal);
|
||||
```
|
||||
|
||||
[section:parallel_group parallel_group/operator || and asynchronous functions in the mqtt_client]
|
||||
As a result of supporting per-operation cancellation,
|
||||
all the asynchronous functions with the __Client__ can be used in `parallel_group` or with `awaitable operator ||`.
|
||||
This feature is especially beneficial for executing operations that require a timeout mechanism.
|
||||
|
||||
Below are two examples illustrating how to implement a timeout:
|
||||
|
||||
* [link async_mqtt5.timeout_with_parallel_group timeout_with_parallel_group.cpp]
|
||||
* [link async_mqtt5.timeout_with_awaitable_operators timeout_with_awaitable_operators.cpp]
|
||||
|
||||
[endsect] [/parallel_group]
|
||||
|
||||
[section:protocol_level_cancellation About protocol-level cancellation]
|
||||
In the context of __Client__, the handling of cancellation signals varies across different asynchronous operations.
|
||||
Except for [refmem mqtt_client async_receive], all other `async_xxx` operations respond to a terminal cancellation signal by invoking [refmem mqtt_client cancel].
|
||||
These operations will halt the resending of certain packets for total and partial cancellation signals.
|
||||
|
||||
It is worth noting that cancelling an `async_xxx` operation during an ongoing protocol exchange is not implemented because of a design decision
|
||||
to prevent protocol breaches.
|
||||
For example, if [refmem mqtt_client async_publish] with QoS 2 is in the middle of a communication with the Broker
|
||||
[footnote Publishing with QoS 2 involves sending a __PUBLISH__ packet, receiving a __PUBREC__ acknowledgement from the Broker, transmitting a __PUBREL__ packet, and finally receiving a __PUBCOMP__ packet.]
|
||||
and an attempt to cancel it is made, it could lead to a protocol violation.
|
||||
For instance, if the operation is cancelled after a __PUBREC__ packet has been received from the Broker but before sending the __PUBREL__ packet,
|
||||
that would breach the MQTT protocol by failing to send a necessary packet and leave the connection with the Broker in an invalid state.
|
||||
|
||||
Therefore, the design of __Client__'s cancellation strategy carefully avoids these pitfalls to ensure continuous protocol compliance.
|
||||
|
||||
[endsect] [/protocol_level_cancellation]
|
||||
|
||||
[endsect] [/per_op_cancellation]
|
@ -32,6 +32,20 @@ The following list contains all the examples that showcase how to use the __Clie
|
||||
[[link async_mqtt5.multiflight_client multiflight_client.cpp]]
|
||||
[Shows how to use the __Client__ to simultaneously dispatch multiple requests.]
|
||||
]
|
||||
[
|
||||
[[link async_mqtt5.timeout_with_parallel_group timeout_with_parallel_group.cpp]]
|
||||
[
|
||||
Shows how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint
|
||||
using parallel group.
|
||||
]
|
||||
]
|
||||
[
|
||||
[[link async_mqtt5.timeout_with_awaitable_operators timeout_with_awaitable_operators.cpp]]
|
||||
[
|
||||
Shows how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint
|
||||
using awaitable operators.
|
||||
]
|
||||
]
|
||||
]
|
||||
|
||||
[endsect][/examples]
|
||||
|
@ -25,28 +25,46 @@ This example illustrates the process of setting up the Client to connect to the
|
||||
[endsect] [/hello_world_over_websocket_tls]
|
||||
|
||||
[section:publisher The publisher]
|
||||
This example will show how to use __Client__ as a publisher that publishes sensor readings every `5` seconds.
|
||||
The __Client__ will use TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token.
|
||||
This example shows how to use __Client__ as a publisher that publishes sensor readings every `5` seconds.
|
||||
The __Client__ uses TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token.
|
||||
|
||||
[import ../../../example/publisher.cpp]
|
||||
[publisher]
|
||||
[endsect]
|
||||
|
||||
[section:receiver The receiver]
|
||||
This example will show how to use __Client__ as a receiver.
|
||||
This example shows how to use __Client__ as a receiver.
|
||||
The __Client__ subscribes and indefinitely receives Application Messages from the Broker.
|
||||
The __Client__ will use TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token.
|
||||
The __Client__ uses TCP to connect to the Broker and modified __USE_AWAITABLE__ as the completion token.
|
||||
|
||||
[import ../../../example/receiver.cpp]
|
||||
[receiver]
|
||||
[endsect]
|
||||
|
||||
[section:multiflight_client The multiflight Client]
|
||||
This example will show how to use __Client__ to simultaneously dispatch multiple
|
||||
requests.
|
||||
This example shows how to use __Client__ to simultaneously dispatch multiple requests.
|
||||
|
||||
[import ../../../example/multiflight_client.cpp]
|
||||
[multiflight_client]
|
||||
[endsect]
|
||||
|
||||
[section:timeout_with_parallel_group Timed MQTT operations with parallel group]
|
||||
This example demonstrates how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint
|
||||
using parallel group.
|
||||
Specifically, in this example, the __Client__ will subscribe to a Topic and try to receive a message from the Topic within `5 seconds`.
|
||||
|
||||
[import ../../../example/timeout_with_parallel_group.cpp]
|
||||
[timeout_with_parallel_group]
|
||||
[endsect]
|
||||
|
||||
[section:timeout_with_awaitable_operators Timed MQTT operations with awaitable operators]
|
||||
This example demonstrates how to use the __Client__ with its support for per-operation cancellation to perform operations under a time constraint
|
||||
using awaitable operators.
|
||||
Specifically, in this example, a call to [refmem mqtt_client async_publish] and [refmem mqtt_client async_disconnect] must complete
|
||||
within `5 seconds`. Otherwise, they will be cancelled.
|
||||
|
||||
[import ../../../example/timeout_with_awaitable_operators.cpp]
|
||||
[timeout_with_awaitable_operators]
|
||||
[endsect]
|
||||
|
||||
[block'''</part>''']
|
||||
|
66
example/timeout_with_awaitable_operators.cpp
Normal file
66
example/timeout_with_awaitable_operators.cpp
Normal file
@ -0,0 +1,66 @@
|
||||
//[timeout_with_awaitable_operators
|
||||
#include <iostream>
|
||||
|
||||
#include <boost/asio/as_tuple.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/co_spawn.hpp>
|
||||
#include <boost/asio/detached.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/use_awaitable.hpp>
|
||||
#include <boost/asio/experimental/awaitable_operators.hpp>
|
||||
#include <boost/asio/experimental/parallel_group.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <async_mqtt5.hpp>
|
||||
|
||||
#ifdef BOOST_ASIO_HAS_CO_AWAIT
|
||||
|
||||
// Modified completion token that will prevent co_await from throwing exceptions.
|
||||
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable);
|
||||
|
||||
using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>;
|
||||
|
||||
boost::asio::awaitable<void> send_over_mqtt(client_type& client, const std::string& message) {
|
||||
client.brokers("<your-mqtt-broker>", 1883)
|
||||
.async_run(boost::asio::detached);
|
||||
|
||||
auto&& [pec, prc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
|
||||
"<your-mqtt-topic>", message,
|
||||
async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
|
||||
use_nothrow_awaitable
|
||||
);
|
||||
|
||||
co_await client.async_disconnect(use_nothrow_awaitable);
|
||||
}
|
||||
|
||||
int main() {
|
||||
boost::asio::io_context ioc;
|
||||
|
||||
co_spawn(ioc, [&ioc]() -> boost::asio::awaitable<void> {
|
||||
// Construct the Client.
|
||||
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc);
|
||||
|
||||
// Construct the timer.
|
||||
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
|
||||
|
||||
using namespace boost::asio::experimental::awaitable_operators;
|
||||
auto res = co_await (
|
||||
send_over_mqtt(client, "Hello world!") ||
|
||||
timer.async_wait(use_nothrow_awaitable)
|
||||
);
|
||||
|
||||
// The timer expired first. The client is cancelled.
|
||||
if (res.index() == 1)
|
||||
std::cout << "Send over MQTT timed out!" << std::endl;
|
||||
// send_over_mqtt completed first. The timer is cancelled.
|
||||
else
|
||||
std::cout << "Send over MQTT completed!" << std::endl;
|
||||
|
||||
}, boost::asio::detached);
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
//]
|
63
example/timeout_with_parallel_group.cpp
Normal file
63
example/timeout_with_parallel_group.cpp
Normal file
@ -0,0 +1,63 @@
|
||||
//[timeout_with_parallel_group
|
||||
#include <iostream>
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/deferred.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/experimental/parallel_group.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <async_mqtt5.hpp>
|
||||
|
||||
int main() {
|
||||
boost::asio::io_context ioc;
|
||||
|
||||
// Construct the Client.
|
||||
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(ioc);
|
||||
|
||||
// Construct the timer.
|
||||
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
|
||||
|
||||
client.brokers("<your-mqtt-broker>", 1883)
|
||||
.async_run(boost::asio::detached);
|
||||
|
||||
// Subscribe to a Topic.
|
||||
client.async_subscribe(
|
||||
{ "<your-mqtt-topic>" }, async_mqtt5::subscribe_props {},
|
||||
[](async_mqtt5::error_code ec, std::vector<async_mqtt5::reason_code> rcs, async_mqtt5::suback_props) {
|
||||
std::cout << "[subscribe ec]: " << ec.message() << std::endl;
|
||||
std::cout << "[subscribe rc]: " << rcs[0].message() << std::endl;
|
||||
}
|
||||
);
|
||||
|
||||
// Create a parallel group to wait up to 5 seconds to receive a message
|
||||
// using client.async_receive(...).
|
||||
boost::asio::experimental::make_parallel_group(
|
||||
timer.async_wait(boost::asio::deferred),
|
||||
client.async_receive(boost::asio::deferred)
|
||||
).async_wait(
|
||||
boost::asio::experimental::wait_for_one(),
|
||||
[&client](
|
||||
std::array<std::size_t, 2> ord, // Completion order
|
||||
async_mqtt5::error_code /* timer_ec */, // timer.async_wait(...) handler signature
|
||||
// client.async_receive(...) handler signature
|
||||
async_mqtt5::error_code receive_ec,
|
||||
std::string topic, std::string payload, async_mqtt5::publish_props /* props */
|
||||
) {
|
||||
if (ord[0] == 1) {
|
||||
std::cout << "Received a message!" << std::endl;
|
||||
std::cout << "[receive ec]: " << receive_ec.message() << std::endl;
|
||||
std::cout << "[receive topic]: " << topic << std::endl;
|
||||
std::cout << "[receive payload]: " << payload << std::endl;
|
||||
}
|
||||
else
|
||||
std::cout << "Timed out! Did not receive a message within 5 seconds." << std::endl;
|
||||
|
||||
client.cancel();
|
||||
}
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
//]
|
Reference in New Issue
Block a user