diff --git a/doc/qbk/00_main.qbk b/doc/qbk/00_main.qbk index 6d76611..7265b7c 100644 --- a/doc/qbk/00_main.qbk +++ b/doc/qbk/00_main.qbk @@ -45,7 +45,11 @@ [def __ASIO_ALLOCATORS__ [@boost:/doc/html/boost_asio/overview/model/allocators.html Allocators]] [def __ASIO_CUSTOM_MEMORY_ALLOCATION__ [@boost:/doc/html/boost_asio/overview/core/allocation.html Custom Memory Allocation]] [def __ASIO_PARALLEL_GROUP__ [@boost:/doc/html/boost_asio/overview/composition/parallel_group.html Co-ordinating Parallel Operations]] +[def __ASIO_STRANDS__ [@boost:/doc/html/boost_asio/overview/core/strands.html Strands: Use Threads Without Explicit Locking]] [def __IOC__ [@boost:doc/html/boost_asio/reference/io_context.html `boost::asio::io_context`]] +[def __IOC_RUN__ [@boost:doc/html/boost_asio/reference/io_context/run.html `boost::asio::io_context::run()`]] +[def __STRAND__ [@boost:doc/html/boost_asio/reference/io_context__strand.html `boost::asio::io_context::strand`]] +[def __DISPATCH__ [@boost:doc/html/boost_asio/reference/dispatch.html `boost::asio::dispatch`]] [def __POST__ [@boost:doc/html/boost_asio/reference/post.html `boost::asio::post`]] [def __CO_SPAWN__ [@boost:/doc/html/boost_asio/reference/co_spawn.html `boost::asio::co_spawn`]] [def __USE_AWAITABLE__ [@boost:/doc/html/boost_asio/reference/use_awaitable.html `boost::asio::use_awaitable`]] @@ -120,6 +124,7 @@ [include 05_optimising_communication.qbk] [include 06_disconnecting_the_client.qbk] [include 07_asio_compliance.qbk] +[include 11_multithreading.qbk] [include 15_examples.qbk] diff --git a/doc/qbk/11_multithreading.qbk b/doc/qbk/11_multithreading.qbk new file mode 100644 index 0000000..24a5234 --- /dev/null +++ b/doc/qbk/11_multithreading.qbk @@ -0,0 +1,86 @@ +[section:multithreading Using the mqtt_client in a multithreaded environment] +[nochunk] + +This chapter provides information about thread safety of the __Client__ and other __Asio__-compliant objects +and provides examples of how to write thread-safe code in multithreaded environments. + +[section:thread_safety Thread safety of ASIO-compliant objects] + +A common misconception exists regarding the "thread safety" of ASIO-compliant asynchronous objects, +specifically around the belief that initialising such an object with a __STRAND__ +[footnote An executor that provides serialised handler execution.] +executor allows its asynchronous functions (`async_xxx`) to be freely called from any executor or thread. +That is not correct. Those `async_xxx` functions themselves *must* be called from within the same executor. + +Every `async_xxx` function in every ASIO-compliant object begins by executing some initiation code before +typically proceeding to call an intermediate lower-level ASIO `async_xxx` function, with an adapted handler serving as the callback. +It is worth noting that the thread safety of this initiation code, +which is called directly by the [@boost:doc/html/boost_asio/reference/async_initiate.html `boost::asio::async_initiate`] +function and executed by the same executor that called the `async_xxx` function, is not guaranteed +and depends on the implementation itself. +This uncertainty around thread safety is what the notation "Thread Safety: Shared objects: Unsafe" means, +which appears in the documentation for any ASIO-compliant object. + +Consequently, similar to the other ASIO-compliant objects, the __Client__ object *is not thread-safe*. +Invoking its member functions concurrently from separate threads will result in a race condition. + +This design choice is intentional and offloads the responsibility of managing concurrency to the user. +Given that many applications using __Asio__ often operate in a single-threaded environment for better performance, +ASIO-compliant objects do not want to pay the cost of the overhead associated with mutexes and other synchronization mechanisms. +Instead, it encourages developers to implement their own concurrency management strategies, tailoring them to the specific needs of their applications. + +[endsect] [/thread_safety] + +[section:executors_threads_strands Executors, threads, and strands] + +Before delving into thread-safe programming, it is essential to understand the distinction between executors and threads. +Executors are not threads but mechanisms for scheduling how and when work gets done. +An executor can distribute tasks across multiple threads, and a single thread can execute tasks from multiple executors. +Thus, when several threads invoke __IOC_RUN__ on the same __IOC__, +the underlying executor of that `io_context` has the flexibility to assign tasks to any of those threads. + +A __STRAND__ executor is particularly important in maintaining thread safety and managing multithreading. +As outlined earlier, this type of executor guarantees that tasks assigned to it are executed in a serialised manner, +preventing concurrent execution. +It is important to note that this serialisation does not mean that a single thread handles all tasks within a strand. +If the `io_context` associated with a strand operates across multiple threads, +these threads can independently undertake tasks within the strand. +However, these tasks are executed in a non-concurrent fashion as guaranteed by the strand. +Refer to __ASIO_STRANDS__ for more details. + +[endsect] [/executors_threads_strands] + +[section:thread_safe_code Writing thread-safe code] + +As mentioned previously, it is the user's responsibility to ensure that none of the __Client__'s member functions +are called concurrently from separate threads. +To achieve thread safety in a multithreaded environment, +all the __Client__'s member functions must be executed within the same implicit +[footnote Only one thread is calling __IOC_RUN__.] +or explicit strand. + +Specifically, use __POST__ or __DISPATCH__ to delegate a function call to a strand, +or __CO_SPAWN__ to spawn the coroutine into the strand. +For asynchronous functions, this will ensure that the initiation code is executed within the strand in a thread-safe manner. +The associated executor of all `async_xxx`'s completion handlers must be the same strand. +This will guarantee that the entire sequence of operations +- from the initiation code through any intermediate operations to the execution of the completion handler - +is carried out within the strand, thereby ensuring thread safety. + +[important + To conclude, to achieve thread safety, + all the member functions of the __Client__ *must* be executed in *the same strand*. + This strand *must* be the associated executor of all the completion handlers across + all `async_xxx` invocations. +] + +The examples below demonstrate how to publish a "Hello World" Application Message +in a multithreaded setting using callbacks (`post`/`dispatch`) and coroutines (`co_spawn`): + +* [link async_mqtt5.hello_world_in_multithreaded_env hello_world_in_multithreaded_env.cpp] +* [link async_mqtt5.hello_world_in_coro_multithreaded_env hello_world_in_coro_multithreaded_env.cpp] + + +[endsect] [/thread_safe_code] + +[endsect] [/multithreading] diff --git a/doc/qbk/15_examples.qbk b/doc/qbk/15_examples.qbk index 213f694..d6d7a93 100644 --- a/doc/qbk/15_examples.qbk +++ b/doc/qbk/15_examples.qbk @@ -46,6 +46,14 @@ The following list contains all the examples that showcase how to use the __Clie using awaitable operators. ] ] + [ + [[link async_mqtt5.hello_world_in_multithreaded_env hello_world_in_multithreaded_env.cpp]] + [Shows how to publish a "Hello World" message in a multithreaded environment using callbacks (`post`/`dispatch`).] + ] + [ + [[link async_mqtt5.hello_world_in_coro_multithreaded_env hello_world_in_coro_multithreaded_env.cpp]] + [Shows how to publish a "Hello World" message in a multithreaded environment using coroutines (`co_spawn`).] + ] ] [endsect][/examples] diff --git a/doc/qbk/examples/Examples.qbk b/doc/qbk/examples/Examples.qbk index 1108333..ab37c85 100644 --- a/doc/qbk/examples/Examples.qbk +++ b/doc/qbk/examples/Examples.qbk @@ -67,4 +67,18 @@ within `5 seconds`. Otherwise, they will be cancelled. [timeout_with_awaitable_operators] [endsect] +[section:hello_world_in_multithreaded_env Hello World in a multithreaded environment using callbacks] +This example demonstrates how to publish a "Hello World" message in a multithreaded environment using callbacks (`post`/`dispatch`). + +[import ../../../example/hello_world_in_multithreaded_env.cpp] +[hello_world_in_multithreaded_env] +[endsect] + +[section:hello_world_in_coro_multithreaded_env Hello World in a multithreaded environment using coroutines] +This example demonstrates how to publish a "Hello World" message in a multithreaded environment using coroutines (`co_spawn`). + +[import ../../../example/hello_world_in_coro_multithreaded_env.cpp] +[hello_world_in_coro_multithreaded_env] +[endsect] + [block''''''] diff --git a/example/hello_world_in_coro_multithreaded_env.cpp b/example/hello_world_in_coro_multithreaded_env.cpp new file mode 100644 index 0000000..b17d501 --- /dev/null +++ b/example/hello_world_in_coro_multithreaded_env.cpp @@ -0,0 +1,91 @@ +//[hello_world_in_coro_multithreaded_env +#include +#include + +#include +#ifdef BOOST_ASIO_HAS_CO_AWAIT + +#include +#include +#include +#include +#include + +#include + +#include + +using client_type = async_mqtt5::mqtt_client; + +// Modified completion token that will prevent co_await from throwing exceptions. +constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); + +boost::asio::awaitable publish_hello_world( + client_type& client, + const boost::asio::strand& strand +) { + // Confirmation that the coroutine running in the strand. + assert(strand.running_in_this_thread()); + + // All these function calls will be executed by the strand that is executing the coroutine. + // All the completion handler's associated executors will be the same strand + // because the Client was constructed with it as the default associated executor, + // and no executors were associated with the async_run call. + + // Note: you can spawn the coroutine in another strand that is different + // from the one used in constructing the Client. + // However, then you must bind the second strand to async_run's handler. + + client.async_run(boost::asio::detached); + + auto&& [ec, rc, puback_props] = co_await client.async_publish( + "", "Hello world!", async_mqtt5::retain_e::no, + async_mqtt5::publish_props {}, use_nothrow_awaitable); + + co_await client.async_disconnect(use_nothrow_awaitable); + co_return; +} + +int main() { + // Create a multithreaded environment where 4 threads + // will be calling ioc.run(). + + // Number of threads that will call io_context::run(). + int thread_num = 4; + boost::asio::io_context ioc(4); + + // Create the remaining threads (aside of this one). + std::vector threads; + threads.reserve(thread_num - 1); + + // Create an explicit strand from io_context's executor. + // The strand guarantees a serialised handler execution regardless of the + // number of threads running in the io_context. + boost::asio::strand strand = boost::asio::make_strand(ioc.get_executor()); + + // Create the Client with the explicit strand as the default associated executor. + client_type client(strand); + + client.brokers("", 1883); + + // Spawn the coroutine. + // The executor that executes the coroutine must be the same executor + // that is the Client's default associated executor. + co_spawn(strand, publish_hello_world(client, strand), boost::asio::detached); + + // Call ioc.run() in the other threads. + for (int i = 0; i < thread_num - 1; ++i) + threads.emplace_back([&ioc] { ioc.run(); }); + + // Call ioc.run() on this thread. + ioc.run(); + + for (auto& t : threads) + if (t.joinable()) t.join(); + + return 0; +} + +#endif + +//] diff --git a/example/hello_world_in_multithreaded_env.cpp b/example/hello_world_in_multithreaded_env.cpp new file mode 100644 index 0000000..910ad2f --- /dev/null +++ b/example/hello_world_in_multithreaded_env.cpp @@ -0,0 +1,90 @@ +//[hello_world_in_multithreaded_env +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +int main() { + // Create a multithreaded environment where 4 threads + // will be calling ioc.run(). + + // Number of threads that will call io_context::run(). + int thread_num = 4; + boost::asio::io_context ioc(4); + + // Create the remaining threads (aside of this one). + std::vector threads; + threads.reserve(thread_num - 1); + + // Create an explicit strand from io_context's executor. + // The strand guarantees a serialised handler execution regardless of the + // number of threads running in the io_context. + boost::asio::strand strand = boost::asio::make_strand(ioc.get_executor()); + + // Create the Client with the explicit strand as the default associated executor. + async_mqtt5::mqtt_client client(strand); + + // Configure the client. + client.brokers("", 1883); + + // Start the Client. + // The async_run function call must be posted/dispatched to the strand. + boost::asio::post( + strand, + [&client, &strand] { + // Considering that the default associated executor of all completion handlers is the strand, + // it is not necessary to explicitly bind it to async_run or other async_xxx's handlers. + client.async_run(boost::asio::detached); + } + ); + + // The async_publish function call must be posted/dispatched to the strand. + // The associated executor of async_publish's completion handler must be the same strand. + boost::asio::post( + strand, + [&client, &strand] { + assert(strand.running_in_this_thread()); + + client.async_publish( + "", "Hello world!", async_mqtt5::retain_e::no, + async_mqtt5::publish_props {}, + // You may bind the strand to this handler, but it is not necessary + // as the strand is already the default associated handler. + // However, you must not bind it to any other executor! + [&client, &strand]( + async_mqtt5::error_code ec, async_mqtt5::reason_code rc, + async_mqtt5::puback_props props + ) { + assert(strand.running_in_this_thread()); + + std::cout << ec.message() << std::endl; + std::cout << rc.message() << std::endl; + + // Stop the Client. This will cause ioc.run() to return. + client.cancel(); + } + ); + } + ); + + // Call ioc.run() on the other threads. + for (int i = 0; i < thread_num - 1; ++i) + threads.emplace_back([&ioc] { ioc.run(); }); + + // Call ioc.run() on this thread. + ioc.run(); + + for (auto& t : threads) + if (t.joinable()) t.join(); + + return 0; +} + +//]