Syntax consolidation.

This commit is contained in:
Ivica Siladic
2023-12-04 18:42:57 +01:00
parent d00acde499
commit d9489597b3
9 changed files with 98 additions and 62 deletions

View File

@@ -120,11 +120,14 @@ public:
// It's the responsibility of the completion handler to unlock the mutex. // It's the responsibility of the completion handler to unlock the mutex.
template <typename CompletionToken> template <typename CompletionToken>
decltype(auto) lock(CompletionToken&& token) noexcept { decltype(auto) lock(CompletionToken&& token) noexcept {
auto initiation = [this] (auto handler) { using Signature = void (error_code);
this->execute_or_queue(std::move(handler));
auto initiation = [] (auto handler, async_mutex& self) {
self.execute_or_queue(std::move(handler));
}; };
return asio::async_initiate<CompletionToken, void (error_code)>(
std::move(initiation), token return asio::async_initiate<CompletionToken, Signature>(
initiation, token, std::ref(*this)
); );
} }

View File

@@ -4,7 +4,6 @@
#include <boost/asio/any_completion_handler.hpp> #include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/prepend.hpp> #include <boost/asio/prepend.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5/detail/internal_types.hpp> #include <async_mqtt5/detail/internal_types.hpp>
@@ -19,35 +18,50 @@ class write_req {
asio::const_buffer _buffer; asio::const_buffer _buffer;
serial_num_t _serial_num; serial_num_t _serial_num;
unsigned _flags; unsigned _flags;
asio::any_completion_handler<void (error_code)> _handler;
using handler_type = asio::any_completion_handler<void (error_code)>;
handler_type _handler;
public: public:
write_req( write_req(
asio::const_buffer buffer, asio::const_buffer buffer,
serial_num_t serial_num, unsigned flags, serial_num_t serial_num, unsigned flags,
asio::any_completion_handler<void (error_code)> handler handler_type handler
) : _buffer(buffer), _serial_num(serial_num), _flags(flags), ) :
_handler(std::move(handler)) {} _buffer(buffer), _serial_num(serial_num), _flags(flags),
_handler(std::move(handler))
{}
static serial_num_t next_serial_num(serial_num_t last) { static serial_num_t next_serial_num(serial_num_t last) {
return last + 1; return last + 1;
} }
asio::const_buffer buffer() const { return _buffer; } asio::const_buffer buffer() const {
void complete(error_code ec) { std::move(_handler)(ec); } return _buffer;
bool throttled() const { return _flags & send_flag::throttled; } }
bool terminal() const { return _flags & send_flag::terminal; }
void complete(error_code ec) {
std::move(_handler)(ec);
}
bool throttled() const {
return _flags & send_flag::throttled;
}
bool terminal() const {
return _flags & send_flag::terminal;
}
bool operator<(const write_req& other) const { bool operator<(const write_req& other) const {
if (prioritized() != other.prioritized()) { if (prioritized() != other.prioritized())
return prioritized(); return prioritized();
}
auto s1 = _serial_num; auto s1 = _serial_num;
auto s2 = other._serial_num; auto s2 = other._serial_num;
if (s1 < s2) if (s1 < s2)
return (s2 - s1) < (1 << (SERIAL_BITS - 1)); return (s2 - s1) < (1 << (SERIAL_BITS - 1));
return (s1 - s2) >= (1 << (SERIAL_BITS - 1)); return (s1 - s2) >= (1 << (SERIAL_BITS - 1));
} }
@@ -55,8 +69,11 @@ private:
bool prioritized() const { return _flags & send_flag::prioritized; } bool prioritized() const { return _flags & send_flag::prioritized; }
}; };
template <typename ClientService> template <typename ClientService>
class async_sender { class async_sender {
using self_type = async_sender<ClientService>;
using client_service = ClientService; using client_service = ClientService;
using queue_allocator_type = asio::recycling_allocator<write_req>; using queue_allocator_type = asio::recycling_allocator<write_req>;
@@ -95,18 +112,21 @@ public:
serial_num_t serial_num, unsigned flags, serial_num_t serial_num, unsigned flags,
CompletionToken&& token CompletionToken&& token
) { ) {
auto initiation = [this]( using Signature = void (error_code);
auto handler, const BufferType& buffer,
auto initiation = [](
auto handler, self_type& self, const BufferType& buffer,
serial_num_t serial_num, unsigned flags serial_num_t serial_num, unsigned flags
) { ) {
_write_queue.emplace_back( self._write_queue.emplace_back(
asio::buffer(buffer), serial_num, flags, std::move(handler) asio::buffer(buffer), serial_num, flags, std::move(handler)
); );
do_write(); self.do_write();
}; };
return asio::async_initiate<CompletionToken, void (error_code)>( return asio::async_initiate<CompletionToken, Signature>(
std::move(initiation), token, buffer, serial_num, flags initiation, token, std::ref(*this),
buffer, serial_num, flags
); );
} }
@@ -120,8 +140,8 @@ public:
if (_write_in_progress) if (_write_in_progress)
return; return;
// The _write_in_progress flag is set to true to prevent any write // The _write_in_progress flag is set to true to prevent any write
// operations executing before the _write_queue is filled with // operations executing before the _write_queue is filled with
// all the packets that require resending. // all the packets that require resending.
_write_in_progress = true; _write_in_progress = true;
@@ -188,12 +208,14 @@ private:
_write_queue.begin(), _write_queue.end(), _write_queue.begin(), _write_queue.end(),
[](const auto& op) { return op.terminal(); } [](const auto& op) { return op.terminal(); }
); );
if (terminal_req != _write_queue.end()) { if (terminal_req != _write_queue.end()) {
write_queue.push_back(std::move(*terminal_req)); write_queue.push_back(std::move(*terminal_req));
_write_queue.erase(terminal_req); _write_queue.erase(terminal_req);
} }
else if (_limit == MAX_LIMIT) else if (_limit == MAX_LIMIT) {
write_queue = std::move(_write_queue); write_queue = std::move(_write_queue);
}
else { else {
auto throttled_ptr = std::stable_partition( auto throttled_ptr = std::stable_partition(
_write_queue.begin(), _write_queue.end(), _write_queue.begin(), _write_queue.end(),
@@ -214,6 +236,7 @@ private:
std::make_move_iterator(_write_queue.begin()), std::make_move_iterator(_write_queue.begin()),
std::make_move_iterator(throttled_ptr) std::make_move_iterator(throttled_ptr)
); );
_write_queue.erase(_write_queue.begin(), throttled_ptr); _write_queue.erase(_write_queue.begin(), throttled_ptr);
} }
@@ -225,8 +248,7 @@ private:
_svc._replies.clear_fast_replies(); _svc._replies.clear_fast_replies();
_svc._stream.async_write( _svc._stream.async_write(
buffers, buffers, asio::prepend(std::ref(*this), std::move(write_queue))
asio::prepend(std::ref(*this), std::move(write_queue))
); );
} }

