From f80c1897675f65174788e96f907775b8fb33c0b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Wed, 13 Nov 2024 08:50:16 +0100 Subject: [PATCH] Allow IPv6 connections Summary: related to T13767, #19 Reviewers: ivica Reviewed By: ivica Subscribers: iljazovic, miljen Differential Revision: https://repo.mireo.local/D32140 --- .../async_mqtt5/impl/autoconnect_stream.hpp | 17 +-- include/async_mqtt5/impl/connect_op.hpp | 11 +- include/async_mqtt5/impl/reconnect_op.hpp | 65 +++++---- .../test_common/test_autoconnect_stream.hpp | 122 +++++++++++++++++ test/include/test_common/test_stream.hpp | 2 +- test/unit/connect_op.cpp | 4 +- test/unit/reconnect_op.cpp | 124 +++++++++++++++++- 7 files changed, 296 insertions(+), 49 deletions(-) create mode 100644 test/include/test_common/test_autoconnect_stream.hpp diff --git a/include/async_mqtt5/impl/autoconnect_stream.hpp b/include/async_mqtt5/impl/autoconnect_stream.hpp index 88ed696..06a2e16 100644 --- a/include/async_mqtt5/impl/autoconnect_stream.hpp +++ b/include/async_mqtt5/impl/autoconnect_stream.hpp @@ -61,9 +61,6 @@ private: template friend class reconnect_op; - template - friend class disconnect_op; - public: autoconnect_stream( const executor_type& ex, stream_context_type& context @@ -106,7 +103,7 @@ public: } void open() { - open_lowest_layer(_stream_ptr); + open_lowest_layer(_stream_ptr, asio::ip::tcp::v4()); } void cancel() { @@ -169,10 +166,10 @@ public: } private: - static void open_lowest_layer(stream_ptr sptr) { + static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) { error_code ec; auto& layer = lowest_layer(*sptr); - layer.open(asio::ip::tcp::v4(), ec); + layer.open(protocol, ec); layer.set_option(asio::socket_base::reuse_address(true), ec); layer.set_option(asio::ip::tcp::no_delay(true), ec); } @@ -189,9 +186,9 @@ private: return sptr; } - stream_ptr construct_and_open_next_layer() const { + stream_ptr construct_and_open_next_layer(asio::ip::tcp protocol) const { auto sptr = construct_next_layer(); - open_lowest_layer(sptr); + open_lowest_layer(sptr, protocol); return sptr; } @@ -212,11 +209,11 @@ private: using Signature = void (error_code); auto initiation = [](auto handler, self_type& self, stream_ptr s) { - reconnect_op { self, std::move(handler) }.perform(s); + reconnect_op { self, std::move(handler) }.perform(std::move(s)); }; return asio::async_initiate( - initiation, token, std::ref(*this), s + initiation, token, std::ref(*this), std::move(s) ); } }; diff --git a/include/async_mqtt5/impl/connect_op.hpp b/include/async_mqtt5/impl/connect_op.hpp index be227a2..4ea4f56 100644 --- a/include/async_mqtt5/impl/connect_op.hpp +++ b/include/async_mqtt5/impl/connect_op.hpp @@ -68,12 +68,11 @@ class connect_op { asio::cancellation_state _cancellation_state; using endpoint = asio::ip::tcp::endpoint; - using epoints = asio::ip::tcp::resolver::results_type; public: template connect_op( - Stream& stream, Handler&& handler, mqtt_ctx& ctx + Stream& stream, mqtt_ctx& ctx, Handler&& handler ) : _stream(stream), _ctx(ctx), _handler(std::forward(handler)), @@ -106,14 +105,12 @@ public: return asio::get_associated_executor(_handler); } - void perform( - const epoints& eps, authority_path ap - ) { + void perform(const endpoint& ep, authority_path ap) { lowest_layer(_stream).async_connect( - *std::begin(eps), + ep, asio::append( asio::prepend(std::move(*this), on_connect {}), - *std::begin(eps), std::move(ap) + ep, std::move(ap) ) ); } diff --git a/include/async_mqtt5/impl/reconnect_op.hpp b/include/async_mqtt5/impl/reconnect_op.hpp index 568dbf0..2b00769 100644 --- a/include/async_mqtt5/impl/reconnect_op.hpp +++ b/include/async_mqtt5/impl/reconnect_op.hpp @@ -114,7 +114,7 @@ public: if (ec == asio::error::operation_aborted) // cancelled without acquiring the lock (by calling client.cancel()) return std::move(_handler)(ec); - + if (!_owner.is_open()) return complete(asio::error::operation_aborted); @@ -148,8 +148,6 @@ public: on_next_endpoint, error_code ec, epoints eps, authority_path ap ) { - namespace asioex = boost::asio::experimental; - // the three error codes below are the only possible codes // that may be returned from async_next_endpont @@ -162,7 +160,17 @@ public: if (ec == asio::error::host_not_found) return complete(asio::error::no_recovery); - auto sptr = _owner.construct_and_open_next_layer(); + connect(std::move(eps), std::move(ap)); + } + + void connect(epoints eps, authority_path ap) { + namespace asioex = boost::asio::experimental; + + if (eps.empty()) + return do_reconnect(); + + const auto& ep = eps->endpoint(); + auto sptr = _owner.construct_and_open_next_layer(ep.protocol()); if constexpr (has_tls_context) setup_tls_sni( @@ -174,53 +182,58 @@ public: auto init_connect = []( auto handler, typename Owner::stream_type& stream, - mqtt_ctx& context, const epoints& eps, authority_path ap + mqtt_ctx& context, endpoint ep, authority_path ap ) { - connect_op { stream, std::move(handler), context } - .perform(eps, std::move(ap)); + connect_op { stream, context, std::move(handler) } + .perform(ep, std::move(ap)); }; auto timed_connect = asioex::make_parallel_group( - asio::async_initiate( + asio::async_initiate( init_connect, asio::deferred, std::ref(*sptr), std::ref(_owner._stream_context.mqtt_context()), - eps, std::move(ap) + ep, ap ), _owner._connect_timer.async_wait(asio::deferred) ); timed_connect.async_wait( asioex::wait_for_one(), - asio::prepend(std::move(*this), on_connect {}, std::move(sptr)) + asio::prepend( + std::move(*this), on_connect {}, + std::move(sptr), std::move(eps), std::move(ap) + ) ); } void operator()( - on_connect, typename Owner::stream_ptr sptr, + on_connect, + typename Owner::stream_ptr sptr, epoints eps, authority_path ap, std::array ord, error_code connect_ec, error_code timer_ec ) { - // connect_ec may be any of stream.async_connect() error codes - // plus access_denied, connection_refused and - // client::error::malformed_packet + // connect_ec may be any of: + // 1) async_connect error codes + // 2) async_handshake (TLS) error codes + // 3) async_handshake (WebSocket) error codes + // 4) async_write error codes + // 5) async_read error codes + // 5) client::error::malformed_packet if ( - ord[0] == 0 && connect_ec == asio::error::operation_aborted || - ord[0] == 1 && timer_ec == asio::error::operation_aborted || + (ord[0] == 0 && connect_ec == asio::error::operation_aborted) || + (ord[0] == 1 && timer_ec == asio::error::operation_aborted) || !_owner.is_open() ) return complete(asio::error::operation_aborted); - // operation timed out so retry - if (ord[0] == 1) - return do_reconnect(); - - if (connect_ec == asio::error::access_denied) - return complete(asio::error::no_recovery); - - // retry for any other stream.async_connect() error or - // connection_refused, client::error::malformed_packet - if (connect_ec) + // retry for operation timed out and any other error_code or client::error::malformed_packet + if (ord[0] == 1 || connect_ec) { + // if the hostname resolved into more endpoints, try the next one + if (++eps != eps.end()) + return connect(std::move(eps), std::move(ap)); + // try next server return do_reconnect(); + } _owner.replace_next_layer(std::move(sptr)); complete(error_code {}); diff --git a/test/include/test_common/test_autoconnect_stream.hpp b/test/include/test_common/test_autoconnect_stream.hpp new file mode 100644 index 0000000..7e4fb4b --- /dev/null +++ b/test/include/test_common/test_autoconnect_stream.hpp @@ -0,0 +1,122 @@ +// +// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#ifndef ASYNC_MQTT5_TEST_AUTOCONNECT_STREAM_HPP +#define ASYNC_MQTT5_TEST_AUTOCONNECT_STREAM_HPP + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include + +namespace async_mqtt5::test { + +namespace asio = boost::asio; +using error_code = boost::system::error_code; + +template < + typename StreamType, + typename StreamContext = std::monostate +> +class test_autoconnect_stream { +public: + using stream_type = StreamType; + using stream_ptr = std::shared_ptr; + using stream_context_type = StreamContext; + using executor_type = typename stream_type::executor_type; + +private: + executor_type _stream_executor; + detail::async_mutex _conn_mtx; + asio::steady_timer _connect_timer; + detail::endpoints _endpoints; + + stream_ptr _stream_ptr; + stream_context_type& _stream_context; + + template + friend class async_mqtt5::detail::reconnect_op; + +public: + test_autoconnect_stream( + const executor_type& ex, stream_context_type& context + ) : + _stream_executor(ex), + _conn_mtx(_stream_executor), + _connect_timer(_stream_executor), + _endpoints(_stream_executor, _connect_timer), + _stream_context(context) + { + replace_next_layer(construct_next_layer()); + open_lowest_layer(_stream_ptr, asio::ip::tcp::v4()); + } + + test_autoconnect_stream(const test_autoconnect_stream&) = delete; + test_autoconnect_stream& operator=(const test_autoconnect_stream&) = delete; + + stream_ptr stream_pointer() const { + return _stream_ptr; + } + + bool is_open() const noexcept { + return detail::lowest_layer(*_stream_ptr).is_open(); + } + + void brokers(std::string hosts, uint16_t default_port) { + _endpoints.brokers(std::move(hosts), default_port); + } + + static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) { + error_code ec; + auto& layer = detail::lowest_layer(*sptr); + layer.open(protocol, ec); + layer.set_option(asio::socket_base::reuse_address(true), ec); + layer.set_option(asio::ip::tcp::no_delay(true), ec); + } + + void close() { + error_code ec; + detail::lowest_layer(*_stream_ptr).shutdown(asio::ip::tcp::socket::shutdown_both, ec); + detail::lowest_layer(*_stream_ptr).close(ec); + } + + stream_ptr construct_next_layer() const { + stream_ptr sptr; + if constexpr (detail::has_tls_context) + sptr = std::make_shared( + _stream_executor, _stream_context.tls_context() + ); + else + sptr = std::make_shared(_stream_executor); + + return sptr; + } + + stream_ptr construct_and_open_next_layer(asio::ip::tcp protocol) const { + auto sptr = construct_next_layer(); + open_lowest_layer(sptr, protocol); + return sptr; + } + + void replace_next_layer(stream_ptr sptr) { + if (_stream_ptr) + close(); + std::exchange(_stream_ptr, std::move(sptr)); + } +}; + +} // end namespace async_mqtt5::test + +#endif // !ASYNC_MQTT5_TEST_AUTOCONNECT_STREAM_HPP diff --git a/test/include/test_common/test_stream.hpp b/test/include/test_common/test_stream.hpp index 5175ade..062fe1b 100644 --- a/test/include/test_common/test_stream.hpp +++ b/test/include/test_common/test_stream.hpp @@ -54,7 +54,7 @@ private: friend class write_op; public: - explicit test_stream_impl(executor_type ex) : _ex(std::move(ex)) {} + explicit test_stream_impl(executor_type ex) : _ex(std::move(ex)) {} test_stream_impl(test_stream_impl&&) = default; test_stream_impl(const test_stream_impl&) = delete; diff --git a/test/unit/connect_op.cpp b/test/unit/connect_op.cpp index 18cb938..560cbce 100644 --- a/test/unit/connect_op.cpp +++ b/test/unit/connect_op.cpp @@ -65,8 +65,8 @@ void run_unit_test( }; detail::connect_op( - stream, std::move(handler), mqtt_ctx - ).perform(eps, ap); + stream, mqtt_ctx, std::move(handler) + ).perform(*std::begin(eps), std::move(ap)); ioc.run_for(1s); BOOST_TEST(handlers_called == expected_handlers_called); diff --git a/test/unit/reconnect_op.cpp b/test/unit/reconnect_op.cpp index bc7a2bc..1af4409 100644 --- a/test/unit/reconnect_op.cpp +++ b/test/unit/reconnect_op.cpp @@ -8,18 +8,25 @@ #include #include +#include -#include +#include +#include +#include +#include #include +#include "test_common/test_autoconnect_stream.hpp" +#include "test_common/test_broker.hpp" +#include "test_common/test_stream.hpp" + using namespace async_mqtt5; +using namespace std::chrono_literals; BOOST_AUTO_TEST_SUITE(reconnect_op/*, *boost::unit_test::disabled()*/) BOOST_AUTO_TEST_CASE(exponential_backoff) { - using namespace std::chrono_literals; - detail::exponential_backoff generator; auto first_iter = generator.generate(); @@ -41,4 +48,115 @@ BOOST_AUTO_TEST_CASE(exponential_backoff) { BOOST_TEST((sixth_iter >= 15500ms && sixth_iter <= 16500ms)); } +struct test_tcp_stream : public test::test_stream { + + test_tcp_stream( + typename test::test_stream::executor_type ex + ) : + test::test_stream(std::move(ex)) + {} + + static int& succeed_after() { + static int _succed_after = 0; + return _succed_after; + } + + template + decltype(auto) async_connect( + const endpoint_type& ep, ConnectToken&& token + ) { + auto initiation = [this](auto handler, const endpoint_type& ep) { + error_code ec = --succeed_after() < 0 ? error_code {} : asio::error::connection_refused; + if (!ec) { + error_code cec; + test::test_stream::open(ep.protocol(), cec); + test::test_stream::connect(ep, cec); + } + asio::post(get_executor(), asio::prepend(std::move(handler), ec)); + }; + + return asio::async_initiate( + std::move(initiation), token, ep + ); + } +}; + +using underlying_stream = test_tcp_stream; +using stream_context = detail::stream_context; +using astream = test::test_autoconnect_stream; + +void run_connect_to_localhost_test(int succeed_after) { + using test::after; + error_code success {}; + + const std::string connect = encoders::encode_connect( + "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt + ); + const std::string connack = encoders::encode_connack( + true, reason_codes::success.value(), {} + ); + + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(2ms)) + .reply_with(connack, after(4ms)); + + auto& broker = asio::make_service( + ioc, ioc.get_executor(), std::move(broker_side) + ); + + auto stream_ctx = stream_context(std::monostate {}); + auto auto_stream = astream(ioc.get_executor(), stream_ctx); + auto_stream.brokers("localhost", 1883); + + auto handler = [&handlers_called](error_code ec) { + ++handlers_called; + BOOST_TEST(!ec); + }; + + test_tcp_stream::succeed_after() = succeed_after; + detail::reconnect_op(auto_stream, std::move(handler)) + .perform(auto_stream.stream_pointer()); + + ioc.run(); + BOOST_TEST(expected_handlers_called == handlers_called); + BOOST_TEST(broker.received_all_expected()); +} + +BOOST_AUTO_TEST_CASE(connect_to_first_localhost) { + // connect to first in the resolver list + run_connect_to_localhost_test(2); +} + +BOOST_AUTO_TEST_CASE(connect_to_second_localhost) { + // connect to second in the resolver list + run_connect_to_localhost_test(3); +} + +BOOST_AUTO_TEST_CASE(no_servers) { + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + asio::io_context ioc; + auto stream_ctx = stream_context(std::monostate{}); + auto auto_stream = astream(ioc.get_executor(), stream_ctx); + auto_stream.brokers("", 1883); + + auto handler = [&handlers_called](error_code ec) { + ++handlers_called; + BOOST_TEST(ec == asio::error::no_recovery); + }; + + detail::reconnect_op(auto_stream, std::move(handler)) + .perform(auto_stream.stream_pointer()); + + ioc.run(); + BOOST_TEST(expected_handlers_called == handlers_called); +} + BOOST_AUTO_TEST_SUITE_END();