Files
mqtt5/include/async_mqtt5/impl/unsubscribe_op.hpp
Bruno Iljazovic d78fdd3208 use associated executors for intermediate handlers
Summary: * per-operation cancellation changed: total/partial signals only prevent further resending, terminal signal cancels the whole client

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina

Differential Revision: https://repo.mireo.local/D27246
2024-01-09 16:35:16 +01:00

221 lines
5.8 KiB
C++

#ifndef ASYNC_MQTT5_UNSUBSCRIBE_OP_HPP
#define ASYNC_MQTT5_UNSUBSCRIBE_OP_HPP
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/detached.hpp>
#include <async_mqtt5/error.hpp>
#include <async_mqtt5/detail/cancellable_handler.hpp>
#include <async_mqtt5/detail/control_packet.hpp>
#include <async_mqtt5/detail/internal_types.hpp>
#include <async_mqtt5/detail/topic_validation.hpp>
#include <async_mqtt5/impl/disconnect_op.hpp>
#include <async_mqtt5/impl/codecs/message_decoders.hpp>
#include <async_mqtt5/impl/codecs/message_encoders.hpp>
namespace async_mqtt5::detail {
namespace asio = boost::asio;
template <typename ClientService, typename Handler>
class unsubscribe_op {
using client_service = ClientService;
struct on_unsubscribe {};
struct on_unsuback {};
std::shared_ptr<client_service> _svc_ptr;
using handler_type = cancellable_handler<
Handler,
typename client_service::executor_type
>;
handler_type _handler;
public:
unsubscribe_op(
const std::shared_ptr<client_service>& svc_ptr,
Handler&& handler
) :
_svc_ptr(svc_ptr),
_handler(std::move(handler), _svc_ptr->get_executor())
{
auto slot = asio::get_associated_cancellation_slot(_handler);
if (slot.is_connected())
slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
svc.cancel();
});
}
unsubscribe_op(unsubscribe_op&&) noexcept = default;
unsubscribe_op(const unsubscribe_op&) noexcept = delete;
using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}
using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
}
void perform(
const std::vector<std::string>& topics,
const unsubscribe_props& props
) {
uint16_t packet_id = _svc_ptr->allocate_pid();
if (packet_id == 0)
return complete_post(
client::error::pid_overrun, packet_id, topics.size()
);
auto ec = validate_unsubscribe(topics, props);
if (ec)
return complete_post(ec, packet_id, topics.size());
auto unsubscribe = control_packet<allocator_type>::of(
with_pid, get_allocator(),
encoders::encode_unsubscribe, packet_id,
topics, props
);
auto max_packet_size = static_cast<size_t>(
_svc_ptr->connack_property(prop::maximum_packet_size)
.value_or(default_max_send_size)
);
if (unsubscribe.size() > max_packet_size)
return complete_post(
client::error::packet_too_large, packet_id, topics.size()
);
send_unsubscribe(std::move(unsubscribe));
}
void send_unsubscribe(control_packet<allocator_type> unsubscribe) {
auto wire_data = unsubscribe.wire_data();
_svc_ptr->async_send(
wire_data,
no_serial, send_flag::none,
asio::prepend(
std::move(*this), on_unsubscribe {}, std::move(unsubscribe)
)
);
}
void resend_unsubscribe(control_packet<allocator_type> subscribe) {
if (_handler.cancelled() != asio::cancellation_type_t::none)
return complete(
asio::error::operation_aborted, subscribe.packet_id()
);
send_unsubscribe(std::move(subscribe));
}
void operator()(
on_unsubscribe, control_packet<allocator_type> packet,
error_code ec
) {
if (ec == asio::error::try_again)
return resend_unsubscribe(std::move(packet));
auto packet_id = packet.packet_id();
if (ec)
return complete(ec, packet_id);
_svc_ptr->async_wait_reply(
control_code_e::unsuback, packet_id,
asio::prepend(std::move(*this), on_unsuback{}, std::move(packet))
);
}
void operator()(
on_unsuback, control_packet<allocator_type> packet,
error_code ec, byte_citer first, byte_citer last
) {
if (ec == asio::error::try_again) // "resend unanswered"
return resend_unsubscribe(std::move(packet));
uint16_t packet_id = packet.packet_id();
if (ec)
return complete(ec, packet_id);
auto unsuback = decoders::decode_unsuback(
static_cast<uint32_t>(std::distance(first, last)), first
);
if (!unsuback.has_value()) {
on_malformed_packet("Malformed UNSUBACK: cannot decode");
return resend_unsubscribe(std::move(packet));
}
auto& [props, reason_codes] = *unsuback;
complete(
ec, packet_id,
to_reason_codes(std::move(reason_codes)), std::move(props)
);
}
private:
static error_code validate_unsubscribe(
const std::vector<std::string>& topics,
const unsubscribe_props& props
) {
for (const auto& topic : topics)
if (validate_topic_filter(topic) != validation_result::valid)
return client::error::invalid_topic;
auto user_properties = props[prop::user_property];
for (const auto& user_prop: user_properties)
if (validate_mqtt_utf8(user_prop) != validation_result::valid)
return client::error::malformed_packet;
return error_code {};
}
static std::vector<reason_code> to_reason_codes(std::vector<uint8_t> codes) {
std::vector<reason_code> ret;
for (uint8_t code : codes) {
auto rc = to_reason_code<reason_codes::category::unsuback>(code);
if (rc)
ret.push_back(*rc);
}
return ret;
}
void on_malformed_packet(
const std::string& reason
) {
auto props = disconnect_props {};
props[prop::reason_string] = reason;
async_disconnect(
disconnect_rc_e::malformed_packet, props, false, _svc_ptr,
asio::detached
);
}
void complete_post(error_code ec, uint16_t packet_id, size_t num_topics) {
if (packet_id != 0)
_svc_ptr->free_pid(packet_id);
_handler.complete_post(
ec, std::vector<reason_code>(num_topics, reason_codes::empty),
unsuback_props {}
);
}
void complete(
error_code ec, uint16_t packet_id,
std::vector<reason_code> reason_codes = {}, unsuback_props props = {}
) {
_svc_ptr->free_pid(packet_id);
_handler.complete(ec, std::move(reason_codes), std::move(props));
}
};
} // end namespace async_mqtt5::detail
#endif // !ASYNC_MQTT5_UNSUBSCRIBE_OP_HPP