Files
mqtt5/test/include/test_common/test_broker.hpp
Bruno Iljazovic 6198b0e44a Mock timer and resolver in unit tests.
Summary:
* Removed all usage of real timers and resolvers in unit tests
* Moved most of the tests to test/unit folder
* cmake: split boost_mqtt5_tests into boost_mqtt5_unittests and boost_mqtt5_integrationtests

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen

Differential Revision: https://repo.mireo.local/D38186
2025-11-17 13:10:36 +01:00

310 lines
9.0 KiB
C++

//
// Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_MQTT5_TEST_TEST_BROKER_HPP
#define BOOST_MQTT5_TEST_TEST_BROKER_HPP
#include <boost/mqtt5/types.hpp>
#include <boost/mqtt5/impl/codecs/message_decoders.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/execution_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/system/error_code.hpp>
#include <boost/test/tools/interface.hpp>
#include <boost/test/unit_test.hpp>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <functional>
#include <iterator>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include <vector>
#include "message_exchange.hpp"
#include "packet_util.hpp"
#include "test_timer.hpp"
#include "test_resolver.hpp"
// Point the resolver and clock type macros at the test doubles
// (presumably declared in test_resolver.hpp / test_timer.hpp, included above).
// NOTE(review): these look like customization points read by the mqtt5 client
// headers, which would mean this header must be included before them — confirm.
#define BOOST_MQTT5_DETAIL_RESOLVER_TYPE boost::mqtt5::test::test_resolver
#define BOOST_MQTT5_DETAIL_CLOCK_TYPE boost::mqtt5::test::clock
namespace boost::mqtt5::test {
namespace asio = boost::asio;
using error_code = boost::system::error_code;
/**
 * A single outstanding read request: the caller's destination buffer plus the
 * type-erased handler that must be invoked once the read finishes.
 *
 * The object is move-only. A default-constructed instance holds no handler and
 * reports empty().
 */
class pending_read {
    void* _buffer_data { nullptr };
    size_t _buffer_size { 0 };
    asio::any_completion_handler<void (error_code, size_t)> _handler {};

public:
    /**
     * Captures the destination buffer and the completion handler.
     *
     * \param buffer  Object exposing data() and size(); only the pointer and
     *                the size are stored, the memory itself is not owned.
     * \param handler Completion handler; it is forwarded, so an lvalue is
     *                copied and an rvalue is moved.
     */
    template <typename MutableBuffer, typename Handler>
    pending_read(const MutableBuffer& buffer, Handler&& handler) :
        _buffer_data(buffer.data()),
        _buffer_size(buffer.size()),
        _handler(std::forward<Handler>(handler))
    {}

    pending_read() = default;
    pending_read(pending_read&&) = default;
    pending_read(const pending_read&) = delete;
    pending_read& operator=(pending_read&&) = default;
    pending_read& operator=(const pending_read&) = delete;

    /**
     * Copies as many bytes of \p data as fit into the stored buffer.
     *
     * \return Number of bytes copied: min(buffer size, data.size()).
     */
    size_t consume(const std::vector<uint8_t>& data) {
        const size_t num_bytes = (std::min)(_buffer_size, data.size());
        // Nothing to copy; also keeps memcpy away from a possibly null buffer.
        if (num_bytes == 0)
            return 0;
        std::memcpy(_buffer_data, data.data(), num_bytes);
        return num_bytes;
    }

    /**
     * Posts the stored handler on \p ex with (ec, bytes_read).
     *
     * Does nothing when no handler is stored. The handler is also left in
     * place when there is neither an error nor any bytes to report, unless the
     * destination buffer has zero size.
     */
    template <typename Executor>
    void complete(const Executor& ex, error_code ec, size_t bytes_read) {
        if (empty())
            return;
        if (ec || bytes_read || _buffer_size == 0)
            // The handler is moved out of this object into the posted operation.
            asio::post(ex, asio::prepend(std::move(_handler), ec, bytes_read));
    }

