mirror of
https://github.com/boostorg/mqtt5.git
synced 2025-07-29 20:17:37 +02:00
Add a chapter on multithreading and thread safety
Summary: related to T12804 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D28563
This commit is contained in:
@ -45,7 +45,11 @@
|
||||
[def __ASIO_ALLOCATORS__ [@boost:/doc/html/boost_asio/overview/model/allocators.html Allocators]]
|
||||
[def __ASIO_CUSTOM_MEMORY_ALLOCATION__ [@boost:/doc/html/boost_asio/overview/core/allocation.html Custom Memory Allocation]]
|
||||
[def __ASIO_PARALLEL_GROUP__ [@boost:/doc/html/boost_asio/overview/composition/parallel_group.html Co-ordinating Parallel Operations]]
|
||||
[def __ASIO_STRANDS__ [@boost:/doc/html/boost_asio/overview/core/strands.html Strands: Use Threads Without Explicit Locking]]
|
||||
[def __IOC__ [@boost:doc/html/boost_asio/reference/io_context.html `boost::asio::io_context`]]
|
||||
[def __IOC_RUN__ [@boost:doc/html/boost_asio/reference/io_context/run.html `boost::asio::io_context::run()`]]
|
||||
[def __STRAND__ [@boost:doc/html/boost_asio/reference/io_context__strand.html `boost::asio::io_context::strand`]]
|
||||
[def __DISPATCH__ [@boost:doc/html/boost_asio/reference/dispatch.html `boost::asio::dispatch`]]
|
||||
[def __POST__ [@boost:doc/html/boost_asio/reference/post.html `boost::asio::post`]]
|
||||
[def __CO_SPAWN__ [@boost:/doc/html/boost_asio/reference/co_spawn.html `boost::asio::co_spawn`]]
|
||||
[def __USE_AWAITABLE__ [@boost:/doc/html/boost_asio/reference/use_awaitable.html `boost::asio::use_awaitable`]]
|
||||
@ -120,6 +124,7 @@
|
||||
[include 05_optimising_communication.qbk]
|
||||
[include 06_disconnecting_the_client.qbk]
|
||||
[include 07_asio_compliance.qbk]
|
||||
[include 11_multithreading.qbk]
|
||||
|
||||
[include 15_examples.qbk]
|
||||
|
||||
|
86
doc/qbk/11_multithreading.qbk
Normal file
86
doc/qbk/11_multithreading.qbk
Normal file
@ -0,0 +1,86 @@
|
||||
[section:multithreading Using the mqtt_client in a multithreaded environment]
|
||||
[nochunk]
|
||||
|
||||
This chapter provides information about thread safety of the __Client__ and other __Asio__-compliant objects
|
||||
and provides examples of how to write thread-safe code in multithreaded environments.
|
||||
|
||||
[section:thread_safety Thread safety of ASIO-compliant objects]
|
||||
|
||||
A common misconception exists regarding the "thread safety" of ASIO-compliant asynchronous objects,
|
||||
specifically around the belief that initialising such an object with a __STRAND__
|
||||
[footnote An executor that provides serialised handler execution.]
|
||||
executor allows its asynchronous functions (`async_xxx`) to be freely called from any executor or thread.
|
||||
That is not correct. Those `async_xxx` functions themselves *must* be called from within the same executor.
|
||||
|
||||
Every `async_xxx` function in every ASIO-compliant object begins by executing some initiation code before
|
||||
typically proceeding to call an intermediate lower-level ASIO `async_xxx` function, with an adapted handler serving as the callback.
|
||||
It is worth noting that the thread safety of this initiation code,
|
||||
which is called directly by the [@boost:doc/html/boost_asio/reference/async_initiate.html `boost::asio::async_initiate`]
|
||||
function and executed by the same executor that called the `async_xxx` function, is not guaranteed
|
||||
and depends on the implementation itself.
|
||||
This uncertainty around thread safety is what the notation "Thread Safety: Shared objects: Unsafe" means,
|
||||
which appears in the documentation for any ASIO-compliant object.
|
||||
|
||||
Consequently, similar to the other ASIO-compliant objects, the __Client__ object *is not thread-safe*.
|
||||
Invoking its member functions concurrently from separate threads will result in a race condition.
|
||||
|
||||
This design choice is intentional and offloads the responsibility of managing concurrency to the user.
|
||||
Given that many applications using __Asio__ often operate in a single-threaded environment for better performance,
|
||||
ASIO-compliant objects do not want to pay the cost of the overhead associated with mutexes and other synchronization mechanisms.
|
||||
Instead, it encourages developers to implement their own concurrency management strategies, tailoring them to the specific needs of their applications.
|
||||
|
||||
[endsect] [/thread_safety]
|
||||
|
||||
[section:executors_threads_strands Executors, threads, and strands]
|
||||
|
||||
Before delving into thread-safe programming, it is essential to understand the distinction between executors and threads.
|
||||
Executors are not threads but mechanisms for scheduling how and when work gets done.
|
||||
An executor can distribute tasks across multiple threads, and a single thread can execute tasks from multiple executors.
|
||||
Thus, when several threads invoke __IOC_RUN__ on the same __IOC__,
|
||||
the underlying executor of that `io_context` has the flexibility to assign tasks to any of those threads.
|
||||
|
||||
A __STRAND__ executor is particularly important in maintaining thread safety and managing multithreading.
|
||||
As outlined earlier, this type of executor guarantees that tasks assigned to it are executed in a serialised manner,
|
||||
preventing concurrent execution.
|
||||
It is important to note that this serialisation does not mean that a single thread handles all tasks within a strand.
|
||||
If the `io_context` associated with a strand operates across multiple threads,
|
||||
these threads can independently undertake tasks within the strand.
|
||||
However, these tasks are executed in a non-concurrent fashion as guaranteed by the strand.
|
||||
Refer to __ASIO_STRANDS__ for more details.
|
||||
|
||||
[endsect] [/executors_threads_strands]
|
||||
|
||||
[section:thread_safe_code Writing thread-safe code]
|
||||
|
||||
As mentioned previously, it is the user's responsibility to ensure that none of the __Client__'s member functions
|
||||
are called concurrently from separate threads.
|
||||
To achieve thread safety in a multithreaded environment,
|
||||
all the __Client__'s member functions must be executed within the same implicit
|
||||
[footnote Only one thread is calling __IOC_RUN__.]
|
||||
or explicit strand.
|
||||
|
||||
Specifically, use __POST__ or __DISPATCH__ to delegate a function call to a strand,
|
||||
or __CO_SPAWN__ to spawn the coroutine into the strand.
|
||||
For asynchronous functions, this will ensure that the initiation code is executed within the strand in a thread-safe manner.
|
||||
The associated executor of all `async_xxx`'s completion handlers must be the same strand.
|
||||
This will guarantee that the entire sequence of operations
|
||||
- from the initiation code through any intermediate operations to the execution of the completion handler -
|
||||
is carried out within the strand, thereby ensuring thread safety.
|
||||
|
||||
[important
|
||||
To conclude, to achieve thread safety,
|
||||
all the member functions of the __Client__ *must* be executed in *the same strand*.
|
||||
This strand *must* be the associated executor of all the completion handlers across
|
||||
all `async_xxx` invocations.
|
||||
]
|
||||
|
||||
The examples below demonstrate how to publish a "Hello World" Application Message
|
||||
in a multithreaded setting using callbacks (`post`/`dispatch`) and coroutines (`co_spawn`):
|
||||
|
||||
* [link async_mqtt5.hello_world_in_multithreaded_env hello_world_in_multithreaded_env.cpp]
|
||||
* [link async_mqtt5.hello_world_in_coro_multithreaded_env hello_world_in_coro_multithreaded_env.cpp]
|
||||
|
||||
|
||||
[endsect] [/thread_safe_code]
|
||||
|
||||
[endsect] [/multithreading]
|
@ -46,6 +46,14 @@ The following list contains all the examples that showcase how to use the __Clie
|
||||
using awaitable operators.
|
||||
]
|
||||
]
|
||||
[
|
||||
[[link async_mqtt5.hello_world_in_multithreaded_env hello_world_in_multithreaded_env.cpp]]
|
||||
[Shows how to publish a "Hello World" message in a multithreaded environment using callbacks (`post`/`dispatch`).]
|
||||
]
|
||||
[
|
||||
[[link async_mqtt5.hello_world_in_coro_multithreaded_env hello_world_in_coro_multithreaded_env.cpp]]
|
||||
[Shows how to publish a "Hello World" message in a multithreaded environment using coroutines (`co_spawn`).]
|
||||
]
|
||||
]
|
||||
|
||||
[endsect][/examples]
|
||||
|
@ -67,4 +67,18 @@ within `5 seconds`. Otherwise, they will be cancelled.
|
||||
[timeout_with_awaitable_operators]
|
||||
[endsect]
|
||||
|
||||
[section:hello_world_in_multithreaded_env Hello World in a multithreaded environment using callbacks]
|
||||
This example demonstrates how to publish a "Hello World" message in a multithreaded environment using callbacks (`post`/`dispatch`).
|
||||
|
||||
[import ../../../example/hello_world_in_multithreaded_env.cpp]
|
||||
[hello_world_in_multithreaded_env]
|
||||
[endsect]
|
||||
|
||||
[section:hello_world_in_coro_multithreaded_env Hello World in a multithreaded environment using coroutines]
|
||||
This example demonstrates how to publish a "Hello World" message in a multithreaded environment using coroutines (`co_spawn`).
|
||||
|
||||
[import ../../../example/hello_world_in_coro_multithreaded_env.cpp]
|
||||
[hello_world_in_coro_multithreaded_env]
|
||||
[endsect]
|
||||
|
||||
[block'''</part>''']
|
||||
|
91
example/hello_world_in_coro_multithreaded_env.cpp
Normal file
91
example/hello_world_in_coro_multithreaded_env.cpp
Normal file
@ -0,0 +1,91 @@
|
||||
//[hello_world_in_coro_multithreaded_env
|
||||
#include <vector>
|
||||
#include <thread>
|
||||
|
||||
#include <boost/asio/use_awaitable.hpp>
|
||||
#ifdef BOOST_ASIO_HAS_CO_AWAIT
|
||||
|
||||
#include <boost/asio/as_tuple.hpp>
|
||||
#include <boost/asio/co_spawn.hpp>
|
||||
#include <boost/asio/detached.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <async_mqtt5.hpp>
|
||||
|
||||
using client_type = async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket>;
|
||||
|
||||
// Modified completion token that will prevent co_await from throwing exceptions.
|
||||
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable);
|
||||
|
||||
boost::asio::awaitable<void> publish_hello_world(
|
||||
client_type& client,
|
||||
const boost::asio::strand<boost::asio::io_context::executor_type>& strand
|
||||
) {
|
||||
// Confirmation that the coroutine running in the strand.
|
||||
assert(strand.running_in_this_thread());
|
||||
|
||||
// All these function calls will be executed by the strand that is executing the coroutine.
|
||||
// All the completion handler's associated executors will be the same strand
|
||||
// because the Client was constructed with it as the default associated executor,
|
||||
// and no executors were associated with the async_run call.
|
||||
|
||||
// Note: you can spawn the coroutine in another strand that is different
|
||||
// from the one used in constructing the Client.
|
||||
// However, then you must bind the second strand to async_run's handler.
|
||||
|
||||
client.async_run(boost::asio::detached);
|
||||
|
||||
auto&& [ec, rc, puback_props] = co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
|
||||
"<your-mqtt-topic>", "Hello world!", async_mqtt5::retain_e::no,
|
||||
async_mqtt5::publish_props {}, use_nothrow_awaitable);
|
||||
|
||||
co_await client.async_disconnect(use_nothrow_awaitable);
|
||||
co_return;
|
||||
}
|
||||
|
||||
int main() {
|
||||
// Create a multithreaded environment where 4 threads
|
||||
// will be calling ioc.run().
|
||||
|
||||
// Number of threads that will call io_context::run().
|
||||
int thread_num = 4;
|
||||
boost::asio::io_context ioc(4);
|
||||
|
||||
// Create the remaining threads (aside of this one).
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(thread_num - 1);
|
||||
|
||||
// Create an explicit strand from io_context's executor.
|
||||
// The strand guarantees a serialised handler execution regardless of the
|
||||
// number of threads running in the io_context.
|
||||
boost::asio::strand strand = boost::asio::make_strand(ioc.get_executor());
|
||||
|
||||
// Create the Client with the explicit strand as the default associated executor.
|
||||
client_type client(strand);
|
||||
|
||||
client.brokers("<your-mqtt-broker>", 1883);
|
||||
|
||||
// Spawn the coroutine.
|
||||
// The executor that executes the coroutine must be the same executor
|
||||
// that is the Client's default associated executor.
|
||||
co_spawn(strand, publish_hello_world(client, strand), boost::asio::detached);
|
||||
|
||||
// Call ioc.run() in the other threads.
|
||||
for (int i = 0; i < thread_num - 1; ++i)
|
||||
threads.emplace_back([&ioc] { ioc.run(); });
|
||||
|
||||
// Call ioc.run() on this thread.
|
||||
ioc.run();
|
||||
|
||||
for (auto& t : threads)
|
||||
if (t.joinable()) t.join();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
//]
|
90
example/hello_world_in_multithreaded_env.cpp
Normal file
90
example/hello_world_in_multithreaded_env.cpp
Normal file
@ -0,0 +1,90 @@
|
||||
//[hello_world_in_multithreaded_env
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <thread>
|
||||
|
||||
#include <boost/asio/detached.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <async_mqtt5.hpp>
|
||||
|
||||
int main() {
|
||||
// Create a multithreaded environment where 4 threads
|
||||
// will be calling ioc.run().
|
||||
|
||||
// Number of threads that will call io_context::run().
|
||||
int thread_num = 4;
|
||||
boost::asio::io_context ioc(4);
|
||||
|
||||
// Create the remaining threads (aside of this one).
|
||||
std::vector<std::thread> threads;
|
||||
threads.reserve(thread_num - 1);
|
||||
|
||||
// Create an explicit strand from io_context's executor.
|
||||
// The strand guarantees a serialised handler execution regardless of the
|
||||
// number of threads running in the io_context.
|
||||
boost::asio::strand strand = boost::asio::make_strand(ioc.get_executor());
|
||||
|
||||
// Create the Client with the explicit strand as the default associated executor.
|
||||
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client(strand);
|
||||
|
||||
// Configure the client.
|
||||
client.brokers("<your-mqtt-broker>", 1883);
|
||||
|
||||
// Start the Client.
|
||||
// The async_run function call must be posted/dispatched to the strand.
|
||||
boost::asio::post(
|
||||
strand,
|
||||
[&client, &strand] {
|
||||
// Considering that the default associated executor of all completion handlers is the strand,
|
||||
// it is not necessary to explicitly bind it to async_run or other async_xxx's handlers.
|
||||
client.async_run(boost::asio::detached);
|
||||
}
|
||||
);
|
||||
|
||||
// The async_publish function call must be posted/dispatched to the strand.
|
||||
// The associated executor of async_publish's completion handler must be the same strand.
|
||||
boost::asio::post(
|
||||
strand,
|
||||
[&client, &strand] {
|
||||
assert(strand.running_in_this_thread());
|
||||
|
||||
client.async_publish<async_mqtt5::qos_e::at_least_once>(
|
||||
"<your-mqtt-topic>", "Hello world!", async_mqtt5::retain_e::no,
|
||||
async_mqtt5::publish_props {},
|
||||
// You may bind the strand to this handler, but it is not necessary
|
||||
// as the strand is already the default associated handler.
|
||||
// However, you must not bind it to any other executor!
|
||||
[&client, &strand](
|
||||
async_mqtt5::error_code ec, async_mqtt5::reason_code rc,
|
||||
async_mqtt5::puback_props props
|
||||
) {
|
||||
assert(strand.running_in_this_thread());
|
||||
|
||||
std::cout << ec.message() << std::endl;
|
||||
std::cout << rc.message() << std::endl;
|
||||
|
||||
// Stop the Client. This will cause ioc.run() to return.
|
||||
client.cancel();
|
||||
}
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
// Call ioc.run() on the other threads.
|
||||
for (int i = 0; i < thread_num - 1; ++i)
|
||||
threads.emplace_back([&ioc] { ioc.run(); });
|
||||
|
||||
// Call ioc.run() on this thread.
|
||||
ioc.run();
|
||||
|
||||
for (auto& t : threads)
|
||||
if (t.joinable()) t.join();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
//]
|
Reference in New Issue
Block a user