Add support for default completion tokens

Summary: related to #12 T13767

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen

Differential Revision: https://repo.mireo.local/D30725
This commit is contained in:
Korina Šimičević
2024-08-06 07:41:29 +02:00
parent d52090f438
commit 7bc1ccf072
6 changed files with 213 additions and 24 deletions

View File

@@ -56,7 +56,7 @@ public:
return _cancellation_state.slot();
}
asio::cancellation_type_t cancelled() const {
asio::cancellation_type_t cancelled() const noexcept {
return _cancellation_state.cancelled();
}

View File

@@ -0,0 +1,40 @@
//
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASYNC_MQTT5_REBIND_EXECUTOR_HPP
#define ASYNC_MQTT5_REBIND_EXECUTOR_HPP
#include <boost/asio/ssl/stream.hpp>
#include <boost/beast/websocket/stream.hpp>
namespace async_mqtt5::detail {
namespace asio = boost::asio;
template <typename Stream, typename Executor>
struct rebind_executor {
using other = typename Stream::template rebind_executor<Executor>::other;
};
// asio::ssl::stream does not define a rebind_executor member type
template <typename Stream, typename Executor>
struct rebind_executor<asio::ssl::stream<Stream>, Executor> {
using other = typename asio::ssl::stream<typename rebind_executor<Stream, Executor>::other>;
};
template <typename Stream, typename Executor>
struct rebind_executor<boost::beast::websocket::stream<asio::ssl::stream<Stream>>, Executor> {
using other = typename boost::beast::websocket::stream<
asio::ssl::stream<typename rebind_executor<Stream, Executor>::other>,
boost::beast::websocket::stream<asio::ssl::stream<Stream>>::is_deflate_supported::value
>;
};
} // end namespace async_mqtt5::detail
#endif // !ASYNC_MQTT5_REBIND_EXECUTOR_HPP

View File

@@ -11,6 +11,7 @@
#include <boost/asio/append.hpp>
#include <boost/asio/cancellation_state.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/completion_condition.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/asio/read.hpp>
@@ -248,7 +249,7 @@ public:
auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
asio::async_read(
_stream, buff,
_stream, buff, asio::transfer_all(),
asio::prepend(std::move(*this), on_fixed_header {})
);
}
@@ -287,7 +288,7 @@ public:
auto last = first + *varlen;
asio::async_read(
_stream, buff,
_stream, buff, asio::transfer_all(),
asio::prepend(
asio::append(std::move(*this), code, first, last),
on_read_packet {}
@@ -407,7 +408,7 @@ public:
auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
asio::async_read(
_stream, buff,
_stream, buff, asio::transfer_all(),
asio::prepend(std::move(*this), on_fixed_header {})
);
}

View File

@@ -8,11 +8,14 @@
#ifndef ASYNC_MQTT5_MQTT_CLIENT_HPP
#define ASYNC_MQTT5_MQTT_CLIENT_HPP
#include <boost/asio/async_result.hpp>
#include <boost/system/error_code.hpp>
#include <async_mqtt5/error.hpp>
#include <async_mqtt5/types.hpp>
#include <async_mqtt5/detail/rebind_executor.hpp>
#include <async_mqtt5/impl/client_service.hpp>
#include <async_mqtt5/impl/publish_send_op.hpp>
#include <async_mqtt5/impl/read_message_op.hpp>
@@ -46,6 +49,17 @@ class mqtt_client {
public:
/// The executor type associated with the client.
using executor_type = typename StreamType::executor_type;
/// Rebinds the client type to another executor.
template <typename Executor>
struct rebind_executor {
/// The client type when rebound to the specified executor.
using other = mqtt_client<
typename detail::rebind_executor<StreamType, Executor>::other,
TlsContext
>;
};
private:
using stream_type = StreamType;
using tls_context_type = TlsContext;
@@ -182,9 +196,12 @@ public:
* This asynchronous operation supports cancellation for the following \__CANCELLATION_TYPE\__ values:\n
* - `cancellation_type::terminal` - invokes \ref mqtt_client::cancel \n
*/
template <typename CompletionToken>
decltype(auto) async_run(CompletionToken&& token) {
using Signature = void(error_code);
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_run(CompletionToken&& token = {}) {
using Signature = void (error_code);
auto initiation = [] (auto handler, const impl_type& impl) {
auto ex = asio::get_associated_executor(handler, impl->get_executor());
@@ -484,11 +501,14 @@ public:
* - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__PUBLISH\__ packet \n
*
*/
template <qos_e qos_type, typename CompletionToken>
template <qos_e qos_type,
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_publish(
std::string topic, std::string payload,
retain_e retain, const publish_props& props,
CompletionToken&& token
CompletionToken&& token = {}
) {
using Signature = detail::on_publish_signature<qos_type>;
@@ -567,11 +587,14 @@ public:
* - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__SUBSCRIBE\__ packet \n
*
*/
template <typename CompletionToken>
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_subscribe(
const std::vector<subscribe_topic>& topics,
const subscribe_props& props,
CompletionToken&& token
CompletionToken&& token = {}
) {
using Signature = void (
error_code, std::vector<reason_code>, suback_props
@@ -592,7 +615,7 @@ public:
/**
* \brief Send a \__SUBSCRIBE\__ packet to Broker to create a subscription
* to one Topics of interest.
* to one Topic of interest.
*
* \details After the subscription has been established, the Broker will send
* \__PUBLISH\__ packets to the Client to forward Application Messages that were published
@@ -645,10 +668,13 @@ public:
* - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__SUBSCRIBE\__ packet \n
*
*/
template <typename CompletionToken>
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_subscribe(
const subscribe_topic& topic, const subscribe_props& props,
CompletionToken&& token
CompletionToken&& token = {}
) {
return async_subscribe(
std::vector<subscribe_topic> { topic }, props,
@@ -708,10 +734,13 @@ public:
* - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__UNSUBSCRIBE\__ packet \n
*
*/
template <typename CompletionToken>
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_unsubscribe(
const std::vector<std::string>& topics, const unsubscribe_props& props,
CompletionToken&& token
CompletionToken&& token = {}
) {
using Signature = void (
error_code, std::vector<reason_code>, unsuback_props
@@ -782,10 +811,13 @@ public:
* - `cancellation_type::partial` & `cancellation_type::total` - prevents potential resending of the \__UNSUBSCRIBE\__ packet \n
*
*/
template <typename CompletionToken>
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_unsubscribe(
const std::string& topic, const unsubscribe_props& props,
CompletionToken&& token
CompletionToken&& token = {}
) {
return async_unsubscribe(
std::vector<std::string> { topic }, props,
@@ -842,8 +874,11 @@ public:
* - `cancellation_type::partial` \n
* - `cancellation_type::total` \n
*/
template <typename CompletionToken>
decltype(auto) async_receive(CompletionToken&& token) {
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_receive(CompletionToken&& token = {}) {
// Sig = void (error_code, std::string, std::string, publish_props)
return _impl->async_channel_receive(
std::forward<CompletionToken>(token)
@@ -900,10 +935,13 @@ public:
* - `cancellation_type::terminal` - invokes \ref mqtt_client::cancel \n
*
*/
template <typename CompletionToken>
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_disconnect(
disconnect_rc_e reason_code, const disconnect_props& props,
CompletionToken&& token
CompletionToken&& token = {}
) {
auto impl = _impl;
_impl = impl->dup();
@@ -960,8 +998,11 @@ public:
* This asynchronous operation supports cancellation for the following \__CANCELLATION_TYPE\__ values:\n
* - `cancellation_type::terminal` - invokes \ref mqtt_client::cancel \n
*/
template <typename CompletionToken>
decltype(auto) async_disconnect(CompletionToken&& token) {
template <
typename CompletionToken =
typename asio::default_completion_token<executor_type>::type
>
decltype(auto) async_disconnect(CompletionToken&& token = {}) {
return async_disconnect(
disconnect_rc_e::normal_disconnection,
disconnect_props {}, std::forward<CompletionToken>(token)