From d9489597b3f6051a86067ef456f37ae55cad5bdb Mon Sep 17 00:00:00 2001 From: Ivica Siladic Date: Mon, 4 Dec 2023 18:42:57 +0100 Subject: [PATCH] Syntax consolidation. --- include/async_mqtt5/detail/async_mutex.hpp | 11 ++-- include/async_mqtt5/impl/async_sender.hpp | 66 ++++++++++++------- .../async_mqtt5/impl/autoconnect_stream.hpp | 1 + include/async_mqtt5/impl/client_service.hpp | 3 +- include/async_mqtt5/impl/disconnect_op.hpp | 4 +- include/async_mqtt5/impl/reconnect_op.hpp | 5 +- include/async_mqtt5/impl/replies.hpp | 55 ++++++++++------ include/async_mqtt5/impl/write_op.hpp | 3 +- include/async_mqtt5/mqtt_client.hpp | 12 ++-- 9 files changed, 98 insertions(+), 62 deletions(-) diff --git a/include/async_mqtt5/detail/async_mutex.hpp b/include/async_mqtt5/detail/async_mutex.hpp index b9e6515..bdf0c5e 100644 --- a/include/async_mqtt5/detail/async_mutex.hpp +++ b/include/async_mqtt5/detail/async_mutex.hpp @@ -120,11 +120,14 @@ public: // It's the responsibility of the completion handler to unlock the mutex. template decltype(auto) lock(CompletionToken&& token) noexcept { - auto initiation = [this] (auto handler) { - this->execute_or_queue(std::move(handler)); + using Signature = void (error_code); + + auto initiation = [] (auto handler, async_mutex& self) { + self.execute_or_queue(std::move(handler)); }; - return asio::async_initiate( - std::move(initiation), token + + return asio::async_initiate( + initiation, token, std::ref(*this) ); } diff --git a/include/async_mqtt5/impl/async_sender.hpp b/include/async_mqtt5/impl/async_sender.hpp index c1d0928..dbc1c9d 100644 --- a/include/async_mqtt5/impl/async_sender.hpp +++ b/include/async_mqtt5/impl/async_sender.hpp @@ -4,7 +4,6 @@ #include #include #include - #include #include @@ -19,35 +18,50 @@ class write_req { asio::const_buffer _buffer; serial_num_t _serial_num; unsigned _flags; - asio::any_completion_handler _handler; + + using handler_type = asio::any_completion_handler; + handler_type _handler; public: write_req( asio::const_buffer buffer, serial_num_t serial_num, unsigned flags, - asio::any_completion_handler handler - ) : _buffer(buffer), _serial_num(serial_num), _flags(flags), - _handler(std::move(handler)) {} + handler_type handler + ) : + _buffer(buffer), _serial_num(serial_num), _flags(flags), + _handler(std::move(handler)) + {} static serial_num_t next_serial_num(serial_num_t last) { return last + 1; } - asio::const_buffer buffer() const { return _buffer; } - void complete(error_code ec) { std::move(_handler)(ec); } - bool throttled() const { return _flags & send_flag::throttled; } - bool terminal() const { return _flags & send_flag::terminal; } + asio::const_buffer buffer() const { + return _buffer; + } + + void complete(error_code ec) { + std::move(_handler)(ec); + } + + bool throttled() const { + return _flags & send_flag::throttled; + } + + bool terminal() const { + return _flags & send_flag::terminal; + } bool operator<(const write_req& other) const { - if (prioritized() != other.prioritized()) { + if (prioritized() != other.prioritized()) return prioritized(); - } auto s1 = _serial_num; auto s2 = other._serial_num; if (s1 < s2) return (s2 - s1) < (1 << (SERIAL_BITS - 1)); + return (s1 - s2) >= (1 << (SERIAL_BITS - 1)); } @@ -55,8 +69,11 @@ private: bool prioritized() const { return _flags & send_flag::prioritized; } }; + template class async_sender { + using self_type = async_sender; + using client_service = ClientService; using queue_allocator_type = asio::recycling_allocator; @@ -95,18 +112,21 @@ public: serial_num_t serial_num, unsigned flags, CompletionToken&& token ) { - auto initiation = [this]( - auto handler, const BufferType& buffer, + using Signature = void (error_code); + + auto initiation = []( + auto handler, self_type& self, const BufferType& buffer, serial_num_t serial_num, unsigned flags ) { - _write_queue.emplace_back( + self._write_queue.emplace_back( asio::buffer(buffer), serial_num, flags, std::move(handler) ); - do_write(); + self.do_write(); }; - return asio::async_initiate( - std::move(initiation), token, buffer, serial_num, flags + return asio::async_initiate( + initiation, token, std::ref(*this), + buffer, serial_num, flags ); } @@ -120,8 +140,8 @@ public: if (_write_in_progress) return; - // The _write_in_progress flag is set to true to prevent any write - // operations executing before the _write_queue is filled with + // The _write_in_progress flag is set to true to prevent any write + // operations executing before the _write_queue is filled with // all the packets that require resending. _write_in_progress = true; @@ -188,12 +208,14 @@ private: _write_queue.begin(), _write_queue.end(), [](const auto& op) { return op.terminal(); } ); + if (terminal_req != _write_queue.end()) { write_queue.push_back(std::move(*terminal_req)); _write_queue.erase(terminal_req); } - else if (_limit == MAX_LIMIT) + else if (_limit == MAX_LIMIT) { write_queue = std::move(_write_queue); + } else { auto throttled_ptr = std::stable_partition( _write_queue.begin(), _write_queue.end(), @@ -214,6 +236,7 @@ private: std::make_move_iterator(_write_queue.begin()), std::make_move_iterator(throttled_ptr) ); + _write_queue.erase(_write_queue.begin(), throttled_ptr); } @@ -225,8 +248,7 @@ private: _svc._replies.clear_fast_replies(); _svc._stream.async_write( - buffers, - asio::prepend(std::ref(*this), std::move(write_queue)) + buffers, asio::prepend(std::ref(*this), std::move(write_queue)) ); } diff --git a/include/async_mqtt5/impl/autoconnect_stream.hpp b/include/async_mqtt5/impl/autoconnect_stream.hpp index 8b5136c..6a01e92 100644 --- a/include/async_mqtt5/impl/autoconnect_stream.hpp +++ b/include/async_mqtt5/impl/autoconnect_stream.hpp @@ -88,6 +88,7 @@ public: void open() { error_code ec; lowest_layer(*_stream_ptr).open(asio::ip::tcp::v4(), ec); + lowest_layer(*_stream_ptr).set_option(asio::ip::tcp::no_delay(true)); } void cancel() { diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index e252592..4872e6e 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -294,8 +294,7 @@ public: }; return asio::async_initiate ( - initiation, - token, std::ref(*this), + initiation, token, std::ref(*this), wait_for, std::ref(_read_buff), std::ref(_active_span) ); } diff --git a/include/async_mqtt5/impl/disconnect_op.hpp b/include/async_mqtt5/impl/disconnect_op.hpp index c9a11fe..3dcf5fe 100644 --- a/include/async_mqtt5/impl/disconnect_op.hpp +++ b/include/async_mqtt5/impl/disconnect_op.hpp @@ -121,7 +121,7 @@ decltype(auto) async_disconnect( ) { using Signature = void (error_code); - auto initiate = []( + auto initiation = []( auto handler, disconnect_context ctx, const std::shared_ptr& svc_ptr ) { @@ -131,7 +131,7 @@ decltype(auto) async_disconnect( }; return asio::async_initiate( - std::move(initiate), token, + initiation, token, disconnect_context { reason_code, props, terminal }, svc_ptr ); diff --git a/include/async_mqtt5/impl/reconnect_op.hpp b/include/async_mqtt5/impl/reconnect_op.hpp index 0438d2d..f105cea 100644 --- a/include/async_mqtt5/impl/reconnect_op.hpp +++ b/include/async_mqtt5/impl/reconnect_op.hpp @@ -4,16 +4,13 @@ #include #include #include +#include #include #include -#include - #include - #include - #include namespace async_mqtt5::detail { diff --git a/include/async_mqtt5/impl/replies.hpp b/include/async_mqtt5/impl/replies.hpp index 29a740d..7e44cec 100644 --- a/include/async_mqtt5/impl/replies.hpp +++ b/include/async_mqtt5/impl/replies.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -14,12 +15,12 @@ namespace async_mqtt5::detail { namespace asio = boost::asio; class replies { - using signature = void (error_code, byte_citer, byte_citer); + using Signature = void (error_code, byte_citer, byte_citer); static constexpr auto max_reply_time = std::chrono::seconds(20); - class handler_type : public asio::any_completion_handler { - using base = asio::any_completion_handler; + class handler_type : public asio::any_completion_handler { + using base = asio::any_completion_handler; control_code_e _code; uint16_t _packet_id; std::chrono::time_point _ts; @@ -81,32 +82,42 @@ public: } auto freply = find_fast_reply(code, packet_id); + if (freply == _fast_replies.end()) { - auto initiate = [this]( - auto handler, control_code_e code, uint16_t packet_id + auto initiation = []( + auto handler, replies& self, + control_code_e code, uint16_t packet_id ) { - _handlers.emplace_back(code, packet_id, std::move(handler)); + self._handlers.emplace_back( + code, packet_id, std::move(handler) + ); }; - return asio::async_initiate( - std::move(initiate), token, code, packet_id + return asio::async_initiate( + initiation, token, std::ref(*this), code, packet_id ); } auto fdata = std::move(*freply); _fast_replies.erase(freply); - byte_citer first = fdata.packet->cbegin(), last = fdata.packet->cend(); - auto with_packet = asio::consign( - std::forward(token), std::move(fdata.packet) - ); - auto initiate = [](auto handler, byte_citer first, byte_citer last) { + auto initiation = []( + auto handler, std::unique_ptr packet + ) { auto ex = asio::get_associated_executor(handler); - asio::post(ex, [h = std::move(handler), first, last]() mutable { - std::move(h)(error_code {}, first, last); - }); + byte_citer first = packet->cbegin(); + byte_citer last = packet->cend(); + + asio::post( + ex, + asio::consign( + asio::append(std::move(handler), error_code{}, first, last), + std::move(packet) + ) + ); }; - return asio::async_initiate( - std::move(initiate), with_packet, first, last + + return asio::async_initiate( + initiation, token, std::move(fdata.packet) ); } @@ -115,12 +126,15 @@ public: byte_citer first, byte_citer last ) { auto handler_ptr = find_handler(code, packet_id); + if (handler_ptr == _handlers.end()) { _fast_replies.push_back({ - code, packet_id, std::make_unique(first, last) + code, packet_id, + std::make_unique(first, last) }); return; } + auto handler = std::move(*handler_ptr); _handlers.erase(handler_ptr); std::move(handler)(ec, first, last); @@ -159,7 +173,8 @@ public: for (auto it = _handlers.begin(); it != _handlers.end();) { if (it->code() == control_code_e::pubrel) { std::move(*it)( - asio::error::operation_aborted, byte_citer {}, byte_citer {} + asio::error::operation_aborted, + byte_citer {}, byte_citer {} ); it = _handlers.erase(it); } diff --git a/include/async_mqtt5/impl/write_op.hpp b/include/async_mqtt5/impl/write_op.hpp index 94bc707..1a95af7 100644 --- a/include/async_mqtt5/impl/write_op.hpp +++ b/include/async_mqtt5/impl/write_op.hpp @@ -20,8 +20,7 @@ class write_op { handler_type _handler; public: - write_op( - Owner& owner, Handler&& handler) : + write_op(Owner& owner, Handler&& handler) : _owner(owner), _handler(std::move(handler)) {} diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp index 7266c58..b508d00 100644 --- a/include/async_mqtt5/mqtt_client.hpp +++ b/include/async_mqtt5/mqtt_client.hpp @@ -363,7 +363,7 @@ public: ) { using Signature = detail::on_publish_signature; - auto initiate = [] ( + auto initiation = [] ( auto handler, std::string topic, std::string payload, retain_e retain, const publish_props& props, const clisvc_ptr& svc_ptr @@ -378,7 +378,7 @@ public: }; return asio::async_initiate( - std::move(initiate), token, + initiation, token, std::move(topic), std::move(payload), retain, props, _svc_ptr ); } @@ -437,7 +437,7 @@ public: error_code, std::vector, suback_props ); - auto initiate = [] ( + auto initiation = [] ( auto handler, const std::vector& topics, const subscribe_props& props, const clisvc_ptr& impl ) { @@ -446,7 +446,7 @@ public: }; return asio::async_initiate( - std::move(initiate), token, topics, props, _svc_ptr + initiation, token, topics, props, _svc_ptr ); } @@ -558,7 +558,7 @@ public: error_code, std::vector, unsuback_props ); - auto initiate = []( + auto initiation = []( auto handler, const std::vector& topics, const unsubscribe_props& props, const clisvc_ptr& impl @@ -568,7 +568,7 @@ public: }; return asio::async_initiate( - std::move(initiate), token, topics, props, _svc_ptr + initiation, token, topics, props, _svc_ptr ); }