mirror of
https://github.com/boostorg/mqtt5.git
synced 2025-06-25 04:01:33 +02:00
Allow IPv6 connections
Summary: related to T13767, #19 Reviewers: ivica Reviewed By: ivica Subscribers: iljazovic, miljen Differential Revision: https://repo.mireo.local/D32140
This commit is contained in:
@ -61,9 +61,6 @@ private:
|
||||
template <typename Stream>
|
||||
friend class reconnect_op;
|
||||
|
||||
template <typename Owner, typename DisconnectContext>
|
||||
friend class disconnect_op;
|
||||
|
||||
public:
|
||||
autoconnect_stream(
|
||||
const executor_type& ex, stream_context_type& context
|
||||
@ -106,7 +103,7 @@ public:
|
||||
}
|
||||
|
||||
void open() {
|
||||
open_lowest_layer(_stream_ptr);
|
||||
open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
|
||||
}
|
||||
|
||||
void cancel() {
|
||||
@ -169,10 +166,10 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
static void open_lowest_layer(stream_ptr sptr) {
|
||||
static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
|
||||
error_code ec;
|
||||
auto& layer = lowest_layer(*sptr);
|
||||
layer.open(asio::ip::tcp::v4(), ec);
|
||||
layer.open(protocol, ec);
|
||||
layer.set_option(asio::socket_base::reuse_address(true), ec);
|
||||
layer.set_option(asio::ip::tcp::no_delay(true), ec);
|
||||
}
|
||||
@ -189,9 +186,9 @@ private:
|
||||
return sptr;
|
||||
}
|
||||
|
||||
stream_ptr construct_and_open_next_layer() const {
|
||||
stream_ptr construct_and_open_next_layer(asio::ip::tcp protocol) const {
|
||||
auto sptr = construct_next_layer();
|
||||
open_lowest_layer(sptr);
|
||||
open_lowest_layer(sptr, protocol);
|
||||
return sptr;
|
||||
}
|
||||
|
||||
@ -212,11 +209,11 @@ private:
|
||||
using Signature = void (error_code);
|
||||
|
||||
auto initiation = [](auto handler, self_type& self, stream_ptr s) {
|
||||
reconnect_op { self, std::move(handler) }.perform(s);
|
||||
reconnect_op { self, std::move(handler) }.perform(std::move(s));
|
||||
};
|
||||
|
||||
return asio::async_initiate<CompletionToken, Signature>(
|
||||
initiation, token, std::ref(*this), s
|
||||
initiation, token, std::ref(*this), std::move(s)
|
||||
);
|
||||
}
|
||||
};
|
||||
|
@ -68,12 +68,11 @@ class connect_op {
|
||||
asio::cancellation_state _cancellation_state;
|
||||
|
||||
using endpoint = asio::ip::tcp::endpoint;
|
||||
using epoints = asio::ip::tcp::resolver::results_type;
|
||||
|
||||
public:
|
||||
template <typename Handler>
|
||||
connect_op(
|
||||
Stream& stream, Handler&& handler, mqtt_ctx& ctx
|
||||
Stream& stream, mqtt_ctx& ctx, Handler&& handler
|
||||
) :
|
||||
_stream(stream), _ctx(ctx),
|
||||
_handler(std::forward<Handler>(handler)),
|
||||
@ -106,14 +105,12 @@ public:
|
||||
return asio::get_associated_executor(_handler);
|
||||
}
|
||||
|
||||
void perform(
|
||||
const epoints& eps, authority_path ap
|
||||
) {
|
||||
void perform(const endpoint& ep, authority_path ap) {
|
||||
lowest_layer(_stream).async_connect(
|
||||
*std::begin(eps),
|
||||
ep,
|
||||
asio::append(
|
||||
asio::prepend(std::move(*this), on_connect {}),
|
||||
*std::begin(eps), std::move(ap)
|
||||
ep, std::move(ap)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ public:
|
||||
if (ec == asio::error::operation_aborted)
|
||||
// cancelled without acquiring the lock (by calling client.cancel())
|
||||
return std::move(_handler)(ec);
|
||||
|
||||
|
||||
if (!_owner.is_open())
|
||||
return complete(asio::error::operation_aborted);
|
||||
|
||||
@ -148,8 +148,6 @@ public:
|
||||
on_next_endpoint, error_code ec,
|
||||
epoints eps, authority_path ap
|
||||
) {
|
||||
namespace asioex = boost::asio::experimental;
|
||||
|
||||
// the three error codes below are the only possible codes
|
||||
// that may be returned from async_next_endpont
|
||||
|
||||
@ -162,7 +160,17 @@ public:
|
||||
if (ec == asio::error::host_not_found)
|
||||
return complete(asio::error::no_recovery);
|
||||
|
||||
auto sptr = _owner.construct_and_open_next_layer();
|
||||
connect(std::move(eps), std::move(ap));
|
||||
}
|
||||
|
||||
void connect(epoints eps, authority_path ap) {
|
||||
namespace asioex = boost::asio::experimental;
|
||||
|
||||
if (eps.empty())
|
||||
return do_reconnect();
|
||||
|
||||
const auto& ep = eps->endpoint();
|
||||
auto sptr = _owner.construct_and_open_next_layer(ep.protocol());
|
||||
|
||||
if constexpr (has_tls_context<typename Owner::stream_context_type>)
|
||||
setup_tls_sni(
|
||||
@ -174,53 +182,58 @@ public:
|
||||
|
||||
auto init_connect = [](
|
||||
auto handler, typename Owner::stream_type& stream,
|
||||
mqtt_ctx& context, const epoints& eps, authority_path ap
|
||||
mqtt_ctx& context, endpoint ep, authority_path ap
|
||||
) {
|
||||
connect_op { stream, std::move(handler), context }
|
||||
.perform(eps, std::move(ap));
|
||||
connect_op { stream, context, std::move(handler) }
|
||||
.perform(ep, std::move(ap));
|
||||
};
|
||||
|
||||
auto timed_connect = asioex::make_parallel_group(
|
||||
asio::async_initiate<const asio::deferred_t, void (error_code)>(
|
||||
asio::async_initiate<const asio::deferred_t, void(error_code)>(
|
||||
init_connect, asio::deferred, std::ref(*sptr),
|
||||
std::ref(_owner._stream_context.mqtt_context()),
|
||||
eps, std::move(ap)
|
||||
ep, ap
|
||||
),
|
||||
_owner._connect_timer.async_wait(asio::deferred)
|
||||
);
|
||||
|
||||
timed_connect.async_wait(
|
||||
asioex::wait_for_one(),
|
||||
asio::prepend(std::move(*this), on_connect {}, std::move(sptr))
|
||||
asio::prepend(
|
||||
std::move(*this), on_connect {},
|
||||
std::move(sptr), std::move(eps), std::move(ap)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
void operator()(
|
||||
on_connect, typename Owner::stream_ptr sptr,
|
||||
on_connect,
|
||||
typename Owner::stream_ptr sptr, epoints eps, authority_path ap,
|
||||
std::array<std::size_t, 2> ord,
|
||||
error_code connect_ec, error_code timer_ec
|
||||
) {
|
||||
// connect_ec may be any of stream.async_connect() error codes
|
||||
// plus access_denied, connection_refused and
|
||||
// client::error::malformed_packet
|
||||
// connect_ec may be any of:
|
||||
// 1) async_connect error codes
|
||||
// 2) async_handshake (TLS) error codes
|
||||
// 3) async_handshake (WebSocket) error codes
|
||||
// 4) async_write error codes
|
||||
// 5) async_read error codes
|
||||
// 5) client::error::malformed_packet
|
||||
if (
|
||||
ord[0] == 0 && connect_ec == asio::error::operation_aborted ||
|
||||
ord[0] == 1 && timer_ec == asio::error::operation_aborted ||
|
||||
(ord[0] == 0 && connect_ec == asio::error::operation_aborted) ||
|
||||
(ord[0] == 1 && timer_ec == asio::error::operation_aborted) ||
|
||||
!_owner.is_open()
|
||||
)
|
||||
return complete(asio::error::operation_aborted);
|
||||
|
||||
// operation timed out so retry
|
||||
if (ord[0] == 1)
|
||||
return do_reconnect();
|
||||
|
||||
if (connect_ec == asio::error::access_denied)
|
||||
return complete(asio::error::no_recovery);
|
||||
|
||||
// retry for any other stream.async_connect() error or
|
||||
// connection_refused, client::error::malformed_packet
|
||||
if (connect_ec)
|
||||
// retry for operation timed out and any other error_code or client::error::malformed_packet
|
||||
if (ord[0] == 1 || connect_ec) {
|
||||
// if the hostname resolved into more endpoints, try the next one
|
||||
if (++eps != eps.end())
|
||||
return connect(std::move(eps), std::move(ap));
|
||||
// try next server
|
||||
return do_reconnect();
|
||||
}
|
||||
|
||||
_owner.replace_next_layer(std::move(sptr));
|
||||
complete(error_code {});
|
||||
|
122
test/include/test_common/test_autoconnect_stream.hpp
Normal file
122
test/include/test_common/test_autoconnect_stream.hpp
Normal file
@ -0,0 +1,122 @@
|
||||
//
|
||||
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0.
|
||||
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
#ifndef ASYNC_MQTT5_TEST_AUTOCONNECT_STREAM_HPP
|
||||
#define ASYNC_MQTT5_TEST_AUTOCONNECT_STREAM_HPP
|
||||
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include <async_mqtt5/detail/async_mutex.hpp>
|
||||
#include <async_mqtt5/detail/async_traits.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/endpoints.hpp>
|
||||
|
||||
namespace async_mqtt5::test {
|
||||
|
||||
namespace asio = boost::asio;
|
||||
using error_code = boost::system::error_code;
|
||||
|
||||
template <
|
||||
typename StreamType,
|
||||
typename StreamContext = std::monostate
|
||||
>
|
||||
class test_autoconnect_stream {
|
||||
public:
|
||||
using stream_type = StreamType;
|
||||
using stream_ptr = std::shared_ptr<stream_type>;
|
||||
using stream_context_type = StreamContext;
|
||||
using executor_type = typename stream_type::executor_type;
|
||||
|
||||
private:
|
||||
executor_type _stream_executor;
|
||||
detail::async_mutex _conn_mtx;
|
||||
asio::steady_timer _connect_timer;
|
||||
detail::endpoints _endpoints;
|
||||
|
||||
stream_ptr _stream_ptr;
|
||||
stream_context_type& _stream_context;
|
||||
|
||||
template <typename Stream>
|
||||
friend class async_mqtt5::detail::reconnect_op;
|
||||
|
||||
public:
|
||||
test_autoconnect_stream(
|
||||
const executor_type& ex, stream_context_type& context
|
||||
) :
|
||||
_stream_executor(ex),
|
||||
_conn_mtx(_stream_executor),
|
||||
_connect_timer(_stream_executor),
|
||||
_endpoints(_stream_executor, _connect_timer),
|
||||
_stream_context(context)
|
||||
{
|
||||
replace_next_layer(construct_next_layer());
|
||||
open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
|
||||
}
|
||||
|
||||
test_autoconnect_stream(const test_autoconnect_stream&) = delete;
|
||||
test_autoconnect_stream& operator=(const test_autoconnect_stream&) = delete;
|
||||
|
||||
stream_ptr stream_pointer() const {
|
||||
return _stream_ptr;
|
||||
}
|
||||
|
||||
bool is_open() const noexcept {
|
||||
return detail::lowest_layer(*_stream_ptr).is_open();
|
||||
}
|
||||
|
||||
void brokers(std::string hosts, uint16_t default_port) {
|
||||
_endpoints.brokers(std::move(hosts), default_port);
|
||||
}
|
||||
|
||||
static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
|
||||
error_code ec;
|
||||
auto& layer = detail::lowest_layer(*sptr);
|
||||
layer.open(protocol, ec);
|
||||
layer.set_option(asio::socket_base::reuse_address(true), ec);
|
||||
layer.set_option(asio::ip::tcp::no_delay(true), ec);
|
||||
}
|
||||
|
||||
void close() {
|
||||
error_code ec;
|
||||
detail::lowest_layer(*_stream_ptr).shutdown(asio::ip::tcp::socket::shutdown_both, ec);
|
||||
detail::lowest_layer(*_stream_ptr).close(ec);
|
||||
}
|
||||
|
||||
stream_ptr construct_next_layer() const {
|
||||
stream_ptr sptr;
|
||||
if constexpr (detail::has_tls_context<StreamContext>)
|
||||
sptr = std::make_shared<stream_type>(
|
||||
_stream_executor, _stream_context.tls_context()
|
||||
);
|
||||
else
|
||||
sptr = std::make_shared<stream_type>(_stream_executor);
|
||||
|
||||
return sptr;
|
||||
}
|
||||
|
||||
stream_ptr construct_and_open_next_layer(asio::ip::tcp protocol) const {
|
||||
auto sptr = construct_next_layer();
|
||||
open_lowest_layer(sptr, protocol);
|
||||
return sptr;
|
||||
}
|
||||
|
||||
void replace_next_layer(stream_ptr sptr) {
|
||||
if (_stream_ptr)
|
||||
close();
|
||||
std::exchange(_stream_ptr, std::move(sptr));
|
||||
}
|
||||
};
|
||||
|
||||
} // end namespace async_mqtt5::test
|
||||
|
||||
#endif // !ASYNC_MQTT5_TEST_AUTOCONNECT_STREAM_HPP
|
@ -54,7 +54,7 @@ private:
|
||||
friend class write_op;
|
||||
|
||||
public:
|
||||
explicit test_stream_impl(executor_type ex) : _ex(std::move(ex)) {}
|
||||
explicit test_stream_impl(executor_type ex) : _ex(std::move(ex)) {}
|
||||
|
||||
test_stream_impl(test_stream_impl&&) = default;
|
||||
test_stream_impl(const test_stream_impl&) = delete;
|
||||
|
@ -65,8 +65,8 @@ void run_unit_test(
|
||||
};
|
||||
|
||||
detail::connect_op<test::test_stream>(
|
||||
stream, std::move(handler), mqtt_ctx
|
||||
).perform(eps, ap);
|
||||
stream, mqtt_ctx, std::move(handler)
|
||||
).perform(*std::begin(eps), std::move(ap));
|
||||
|
||||
ioc.run_for(1s);
|
||||
BOOST_TEST(handlers_called == expected_handlers_called);
|
||||
|
@ -8,18 +8,25 @@
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/prepend.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/client_service.hpp>
|
||||
#include <async_mqtt5/impl/reconnect_op.hpp>
|
||||
|
||||
#include "test_common/test_autoconnect_stream.hpp"
|
||||
#include "test_common/test_broker.hpp"
|
||||
#include "test_common/test_stream.hpp"
|
||||
|
||||
using namespace async_mqtt5;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(reconnect_op/*, *boost::unit_test::disabled()*/)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(exponential_backoff) {
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
detail::exponential_backoff generator;
|
||||
|
||||
auto first_iter = generator.generate();
|
||||
@ -41,4 +48,115 @@ BOOST_AUTO_TEST_CASE(exponential_backoff) {
|
||||
BOOST_TEST((sixth_iter >= 15500ms && sixth_iter <= 16500ms));
|
||||
}
|
||||
|
||||
struct test_tcp_stream : public test::test_stream {
|
||||
|
||||
test_tcp_stream(
|
||||
typename test::test_stream::executor_type ex
|
||||
) :
|
||||
test::test_stream(std::move(ex))
|
||||
{}
|
||||
|
||||
static int& succeed_after() {
|
||||
static int _succed_after = 0;
|
||||
return _succed_after;
|
||||
}
|
||||
|
||||
template <typename ConnectToken>
|
||||
decltype(auto) async_connect(
|
||||
const endpoint_type& ep, ConnectToken&& token
|
||||
) {
|
||||
auto initiation = [this](auto handler, const endpoint_type& ep) {
|
||||
error_code ec = --succeed_after() < 0 ? error_code {} : asio::error::connection_refused;
|
||||
if (!ec) {
|
||||
error_code cec;
|
||||
test::test_stream::open(ep.protocol(), cec);
|
||||
test::test_stream::connect(ep, cec);
|
||||
}
|
||||
asio::post(get_executor(), asio::prepend(std::move(handler), ec));
|
||||
};
|
||||
|
||||
return asio::async_initiate<ConnectToken, void(error_code)>(
|
||||
std::move(initiation), token, ep
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
using underlying_stream = test_tcp_stream;
|
||||
using stream_context = detail::stream_context<underlying_stream, std::monostate>;
|
||||
using astream = test::test_autoconnect_stream<underlying_stream, stream_context>;
|
||||
|
||||
void run_connect_to_localhost_test(int succeed_after) {
|
||||
using test::after;
|
||||
error_code success {};
|
||||
|
||||
const std::string connect = encoders::encode_connect(
|
||||
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
|
||||
);
|
||||
const std::string connack = encoders::encode_connack(
|
||||
true, reason_codes::success.value(), {}
|
||||
);
|
||||
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
test::msg_exchange broker_side;
|
||||
broker_side
|
||||
.expect(connect)
|
||||
.complete_with(success, after(2ms))
|
||||
.reply_with(connack, after(4ms));
|
||||
|
||||
auto& broker = asio::make_service<test::test_broker>(
|
||||
ioc, ioc.get_executor(), std::move(broker_side)
|
||||
);
|
||||
|
||||
auto stream_ctx = stream_context(std::monostate {});
|
||||
auto auto_stream = astream(ioc.get_executor(), stream_ctx);
|
||||
auto_stream.brokers("localhost", 1883);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec) {
|
||||
++handlers_called;
|
||||
BOOST_TEST(!ec);
|
||||
};
|
||||
|
||||
test_tcp_stream::succeed_after() = succeed_after;
|
||||
detail::reconnect_op(auto_stream, std::move(handler))
|
||||
.perform(auto_stream.stream_pointer());
|
||||
|
||||
ioc.run();
|
||||
BOOST_TEST(expected_handlers_called == handlers_called);
|
||||
BOOST_TEST(broker.received_all_expected());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(connect_to_first_localhost) {
|
||||
// connect to first in the resolver list
|
||||
run_connect_to_localhost_test(2);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(connect_to_second_localhost) {
|
||||
// connect to second in the resolver list
|
||||
run_connect_to_localhost_test(3);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(no_servers) {
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
auto stream_ctx = stream_context(std::monostate{});
|
||||
auto auto_stream = astream(ioc.get_executor(), stream_ctx);
|
||||
auto_stream.brokers("", 1883);
|
||||
|
||||
auto handler = [&handlers_called](error_code ec) {
|
||||
++handlers_called;
|
||||
BOOST_TEST(ec == asio::error::no_recovery);
|
||||
};
|
||||
|
||||
detail::reconnect_op(auto_stream, std::move(handler))
|
||||
.perform(auto_stream.stream_pointer());
|
||||
|
||||
ioc.run();
|
||||
BOOST_TEST(expected_handlers_called == handlers_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END();
|
||||
|
Reference in New Issue
Block a user