mirror of
https://github.com/boostorg/mqtt5.git
synced 2025-11-07 03:11:38 +01:00
Initiation lambdas replaced with classes with executor_type/get_executor
Summary: related to #13, T13767 `asio::cancel_at` and `asio::cancel_after` (coming with Boost 1.86) use associated executor from the initiation object to construct the underlying timer. Therefore, initiation lambdas are replaced with classes with executor_type/get_executor to allow that functionality (like Asio/Beast). Alternative solutions: 1) asio::bind_executor(get_executor(), initiation) 2) async_compose Reviewers: ivica Reviewed By: ivica Subscribers: iljazovic, miljen Differential Revision: https://repo.mireo.local/D30861
This commit is contained in:
@@ -20,9 +20,11 @@
|
||||
#include <async_mqtt5/impl/async_sender.hpp>
|
||||
#include <async_mqtt5/impl/autoconnect_stream.hpp>
|
||||
#include <async_mqtt5/impl/ping_op.hpp>
|
||||
#include <async_mqtt5/impl/read_message_op.hpp>
|
||||
#include <async_mqtt5/impl/replies.hpp>
|
||||
#include <async_mqtt5/impl/sentry_op.hpp>
|
||||
|
||||
|
||||
namespace async_mqtt5::detail {
|
||||
|
||||
namespace asio = boost::asio;
|
||||
@@ -438,6 +440,44 @@ public:
|
||||
return _async_sender.next_serial_num();
|
||||
}
|
||||
|
||||
bool subscriptions_present() const {
|
||||
return _stream_context.session_state().subscriptions_present();
|
||||
}
|
||||
|
||||
void subscriptions_present(bool present) {
|
||||
_stream_context.session_state().subscriptions_present(present);
|
||||
}
|
||||
|
||||
void update_session_state() {
|
||||
auto& session_state = _stream_context.session_state();
|
||||
|
||||
if (!session_state.session_present()) {
|
||||
_replies.clear_pending_pubrels();
|
||||
session_state.session_present(true);
|
||||
|
||||
if (session_state.subscriptions_present()) {
|
||||
channel_store_error(client::error::session_expired);
|
||||
session_state.subscriptions_present(false);
|
||||
}
|
||||
}
|
||||
|
||||
_cancel_ping.emit(asio::cancellation_type::total);
|
||||
}
|
||||
|
||||
bool channel_store(decoders::publish_message message) {
|
||||
auto& [topic, packet_id, flags, props, payload] = message;
|
||||
return _rec_channel.try_send(
|
||||
error_code {}, std::move(topic),
|
||||
std::move(payload), std::move(props)
|
||||
);
|
||||
}
|
||||
|
||||
bool channel_store_error(error_code ec) {
|
||||
return _rec_channel.try_send(
|
||||
ec, std::string {}, std::string {}, publish_props {}
|
||||
);
|
||||
}
|
||||
|
||||
template <typename BufferType, typename CompletionToken>
|
||||
decltype(auto) async_send(
|
||||
const BufferType& buffer,
|
||||
@@ -477,65 +517,37 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
bool subscriptions_present() const {
|
||||
return _stream_context.session_state().subscriptions_present();
|
||||
}
|
||||
|
||||
void subscriptions_present(bool present) {
|
||||
_stream_context.session_state().subscriptions_present(present);
|
||||
}
|
||||
|
||||
void update_session_state() {
|
||||
auto& session_state = _stream_context.session_state();
|
||||
|
||||
if (!session_state.session_present()) {
|
||||
_replies.clear_pending_pubrels();
|
||||
session_state.session_present(true);
|
||||
|
||||
if (session_state.subscriptions_present()) {
|
||||
channel_store_error(client::error::session_expired);
|
||||
session_state.subscriptions_present(false);
|
||||
}
|
||||
}
|
||||
|
||||
_cancel_ping.emit(asio::cancellation_type::total);
|
||||
}
|
||||
|
||||
bool channel_store(decoders::publish_message message) {
|
||||
auto& [topic, packet_id, flags, props, payload] = message;
|
||||
return _rec_channel.try_send(
|
||||
error_code {}, std::move(topic),
|
||||
std::move(payload), std::move(props)
|
||||
);
|
||||
}
|
||||
|
||||
bool channel_store_error(error_code ec) {
|
||||
return _rec_channel.try_send(
|
||||
ec, std::string {}, std::string {}, publish_props {}
|
||||
);
|
||||
}
|
||||
|
||||
template <typename CompletionToken>
|
||||
decltype(auto) async_channel_receive(CompletionToken&& token) {
|
||||
using Signature =
|
||||
void(error_code, std::string, std::string, publish_props);
|
||||
|
||||
auto initiation = [] (auto handler, self_type& self) {
|
||||
auto ex = asio::get_associated_executor(
|
||||
handler, self.get_executor()
|
||||
);
|
||||
return self._rec_channel.async_receive(
|
||||
asio::bind_executor(ex, std::move(handler))
|
||||
);
|
||||
};
|
||||
|
||||
return asio::async_initiate<CompletionToken, Signature> (
|
||||
initiation, token, std::ref(*this)
|
||||
);
|
||||
return _rec_channel.async_receive(std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
template <typename ClientService>
|
||||
class initiate_async_run {
|
||||
std::shared_ptr<ClientService> _svc_ptr;
|
||||
public:
|
||||
explicit initiate_async_run(std::shared_ptr<ClientService> svc_ptr) :
|
||||
_svc_ptr(std::move(svc_ptr))
|
||||
{}
|
||||
|
||||
using executor_type = typename ClientService::executor_type;
|
||||
executor_type get_executor() const noexcept {
|
||||
return _svc_ptr->get_executor();
|
||||
}
|
||||
|
||||
template <typename Handler>
|
||||
void operator()(Handler&& handler) {
|
||||
auto ex = asio::get_associated_executor(handler, get_executor());
|
||||
|
||||
_svc_ptr->run(std::move(handler));
|
||||
detail::ping_op { _svc_ptr, ex }.perform();
|
||||
detail::read_message_op { _svc_ptr, ex }.perform();
|
||||
detail::sentry_op { _svc_ptr, ex }.perform();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
} // namespace async_mqtt5::detail
|
||||
|
||||
|
||||
Reference in New Issue
Block a user