    /** \return true when no handler is currently stored. */
    bool empty() const {
        return !_handler;
    }
};
class test_broker : public asio::execution_context::service {
public:
using executor_type = asio::any_io_executor;
using protocol_type = asio::ip::tcp;
using endpoint_type = asio::ip::tcp::endpoint;
static inline asio::execution_context::id id {};
private:
using base = asio::execution_context::service;
struct on_receive {};
struct on_delayed_complete {};
struct broker_data {
error_code ec;
std::vector<uint8_t> bytes;
};
executor_type _ex;
std::deque<broker_data> _broker_data;
pending_read _pending_read;
msg_exchange _broker_side;
public:
explicit test_broker(
asio::execution_context& context,
asio::any_io_executor ex = {}, msg_exchange broker_side = {}
) :
base(context), _ex(std::move(ex)), _broker_side(std::move(broker_side))
{
launch_broker_ops();
}
test_broker(const test_broker&) = delete;
test_broker& operator=(const test_broker&) = delete;
executor_type get_executor() const noexcept {
return _ex;
}
void close_connection() {
_pending_read.complete(
get_executor(), asio::error::operation_aborted, 0
);
_broker_data.clear();
}
bool received_all_expected() {
return !_broker_side.has_remaining_messages();
}
template <typename ConstBufferSequence, typename WriteToken>
decltype(auto) write_to_network(
const ConstBufferSequence& buffers,
WriteToken&& token
) {
auto initiation = [this](
auto handler, const ConstBufferSequence& buffers
) {
auto reply_action = _broker_side.pop_reply_action();
size_t bytes_written = std::accumulate(
asio::buffer_sequence_begin(buffers),
asio::buffer_sequence_end(buffers),
size_t(0), [](size_t a, const auto& b) { return a + b.size(); }
);
executor_type ex = get_executor();
if (!reply_action.has_value()) {
BOOST_ERROR(
"Broker side did not expect: " <<
boost::algorithm::join(to_readable_packets(buffers), ",")
);
return asio::post(asio::prepend(std::move(handler), error_code{}, 0));
}
const auto& expected = reply_action->expected_packets();
size_t buffers_size = std::distance(
asio::buffer_sequence_begin(buffers), asio::buffer_sequence_end(buffers)
);
BOOST_TEST(buffers_size == expected.size());
size_t num_packets = (std::min)(buffers_size, expected.size());
auto it = asio::buffer_sequence_begin(buffers);
for (size_t i = 0; i < num_packets; ++i, ++it) {
BOOST_TEST(it->size() == expected[i].size());
size_t len = (std::min)(it->size(), expected[i].size());
if (memcmp(it->data(), expected[i].data(), len))
BOOST_ERROR(
concat_strings(
"Packet mismatch!\nExpected: ",
to_readable_packet(expected[i]),
"\nReceived: ",
to_readable_packet(std::string((const char*)it->data(), it->size()))
)
);
}
async_delay(
_ex, reply_action->write_completion(),
asio::prepend(
std::ref(*this), on_delayed_complete {},
std::move(handler), bytes_written
)
);
for (auto& op : reply_action->pop_reply_ops())
async_delay(
_ex, std::move(op),
asio::prepend(std::ref(*this), on_receive {})
);
};
return asio::async_initiate<WriteToken, void (error_code, size_t)>(
std::move(initiation), token, buffers
);
}
template <typename MutableBuffer, typename ReadToken>
decltype(auto) read_from_network(
const MutableBuffer& buffer, ReadToken&& token
) {
auto initiation = [this](
auto handler, const MutableBuffer& buffer
) {
_pending_read.complete(_ex, asio::error::operation_aborted, 0);
_pending_read = pending_read(buffer, std::move(handler));
complete_read();
};
return asio::async_initiate<ReadToken, void (error_code, size_t)>(
std::move(initiation), token, buffer
);
}
void operator()(
on_receive, error_code delay_ec,
error_code ec, std::vector<uint8_t> bytes
) {
if (delay_ec) // asio::operation_aborted
return;
_broker_data.push_back({ ec, std::move(bytes) });
complete_read();
}
template <typename Handler>
void operator()(
on_delayed_complete, Handler handler, size_t bytes,
error_code delay_ec, error_code ec
) {
if (delay_ec) { // asio::operation_aborted
ec = delay_ec;
bytes = 0;
}
asio::dispatch(asio::prepend(std::move(handler), ec, bytes));
}
void cancel_pending_read() {
_pending_read.complete(get_executor(), asio::error::operation_aborted, 0);
}
static void run(asio::io_context& ioc) {
while (!ioc.stopped()) {
ioc.poll();
asio::use_service<timer_service<clock>>(ioc).advance();
}
}
private:
void shutdown() override {
cancel_pending_read();
}
void launch_broker_ops() {
for (auto& op: _broker_side.pop_broker_ops()) {
async_delay(
_ex,
std::move(op),
asio::prepend(std::ref(*this), on_receive {})
);
}
}
void complete_read() {
if (_pending_read.empty())
return;
error_code ec = {};
size_t bytes_read = 0;
if (!_broker_data.empty()) {
auto& [read_ec, bytes] = _broker_data.front();
ec = read_ec;
bytes_read = _pending_read.consume(bytes);
if (bytes_read == bytes.size())
_broker_data.pop_front();
else
bytes.erase(bytes.begin(), bytes.begin() + bytes_read);
}
_pending_read.complete(get_executor(), ec, bytes_read);
}
};
} // end namespace boost::mqtt5::test
#endif // BOOST_MQTT5_TEST_TEST_BROKER_HPP