#ifndef ASYNC_MQTT5_ASSEMBLE_OP_HPP #define ASYNC_MQTT5_ASSEMBLE_OP_HPP #include #include #include #include #include #include #include #include #include #include #include #include namespace async_mqtt5::detail { namespace asio = boost::asio; class data_span : private std::pair { using base = std::pair; public: using base::base; auto first() const { return base::first; } auto last() const { return base::second; } void expand_suffix(size_t num_chars) { base::second += num_chars; } void remove_prefix(size_t num_chars) { base::first += num_chars; } size_t size() const { return std::distance(base::first, base::second); } }; template class assemble_op { using client_service = ClientService; struct on_read {}; static constexpr size_t max_packet_size = 65536; client_service& _svc; std::decay_t _handler; std::string& _read_buff; data_span& _data_span; public: assemble_op( client_service& svc, Handler&& handler, std::string& read_buff, data_span& active_span ) : _svc(svc), _handler(std::move(handler)), _read_buff(read_buff), _data_span(active_span) {} assemble_op(assemble_op&&) noexcept = default; assemble_op(const assemble_op&) = delete; using executor_type = typename client_service::executor_type; executor_type get_executor() const noexcept { return _svc.get_executor(); } using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } template void perform(duration wait_for, CompletionCondition cc) { _read_buff.erase( _read_buff.cbegin(), _data_span.first() ); // TODO: respect max packet size from CONNACK _read_buff.resize(max_packet_size); _data_span = { _read_buff.cbegin(), _read_buff.cbegin() + _data_span.size() }; if (cc(error_code {}, 0) == 0 && _data_span.size()) { return asio::post( asio::prepend( std::move(*this), on_read {}, error_code {}, 0, wait_for, std::move(cc) ) ); } // Must be evaluated before this is moved auto store_begin = _read_buff.data() + _data_span.size(); auto store_size = std::distance(_data_span.last(), _read_buff.cend()); _svc._stream.async_read_some( asio::buffer(store_begin, store_size), wait_for, asio::prepend( asio::append(std::move(*this), wait_for, std::move(cc)), on_read {} ) ); } template void operator()( on_read, error_code ec, size_t bytes_read, duration wait_for, CompletionCondition cc ) { if (ec == asio::error::try_again) { _svc._async_sender.resend(); _data_span = { _read_buff.cend(), _read_buff.cend() }; return perform(wait_for, std::move(cc)); } if (ec) return complete(ec, 0, 0, {}, {}); _data_span.expand_suffix(bytes_read); assert(_data_span.size()); auto control_code = uint8_t(*_data_span.first()); if ((control_code & 0b11110000) == 0) // close the connection, cancel return complete(client::error::malformed_packet, 0, 0, {}, {}); auto first = _data_span.first() + 1; auto varlen = decoders::type_parse( first, _data_span.last(), decoders::basic::varint_ ); if (!varlen) { if (_data_span.size() < 5) return perform(wait_for, asio::transfer_at_least(1)); return complete(client::error::malformed_packet, 0, 0, {}, {}); } // TODO: respect max packet size which could be dinamically set by the broker if (*varlen > max_packet_size - std::distance(_data_span.first(), first)) return complete(client::error::malformed_packet, 0, 0, {}, {}); if (std::distance(first, _data_span.last()) < *varlen) return perform(wait_for, asio::transfer_at_least(1)); _data_span.remove_prefix( std::distance(_data_span.first(), first) + *varlen ); dispatch(wait_for, control_code, first, first + *varlen); } private: static bool valid_header(uint8_t control_byte) { using enum control_code_e; auto code = control_code_e(control_byte & 0b11110000); if (code == publish) return true; auto res = control_byte & 0b00001111; if (code == pubrel) return res == 0b00000010; return res == 0b00000000; } static bool contains_packet_id(control_code_e code) { using enum control_code_e; return code == puback || code == pubrec || code == pubrel || code == pubcomp || code == subscribe || code == suback || code == unsubscribe || code == unsuback; } void dispatch( duration wait_for, uint8_t control_code, byte_citer first, byte_citer last ) { using namespace decoders; using enum control_code_e; if (!valid_header(control_code)) return complete(client::error::malformed_packet, 0, 0, {}, {}); auto code = control_code_e(control_code & 0b11110000); if (code == pingresp) return perform(wait_for, asio::transfer_at_least(0)); uint16_t packet_id = 0; if (contains_packet_id(code)) packet_id = decoders::decode_packet_id(first).value(); bool is_reply = code != publish && code != auth && code != disconnect; if (is_reply) { _svc._replies.dispatch(error_code {}, code, packet_id, first, last); return perform(wait_for, asio::transfer_at_least(0)); } complete(error_code {}, packet_id, control_code, first, last); } void complete( error_code ec, uint16_t packet_id, uint8_t control_code, byte_citer first, byte_citer last ) { asio::dispatch( get_executor(), asio::prepend( std::move(_handler), ec, packet_id, control_code, first, last ) ); } }; } // end namespace async_mqtt5::detail #endif // !ASYNC_MQTT5_ASSEMBLE_OP_HPP