Avoid unnecessary read buffer shifting on every incoming packet

Summary: Fixes #33

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina, miljen

Differential Revision: https://repo.mireo.local/D36258
This commit is contained in:
Bruno Iljazovic
2025-07-15 10:42:42 +02:00
parent 53c33f23bb
commit 62e9dc8b41
3 changed files with 78 additions and 24 deletions

View File

@@ -97,15 +97,6 @@ public:
template <typename CompletionCondition>
void perform(CompletionCondition cc) {
_read_buff.erase(
_read_buff.cbegin(), _data_span.first()
);
_read_buff.resize(max_recv_size());
_data_span = {
_read_buff.cbegin(),
_read_buff.cbegin() + _data_span.size()
};
if (cc(error_code {}, 0) == 0 && _data_span.size()) {
return asio::post(
_svc.get_executor(),
@@ -116,8 +107,11 @@ public:
);
}
prepare_buffer(1);
// Must be evaluated before this is moved
auto store_begin = _read_buff.data() + _data_span.size();
auto store_begin = _read_buff.data()
+ std::distance(_read_buff.cbegin(), _data_span.last());
auto store_size = std::distance(_data_span.last(), _read_buff.cend());
_svc._stream.async_read_some(
@@ -137,7 +131,7 @@ public:
if (ec == asio::error::try_again) {
_svc.update_session_state();
_svc._async_sender.resend();
_data_span = { _read_buff.cend(), _read_buff.cend() };
_data_span = { _read_buff.cbegin(), _read_buff.cbegin() };
return perform(std::move(cc));
}
@@ -170,8 +164,10 @@ public:
)
return complete(client::error::packet_too_large, 0, {}, {});
if (std::distance(first, _data_span.last()) < *varlen)
if (std::distance(first, _data_span.last()) < *varlen) {
prepare_buffer(*varlen - std::distance(first, _data_span.last()));
return perform(asio::transfer_at_least(1));
}
_data_span.remove_prefix(
std::distance(_data_span.first(), first) + *varlen
@@ -181,6 +177,22 @@ public:
}
private:
void prepare_buffer(std::ptrdiff_t extra_len) {
if (std::distance(_data_span.last(), _read_buff.cend()) >= extra_len)
return;
// make room for the packet by erasing bytes we already parsed from the
// beginning of the read buffer
const auto data_span_size = _data_span.size();
_read_buff.erase(_read_buff.cbegin(), _data_span.first());
_read_buff.resize(max_recv_size());
_data_span = {
_read_buff.cbegin(),
_read_buff.cbegin() + data_span_size
};
}
uint32_t max_recv_size() const {
return std::min(
_svc.connect_property(prop::maximum_packet_size)
@@ -239,7 +251,7 @@ private:
byte_citer first, byte_citer last
) {
if (ec)
_data_span = { _read_buff.cend(), _read_buff.cend() };
_data_span = { _read_buff.cbegin(), _read_buff.cbegin() };
std::move(_handler)(ec, control_code, first, last);
}
};

View File

@@ -276,7 +276,7 @@ private:
_stream(_executor, _stream_context, _log),
_replies(_executor),
_async_sender(*this),
_active_span(_read_buff.cend(), _read_buff.cend()),
_active_span(_read_buff.cbegin(), _read_buff.cbegin()),
_rec_channel(_executor, (std::numeric_limits<size_t>::max)()),
_ping_timer(_executor),
_sentry_timer(_executor)
@@ -296,7 +296,7 @@ public:
_stream(ex, _stream_context, _log),
_replies(ex),
_async_sender(*this),
_active_span(_read_buff.cend(), _read_buff.cend()),
_active_span(_read_buff.cbegin(), _read_buff.cbegin()),
_rec_channel(ex, (std::numeric_limits<size_t>::max)()),
_ping_timer(ex),
_sentry_timer(ex)

View File

@@ -12,6 +12,7 @@
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/test/data/test_case.hpp>
#include <boost/test/unit_test.hpp>
#include <chrono>
@@ -224,7 +225,7 @@ BOOST_FIXTURE_TEST_CASE(receive_disconnect_while_reconnecting, shared_test_data)
template <typename VerifyFun>
void run_receive_test(
test::msg_exchange broker_side, int num_of_receives,
VerifyFun&& verify_fun
size_t max_packet_size, VerifyFun&& verify_fun
) {
const int expected_handlers_called = num_of_receives;
int handlers_called = 0;
@@ -238,6 +239,7 @@ void run_receive_test(
using client_type = mqtt_client<test::test_stream>;
client_type c(executor);
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
.connect_property(prop::maximum_packet_size, max_packet_size)
.async_run(asio::detached);
for (int i = 0; i < num_of_receives; ++i)
@@ -277,7 +279,47 @@ BOOST_FIXTURE_TEST_CASE(receive_byte_by_byte, shared_test_data) {
BOOST_TEST(payload == payload_);
};
run_receive_test(std::move(broker_side), 1, std::move(verify_fun));
run_receive_test(std::move(broker_side), 1, 65'536, std::move(verify_fun));
}
namespace bdata = boost::unit_test::data;
BOOST_DATA_TEST_CASE_F(
shared_test_data, receive_with_different_max_packet_sizes,
bdata::make(5, 500, 50'000) * bdata::make(1, 2) * bdata::xrange(-1, 5),
publish_payload_size, max_packet_size_multiplier, max_packet_size_offset
) {
auto payload1 = std::string(publish_payload_size, 'a');
auto publish1 = encoders::encode_publish(
0, topic, payload1, qos_e::at_most_once, retain_e::no, dup_e::no, {}
);
const uint32_t max_packet_size = std::max(
publish1.size(),
publish1.size() * max_packet_size_multiplier + max_packet_size_offset
);
connect_props cprops;
cprops[prop::maximum_packet_size] = max_packet_size;
auto connect1 = encoders::encode_connect(
"", std::nullopt, std::nullopt, 60, false, cprops, std::nullopt
);
test::msg_exchange broker_side;
broker_side
.expect(connect1)
.complete_with(success, after(1ms))
.reply_with(connack, after(2ms))
.send(publish1, publish1, publish1, publish1, publish1, after(100ms));
auto verify_fun = [&](
error_code ec, std::string topic_, std::string payload_, publish_props
) {
BOOST_TEST(!ec);
BOOST_TEST(topic == topic_);
BOOST_TEST(payload1 == payload_);
};
run_receive_test(std::move(broker_side), 5, max_packet_size, std::move(verify_fun));
}
BOOST_FIXTURE_TEST_CASE(receive_multiple_packets_at_once, shared_test_data) {
@@ -290,13 +332,13 @@ BOOST_FIXTURE_TEST_CASE(receive_multiple_packets_at_once, shared_test_data) {
auto verify_fun = [&](
error_code ec, std::string topic_, std::string payload_, publish_props
) {
BOOST_TEST(!ec);
BOOST_TEST(topic == topic_);
BOOST_TEST(payload == payload_);
};
) {
BOOST_TEST(!ec);
BOOST_TEST(topic == topic_);
BOOST_TEST(payload == payload_);
};
run_receive_test(std::move(broker_side), 5, std::move(verify_fun));
run_receive_test(std::move(broker_side), 5, 65'536, std::move(verify_fun));
}
BOOST_FIXTURE_TEST_CASE(receive_multiple_packets_with_malformed, shared_test_data) {
@@ -329,7 +371,7 @@ BOOST_FIXTURE_TEST_CASE(receive_multiple_packets_with_malformed, shared_test_dat
BOOST_TEST(payload == payload_);
};
run_receive_test(std::move(broker_side), 3, std::move(verify_fun));
run_receive_test(std::move(broker_side), 3, 65'536, std::move(verify_fun));
}
BOOST_AUTO_TEST_SUITE_END();