View File

@@ -88,6 +88,7 @@ public:
void open() { void open() {
error_code ec; error_code ec;
lowest_layer(*_stream_ptr).open(asio::ip::tcp::v4(), ec); lowest_layer(*_stream_ptr).open(asio::ip::tcp::v4(), ec);
lowest_layer(*_stream_ptr).set_option(asio::ip::tcp::no_delay(true));
} }
void cancel() { void cancel() {

View File

@@ -294,8 +294,7 @@ public:
}; };
return asio::async_initiate<CompletionToken, Signature> ( return asio::async_initiate<CompletionToken, Signature> (
initiation, initiation, token, std::ref(*this),
token, std::ref(*this),
wait_for, std::ref(_read_buff), std::ref(_active_span) wait_for, std::ref(_read_buff), std::ref(_active_span)
); );
} }

View File

@@ -121,7 +121,7 @@ decltype(auto) async_disconnect(
) { ) {
using Signature = void (error_code); using Signature = void (error_code);
auto initiate = []( auto initiation = [](
auto handler, disconnect_context ctx, auto handler, disconnect_context ctx,
const std::shared_ptr<ClientService>& svc_ptr const std::shared_ptr<ClientService>& svc_ptr
) { ) {
@@ -131,7 +131,7 @@ decltype(auto) async_disconnect(
}; };
return asio::async_initiate<CompletionToken, Signature>( return asio::async_initiate<CompletionToken, Signature>(
std::move(initiate), token, initiation, token,
disconnect_context { reason_code, props, terminal }, disconnect_context { reason_code, props, terminal },
svc_ptr svc_ptr
); );

View File

@@ -4,16 +4,13 @@
#include <boost/asio/deferred.hpp> #include <boost/asio/deferred.hpp>
#include <boost/asio/dispatch.hpp> #include <boost/asio/dispatch.hpp>
#include <boost/asio/prepend.hpp> #include <boost/asio/prepend.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/any_completion_handler.hpp> #include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/experimental/parallel_group.hpp> #include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <async_mqtt5/types.hpp> #include <async_mqtt5/types.hpp>
#include <async_mqtt5/detail/async_traits.hpp> #include <async_mqtt5/detail/async_traits.hpp>
#include <async_mqtt5/impl/connect_op.hpp> #include <async_mqtt5/impl/connect_op.hpp>
namespace async_mqtt5::detail { namespace async_mqtt5::detail {

View File

@@ -3,6 +3,7 @@
#include <boost/asio/any_completion_handler.hpp> #include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/consign.hpp> #include <boost/asio/consign.hpp>
#include <boost/asio/append.hpp>
#include <boost/asio/error.hpp> #include <boost/asio/error.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
@@ -14,12 +15,12 @@ namespace async_mqtt5::detail {
namespace asio = boost::asio; namespace asio = boost::asio;
class replies { class replies {
using signature = void (error_code, byte_citer, byte_citer); using Signature = void (error_code, byte_citer, byte_citer);
static constexpr auto max_reply_time = std::chrono::seconds(20); static constexpr auto max_reply_time = std::chrono::seconds(20);
class handler_type : public asio::any_completion_handler<signature> { class handler_type : public asio::any_completion_handler<Signature> {
using base = asio::any_completion_handler<signature>; using base = asio::any_completion_handler<Signature>;
control_code_e _code; control_code_e _code;
uint16_t _packet_id; uint16_t _packet_id;
std::chrono::time_point<std::chrono::system_clock> _ts; std::chrono::time_point<std::chrono::system_clock> _ts;
@@ -81,32 +82,42 @@ public:
} }
auto freply = find_fast_reply(code, packet_id); auto freply = find_fast_reply(code, packet_id);
if (freply == _fast_replies.end()) { if (freply == _fast_replies.end()) {
auto initiate = [this]( auto initiation = [](
auto handler, control_code_e code, uint16_t packet_id auto handler, replies& self,
control_code_e code, uint16_t packet_id
) { ) {
_handlers.emplace_back(code, packet_id, std::move(handler)); self._handlers.emplace_back(
code, packet_id, std::move(handler)
);
}; };
return asio::async_initiate<CompletionToken, signature>( return asio::async_initiate<CompletionToken, Signature>(
std::move(initiate), token, code, packet_id initiation, token, std::ref(*this), code, packet_id
); );
} }
auto fdata = std::move(*freply); auto fdata = std::move(*freply);
_fast_replies.erase(freply); _fast_replies.erase(freply);
byte_citer first = fdata.packet->cbegin(), last = fdata.packet->cend(); auto initiation = [](
auto with_packet = asio::consign( auto handler, std::unique_ptr<std::string> packet
std::forward<CompletionToken>(token), std::move(fdata.packet) ) {
);
auto initiate = [](auto handler, byte_citer first, byte_citer last) {
auto ex = asio::get_associated_executor(handler); auto ex = asio::get_associated_executor(handler);
asio::post(ex, [h = std::move(handler), first, last]() mutable { byte_citer first = packet->cbegin();
std::move(h)(error_code {}, first, last); byte_citer last = packet->cend();
});
asio::post(
ex,
asio::consign(
asio::append(std::move(handler), error_code{}, first, last),
std::move(packet)
)
);
}; };
return asio::async_initiate<decltype(with_packet), signature>(
std::move(initiate), with_packet, first, last return asio::async_initiate<CompletionToken, Signature>(
initiation, token, std::move(fdata.packet)
); );
} }
@@ -115,12 +126,15 @@ public:
byte_citer first, byte_citer last byte_citer first, byte_citer last
) { ) {
auto handler_ptr = find_handler(code, packet_id); auto handler_ptr = find_handler(code, packet_id);
if (handler_ptr == _handlers.end()) { if (handler_ptr == _handlers.end()) {
_fast_replies.push_back({ _fast_replies.push_back({
code, packet_id, std::make_unique<std::string>(first, last) code, packet_id,
std::make_unique<std::string>(first, last)
}); });
return; return;
} }
auto handler = std::move(*handler_ptr); auto handler = std::move(*handler_ptr);
_handlers.erase(handler_ptr); _handlers.erase(handler_ptr);
std::move(handler)(ec, first, last); std::move(handler)(ec, first, last);
@@ -159,7 +173,8 @@ public:
for (auto it = _handlers.begin(); it != _handlers.end();) { for (auto it = _handlers.begin(); it != _handlers.end();) {
if (it->code() == control_code_e::pubrel) { if (it->code() == control_code_e::pubrel) {
std::move(*it)( std::move(*it)(
asio::error::operation_aborted, byte_citer {}, byte_citer {} asio::error::operation_aborted,
byte_citer {}, byte_citer {}
); );
it = _handlers.erase(it); it = _handlers.erase(it);
} }

View File

@@ -20,8 +20,7 @@ class write_op {
handler_type _handler; handler_type _handler;
public: public:
write_op( write_op(Owner& owner, Handler&& handler) :
Owner& owner, Handler&& handler) :
_owner(owner), _owner(owner),
_handler(std::move(handler)) _handler(std::move(handler))
{} {}

View File

@@ -363,7 +363,7 @@ public:
) { ) {
using Signature = detail::on_publish_signature<qos_type>; using Signature = detail::on_publish_signature<qos_type>;
auto initiate = [] ( auto initiation = [] (
auto handler, std::string topic, std::string payload, auto handler, std::string topic, std::string payload,
retain_e retain, const publish_props& props, retain_e retain, const publish_props& props,
const clisvc_ptr& svc_ptr const clisvc_ptr& svc_ptr
@@ -378,7 +378,7 @@ public:
}; };
return asio::async_initiate<CompletionToken, Signature>( return asio::async_initiate<CompletionToken, Signature>(
std::move(initiate), token, initiation, token,
std::move(topic), std::move(payload), retain, props, _svc_ptr std::move(topic), std::move(payload), retain, props, _svc_ptr
); );
} }
@@ -437,7 +437,7 @@ public:
error_code, std::vector<reason_code>, suback_props error_code, std::vector<reason_code>, suback_props
); );
auto initiate = [] ( auto initiation = [] (
auto handler, const std::vector<subscribe_topic>& topics, auto handler, const std::vector<subscribe_topic>& topics,
const subscribe_props& props, const clisvc_ptr& impl const subscribe_props& props, const clisvc_ptr& impl
) { ) {
@@ -446,7 +446,7 @@ public:
}; };
return asio::async_initiate<CompletionToken, Signature>( return asio::async_initiate<CompletionToken, Signature>(
std::move(initiate), token, topics, props, _svc_ptr initiation, token, topics, props, _svc_ptr
); );
} }
@@ -558,7 +558,7 @@ public:
error_code, std::vector<reason_code>, unsuback_props error_code, std::vector<reason_code>, unsuback_props
); );
auto initiate = []( auto initiation = [](
auto handler, auto handler,
const std::vector<std::string>& topics, const std::vector<std::string>& topics,
const unsubscribe_props& props, const clisvc_ptr& impl const unsubscribe_props& props, const clisvc_ptr& impl
@@ -568,7 +568,7 @@ public:
}; };
return asio::async_initiate<CompletionToken, Signature>( return asio::async_initiate<CompletionToken, Signature>(
std::move(initiate), token, topics, props, _svc_ptr initiation, token, topics, props, _svc_ptr
); );
} }