diff --git a/doc/qbk/reference/concepts/StreamType.qbk b/doc/qbk/reference/concepts/StreamType.qbk index 0b57678..8639dd1 100644 --- a/doc/qbk/reference/concepts/StreamType.qbk +++ b/doc/qbk/reference/concepts/StreamType.qbk @@ -10,10 +10,13 @@ `StreamType` should meet the [beastconceptslink streams AsyncStream] concept. -Additionally, it should follow Asio's layered stream model by having a `lowest_layer_type` member type, +It should follow Asio's layered stream model by having a `lowest_layer_type` member type, and a `lowest_layer` member function, returing a `lowest_layer_type&`. The `lowest_layer_type` should inherit from __TCP_SOCKET__. +Additionally, it should have an overload of [ghreflink include/boost/mqtt5/detail/shutdown.hpp async_shutdown] +function that is discoverable via argument-dependent lookup (ADL). + The types __TCP_SOCKET__, __SSL_STREAM__ and __WEBSOCKET_STREAM__ meet these requirements. [endsect] diff --git a/example/hello_world_over_tls.cpp b/example/hello_world_over_tls.cpp index 35f6a65..7b7bf48 100644 --- a/example/hello_world_over_tls.cpp +++ b/example/hello_world_over_tls.cpp @@ -9,6 +9,7 @@ #include #include #include +#include // OpenSSL traits #include #include diff --git a/example/hello_world_over_websocket_tls.cpp b/example/hello_world_over_websocket_tls.cpp index a6d89df..bff7b4f 100644 --- a/example/hello_world_over_websocket_tls.cpp +++ b/example/hello_world_over_websocket_tls.cpp @@ -9,13 +9,12 @@ #include #include #include -#include // WebSocket traits +#include // WebSocket and OpenSSL traits #include #include #include #include -#include // async_teardown specialization for WebSocket SSL stream #include #include diff --git a/include/boost/mqtt5/detail/shutdown.hpp b/include/boost/mqtt5/detail/shutdown.hpp new file mode 100644 index 0000000..a3fb972 --- /dev/null +++ b/include/boost/mqtt5/detail/shutdown.hpp @@ -0,0 +1,36 @@ +#ifndef BOOST_MQTT5_SHUTDOWN_HPP +#define BOOST_MQTT5_SHUTDOWN_HPP + +#include + +namespace boost::mqtt5::detail { + +template +void async_shutdown(Stream& /* stream */, ShutdownHandler&& /* handler */) { +/* + If you are trying to use beast::websocket::stream and/or OpenSSL + and this goes off, you need to add an include for one of these + * + * + * + + If you are trying to use mqtt_client with user-defined stream type, you must + provide an overload of async_shutdown that is discoverable via + argument-dependent lookup (ADL). +*/ + static_assert(sizeof(Stream) == -1, + "Unknown Stream type in async_shutdown."); +} + +template +void async_shutdown( + asio::basic_stream_socket& socket, ShutdownHandler&& handler +) { + boost::system::error_code ec; + socket.shutdown(asio::socket_base::shutdown_both, ec); + return std::move(handler)(ec); +} + +} // end namespace boost::mqtt5::detail + +#endif // !BOOST_MQTT5_SHUTDOWN_HPP diff --git a/include/boost/mqtt5/impl/autoconnect_stream.hpp b/include/boost/mqtt5/impl/autoconnect_stream.hpp index 20af127..e40cc29 100644 --- a/include/boost/mqtt5/impl/autoconnect_stream.hpp +++ b/include/boost/mqtt5/impl/autoconnect_stream.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -66,6 +67,9 @@ private: template friend class reconnect_op; + template + friend class shutdown_op; + public: autoconnect_stream( const executor_type& ex, stream_context_type& context, @@ -114,21 +118,26 @@ public: } void cancel() { - error_code ec; - lowest_layer(*_stream_ptr).cancel(ec); _conn_mtx.cancel(); _connect_timer.cancel(); } void close() { error_code ec; - shutdown(asio::ip::tcp::socket::shutdown_both); lowest_layer(*_stream_ptr).close(ec); } - void shutdown(asio::ip::tcp::socket::shutdown_type what) { - error_code ec; - lowest_layer(*_stream_ptr).shutdown(what, ec); + template + void async_shutdown(CompletionToken&& token) { + using Signature = void (error_code); + + auto initiation = [](auto handler, self_type& self) { + shutdown_op { self, std::move(handler) }.perform(); + }; + + return asio::async_initiate( + initiation, token, std::ref(*this) + ); } bool was_connected() const { diff --git a/include/boost/mqtt5/impl/client_service.hpp b/include/boost/mqtt5/impl/client_service.hpp index 38302b7..acb56ac 100644 --- a/include/boost/mqtt5/impl/client_service.hpp +++ b/include/boost/mqtt5/impl/client_service.hpp @@ -400,8 +400,9 @@ public: return _stream.is_open(); } - void close_stream() { - _stream.close(); + template + decltype(auto) async_shutdown(CompletionToken&& token) { + return _stream.async_shutdown(std::forward(token)); } void cancel() { diff --git a/include/boost/mqtt5/impl/connect_op.hpp b/include/boost/mqtt5/impl/connect_op.hpp index b71e4d3..a351496 100644 --- a/include/boost/mqtt5/impl/connect_op.hpp +++ b/include/boost/mqtt5/impl/connect_op.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -54,6 +55,7 @@ class connect_op { struct on_auth_data {}; struct on_send_auth {}; struct on_complete_auth {}; + struct on_shutdown {}; Stream& _stream; mqtt_ctx& _ctx; @@ -94,8 +96,7 @@ public: return asio::get_associated_allocator(_handler); } - using cancellation_slot_type = - asio::associated_cancellation_slot_t; + using cancellation_slot_type = asio::cancellation_slot; cancellation_slot_type get_cancellation_slot() const noexcept { return _cancellation_state.slot(); } @@ -207,7 +208,7 @@ public: return complete(asio::error::operation_aborted); if (ec) - return complete(asio::error::try_again); + return do_shutdown(asio::error::try_again); _ctx.co_props[prop::authentication_data] = std::move(data); send_connect(); @@ -238,7 +239,7 @@ public: return complete(asio::error::operation_aborted); if (ec) - return complete(ec); + return do_shutdown(ec); _buffer_ptr = std::make_unique(min_packet_sz, char(0)); @@ -256,12 +257,12 @@ public: return complete(asio::error::operation_aborted); if (ec) - return complete(ec); + return do_shutdown(ec); auto code = control_code_e((*_buffer_ptr)[0] & 0b11110000); if (code != control_code_e::auth && code != control_code_e::connack) - return complete(asio::error::try_again); + return do_shutdown(asio::error::try_again); auto varlen_ptr = _buffer_ptr->cbegin() + 1; auto varlen = decoders::type_parse( @@ -269,7 +270,7 @@ public: ); if (!varlen) - return complete(asio::error::try_again); + return do_shutdown(asio::error::try_again); auto varlen_sz = std::distance(_buffer_ptr->cbegin() + 1, varlen_ptr); auto remain_len = *varlen - @@ -299,13 +300,13 @@ public: return complete(asio::error::operation_aborted); if (ec) - return complete(ec); + return do_shutdown(ec); if (code == control_code_e::connack) return on_connack(first, last); if (!_ctx.co_props[prop::authentication_method].has_value()) - return complete(client::error::malformed_packet); + return do_shutdown(client::error::malformed_packet); on_auth(first, last); } @@ -314,7 +315,7 @@ public: auto packet_length = static_cast(std::distance(first, last)); auto rv = decoders::decode_connack(packet_length, first); if (!rv.has_value()) - return complete(client::error::malformed_packet); + return do_shutdown(client::error::malformed_packet); const auto& [session_present, reason_code, ca_props] = *rv; _ctx.ca_props = ca_props; @@ -328,11 +329,11 @@ public: auto rc = to_reason_code(reason_code); if (!rc.has_value()) // reason code not allowed in CONNACK - return complete(client::error::malformed_packet); + return do_shutdown(client::error::malformed_packet); _log.at_connack(*rc, session_present, ca_props); if (*rc) - return complete(asio::error::try_again); + return do_shutdown(asio::error::try_again); if (_ctx.co_props[prop::authentication_method].has_value()) return _ctx.authenticator.async_auth( @@ -348,7 +349,7 @@ public: auto packet_length = static_cast(std::distance(first, last)); auto rv = decoders::decode_auth(packet_length, first); if (!rv.has_value()) - return complete(client::error::malformed_packet); + return do_shutdown(client::error::malformed_packet); const auto& [reason_code, auth_props] = *rv; auto rc = to_reason_code(reason_code); @@ -357,7 +358,7 @@ public: auth_props[prop::authentication_method] != _ctx.co_props[prop::authentication_method] ) - return complete(client::error::malformed_packet); + return do_shutdown(client::error::malformed_packet); _ctx.authenticator.async_auth( auth_step_e::server_challenge, @@ -371,7 +372,7 @@ public: return complete(asio::error::operation_aborted); if (ec) - return complete(asio::error::try_again); + return do_shutdown(asio::error::try_again); auth_props props; props[prop::authentication_method] = @@ -400,7 +401,7 @@ public: return complete(asio::error::operation_aborted); if (ec) - return complete(ec); + return do_shutdown(ec); auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz); asio::async_read( @@ -414,18 +415,34 @@ public: return complete(asio::error::operation_aborted); if (ec) - return complete(asio::error::try_again); + return do_shutdown(asio::error::try_again); complete(error_code {}); } + void do_shutdown(error_code connect_ec) { + auto init_shutdown = [&stream = _stream](auto handler) { + async_shutdown(stream, std::move(handler)); + }; + auto token = asio::prepend(std::move(*this), on_shutdown{}, connect_ec); + + return asio::async_initiate( + init_shutdown, token + ); + } + + void operator()(on_shutdown, error_code connect_ec, error_code) { + // ignore shutdown error_code + complete(connect_ec); + } + private: bool is_cancelled() const { return _cancellation_state.cancelled() != asio::cancellation_type::none; } void complete(error_code ec) { - _cancellation_state.slot().clear(); + asio::get_associated_cancellation_slot(_handler).clear(); std::move(_handler)(ec); } }; diff --git a/include/boost/mqtt5/impl/disconnect_op.hpp b/include/boost/mqtt5/impl/disconnect_op.hpp index d822267..a913de4 100644 --- a/include/boost/mqtt5/impl/disconnect_op.hpp +++ b/include/boost/mqtt5/impl/disconnect_op.hpp @@ -45,6 +45,7 @@ class disconnect_op { using client_service = ClientService; struct on_disconnect {}; + struct on_shutdown {}; std::shared_ptr _svc_ptr; DisconnectContext _context; @@ -143,15 +144,15 @@ public: return complete(error_code {}); } - if (_context.terminal) { + return _svc_ptr->async_shutdown( + asio::prepend(std::move(*this), on_shutdown {}) + ); + } + + void operator()(on_shutdown, error_code ec) { + if (_context.terminal) _svc_ptr->cancel(); - return complete(error_code {}); - } - - _svc_ptr->close_stream(); - _svc_ptr->open_stream(); - - complete(error_code {}); + complete(ec); } private: diff --git a/include/boost/mqtt5/impl/read_message_op.hpp b/include/boost/mqtt5/impl/read_message_op.hpp index a8c00bb..338dad4 100644 --- a/include/boost/mqtt5/impl/read_message_op.hpp +++ b/include/boost/mqtt5/impl/read_message_op.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -128,8 +129,9 @@ private: .value_or(reason_codes::unspecified_error), props ); - _svc_ptr->close_stream(); - _svc_ptr->open_stream(); + return _svc_ptr->async_shutdown( + asio::prepend(std::move(*this), on_disconnect {}) + ); } break; case control_code_e::auth: { diff --git a/include/boost/mqtt5/impl/shutdown_op.hpp b/include/boost/mqtt5/impl/shutdown_op.hpp new file mode 100644 index 0000000..3bd24c2 --- /dev/null +++ b/include/boost/mqtt5/impl/shutdown_op.hpp @@ -0,0 +1,160 @@ +// +// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_MQTT5_SHUTDOWN_OP_HPP +#define BOOST_MQTT5_SHUTDOWN_OP_HPP + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace boost::mqtt5::detail { + +template +constexpr bool is_basic_socket = false; + +template +constexpr bool is_basic_socket> = true; + +namespace asio = boost::asio; + +template +class shutdown_op { + struct on_locked {}; + struct on_shutdown {}; + + Owner& _owner; + + using handler_type = asio::any_completion_handler; + handler_type _handler; + +public: + template + shutdown_op(Owner& owner, Handler&& handler) : + _owner(owner), _handler(std::move(handler)) + {} + + shutdown_op(shutdown_op&&) = default; + shutdown_op(const shutdown_op&) = delete; + + shutdown_op& operator=(shutdown_op&&) = default; + shutdown_op& operator=(const shutdown_op&) = delete; + + using allocator_type = asio::associated_allocator_t; + allocator_type get_allocator() const noexcept { + return asio::get_associated_allocator(_handler); + } + + using cancellation_slot_type = + asio::associated_cancellation_slot_t; + cancellation_slot_type get_cancellation_slot() const noexcept { + return asio::get_associated_cancellation_slot(_handler); + } + + using executor_type = typename Owner::executor_type; + executor_type get_executor() const noexcept { + return _owner.get_executor(); + } + + void perform() { + if constexpr (is_basic_socket) { + error_code ec; + _owner._stream_ptr->shutdown(asio::socket_base::shutdown_both, ec); + return std::move(_handler)(error_code {}); + } + else { + if (_owner._conn_mtx.is_locked()) + return std::move(_handler)(error_code{}); + + auto s = std::move(_owner._stream_ptr); + _owner.replace_next_layer(_owner.construct_next_layer()); + _owner.open(); + + _owner._conn_mtx.lock( + asio::prepend(std::move(*this), on_locked {}, std::move(s)) + ); + } + } + + void operator()(on_locked, typename Owner::stream_ptr s, error_code ec) { + if (ec == asio::error::operation_aborted) + return complete(s, asio::error::operation_aborted); + + if (!_owner.is_open()) { + _owner._conn_mtx.unlock(); + return complete(s, asio::error::operation_aborted); + } + + namespace asioex = boost::asio::experimental; + + // wait max 5 seconds for the shutdown op to finish + _owner._connect_timer.expires_after(std::chrono::seconds(5)); + + auto init_shutdown = []( + auto handler, typename Owner::stream_type& stream + ) { + async_shutdown(stream, std::move(handler)); + }; + + auto timed_shutdown = asioex::make_parallel_group( + asio::async_initiate( + init_shutdown, asio::deferred, std::ref(*s) + ), + _owner._connect_timer.async_wait(asio::deferred) + ); + + timed_shutdown.async_wait( + asioex::wait_for_one(), + asio::prepend( + std::move(*this), on_shutdown {}, + std::move(s) + ) + ); + } + + void operator()( + on_shutdown, typename Owner::stream_ptr sptr, + std::array /* ord */, + error_code /* shutdown_ec */, error_code /* timer_ec */ + ) { + _owner._conn_mtx.unlock(); + + if (!_owner.is_open()) + return complete(sptr, asio::error::operation_aborted); + + // ignore shutdown error_code + complete(sptr, error_code {}); + } + +private: + void complete(const typename Owner::stream_ptr& sptr, error_code ec) { + asio::get_associated_cancellation_slot(_handler).clear(); + error_code close_ec; + lowest_layer(*sptr).close(close_ec); + std::move(_handler)(ec); + } +}; + + +} // end namespace boost::mqtt5::detail + +#endif // !BOOST_MQTT5_SHUTDOWN_OP_HPP diff --git a/include/boost/mqtt5/ssl.hpp b/include/boost/mqtt5/ssl.hpp new file mode 100644 index 0000000..cd41c37 --- /dev/null +++ b/include/boost/mqtt5/ssl.hpp @@ -0,0 +1,34 @@ +// +// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_MQTT5_SSL_HPP +#define BOOST_MQTT5_SSL_HPP + +#include +#include + +#include + +#include + +namespace boost::mqtt5 { + +namespace detail { + +// in namespace boost::mqtt5::detail to enable ADL +template +void async_shutdown( + boost::asio::ssl::stream& stream, ShutdownHandler&& handler +) { + stream.async_shutdown(std::move(handler)); +} + +} // end namespace detail + +} // end namespace boost::mqtt5 + +#endif // !BOOST_MQTT5_SSL_HPP diff --git a/include/boost/mqtt5/websocket.hpp b/include/boost/mqtt5/websocket.hpp index 5491902..53cb4c3 100644 --- a/include/boost/mqtt5/websocket.hpp +++ b/include/boost/mqtt5/websocket.hpp @@ -8,6 +8,9 @@ #ifndef BOOST_MQTT5_WEBSOCKET_HPP #define BOOST_MQTT5_WEBSOCKET_HPP +#include +#include + #include #include @@ -49,6 +52,21 @@ struct ws_handshake_traits> { } }; +namespace detail { + +// in namespace boost::mqtt5::detail to enable ADL +template +void async_shutdown( + boost::beast::websocket::stream& stream, ShutdownHandler&& handler +) { + stream.async_close( + beast::websocket::close_code::normal, + std::move(handler) + ); +} + +} // end namespace detail + } // end namespace boost::mqtt5 #endif // !BOOST_MQTT5_WEBSOCKET_HPP diff --git a/include/boost/mqtt5/websocket_ssl.hpp b/include/boost/mqtt5/websocket_ssl.hpp new file mode 100644 index 0000000..a8eee2c --- /dev/null +++ b/include/boost/mqtt5/websocket_ssl.hpp @@ -0,0 +1,16 @@ +// +// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_MQTT5_WEBSOCKET_SSL_HPP +#define BOOST_MQTT5_WEBSOCKET_SSL_HPP + +#include +#include + +#include + +#endif // !BOOST_MQTT5_WEBSOCKET_SSL_HPP diff --git a/test/include/test_common/test_autoconnect_stream.hpp b/test/include/test_common/test_autoconnect_stream.hpp index f45df44..25e36e1 100644 --- a/test/include/test_common/test_autoconnect_stream.hpp +++ b/test/include/test_common/test_autoconnect_stream.hpp @@ -95,7 +95,6 @@ public: void close() { error_code ec; - detail::lowest_layer(*_stream_ptr).shutdown(asio::ip::tcp::socket::shutdown_both, ec); detail::lowest_layer(*_stream_ptr).close(ec); } diff --git a/test/include/test_common/test_broker.hpp b/test/include/test_common/test_broker.hpp index 40716ce..25cf99a 100644 --- a/test/include/test_common/test_broker.hpp +++ b/test/include/test_common/test_broker.hpp @@ -224,6 +224,7 @@ public: auto initiation = [this]( auto handler, const MutableBuffer& buffer ) { + _pending_read.complete(_ex, asio::error::operation_aborted, 0); _pending_read = pending_read(buffer, std::move(handler)); complete_read(); }; diff --git a/test/include/test_common/test_stream.hpp b/test/include/test_common/test_stream.hpp index d077d8e..cf05042 100644 --- a/test/include/test_common/test_stream.hpp +++ b/test/include/test_common/test_stream.hpp @@ -79,10 +79,6 @@ public: _test_broker = nullptr; } - void shutdown(asio::ip::tcp::socket::shutdown_type, error_code& ec) { - ec = {}; - } - void connect(const endpoint_type& ep, error_code& ec) { ec = {}; _remote_ep = ep; @@ -165,8 +161,6 @@ public: } void operator()(on_read, error_code ec, size_t bytes_read) { - if (ec) - _stream_impl->disconnect(); complete(ec, bytes_read); } @@ -223,8 +217,6 @@ public: } void operator()(on_write, error_code ec, size_t bytes_written) { - if (ec) - _stream_impl->disconnect(); complete(ec, bytes_written); } @@ -300,10 +292,6 @@ public: return _impl->is_connected(); } - void shutdown(asio::ip::tcp::socket::shutdown_type st, error_code& ec) { - return _impl->shutdown(st, ec); - } - endpoint_type remote_endpoint(error_code& ec) { return _impl->remote_endpoint(ec); } @@ -368,6 +356,10 @@ public: }; +template +void async_shutdown(test_stream&, ShutdownHandler&& handler) { + return std::move(handler)(error_code {}); +} } // end namespace boost::mqtt5::test diff --git a/test/integration/client.cpp b/test/integration/client.cpp index 37c75c0..f10af7e 100644 --- a/test/integration/client.cpp +++ b/test/integration/client.cpp @@ -11,7 +11,7 @@ #ifdef BOOST_ASIO_HAS_CO_AWAIT #include -#include +#include #include #include @@ -19,7 +19,6 @@ #include #include #include -#include // async_teardown specialization for websocket ssl stream #include #include diff --git a/test/integration/disconnect.cpp b/test/integration/disconnect.cpp index a087aa7..3bcb6cd 100644 --- a/test/integration/disconnect.cpp +++ b/test/integration/disconnect.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -311,4 +312,67 @@ BOOST_FIXTURE_TEST_CASE(omit_props, shared_test_data) { BOOST_TEST(handlers_called == expected_handlers_called); } +struct long_shutdown_stream : public test::test_stream { + long_shutdown_stream(typename test::test_stream::executor_type ex) : + test::test_stream(std::move(ex)) {} +}; + +template +void async_shutdown(long_shutdown_stream& stream, ShutdownHandler&& handler) { + auto timer = std::make_shared(stream.get_executor()); + timer->expires_after(std::chrono::seconds(10)); + timer->async_wait(asio::consign(std::move(handler), std::move(timer))); +} + +BOOST_DATA_TEST_CASE_F( + shared_test_data, cancel_disconnect_in_shutdown, + boost::unit_test::data::make({ 100, 8000 }), cancel_delay_ms +) { + asio::io_context ioc; + auto executor = ioc.get_executor(); + + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(0ms)) + .reply_with(connack, after(0ms)) + .expect(disconnect) + .complete_with(success, after(0ms)); + + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + asio::steady_timer timer(executor); + mqtt_client c(executor); + c.brokers("127.0.0.1") + .async_run(asio::detached); + + asio::cancellation_signal signal; + + c.async_disconnect( + asio::bind_cancellation_slot( + signal.slot(), + [&](error_code ec) { + handlers_called++; + BOOST_TEST(ec == asio::error::operation_aborted); + timer.cancel(); + } + ) + ); + + timer.expires_after(std::chrono::milliseconds(cancel_delay_ms)); + timer.async_wait([&signal](error_code) { + signal.emit(asio::cancellation_type::all); + }); + + ioc.run_for(6s); + + BOOST_TEST(broker.received_all_expected()); + BOOST_TEST(handlers_called == expected_handlers_called); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/integration/read_message.cpp b/test/integration/read_message.cpp index 18af78a..216051a 100644 --- a/test/integration/read_message.cpp +++ b/test/integration/read_message.cpp @@ -172,6 +172,52 @@ BOOST_FIXTURE_TEST_CASE(receive_disconnect, shared_test_data) { BOOST_TEST(broker.received_all_expected()); } +BOOST_FIXTURE_TEST_CASE(receive_disconnect_while_reconnecting, shared_test_data) { + // packets + auto disconnect = encoders::encode_disconnect(0x00, {}); + constexpr int expected_handlers_called = 1; + int handlers_called = 0; + + test::msg_exchange broker_side; + broker_side + .expect(connect) + .complete_with(success, after(1ms)) + .reply_with(connack, after(2ms)) + .expect(publish) + .complete_with(fail, after(20ms)) + .send(disconnect, after(30ms)) + .expect(connect) + .complete_with(success, after(20ms)) + .reply_with(connack, after(30ms)) + .expect(publish) + .complete_with(success, after(0ms)); + + asio::io_context ioc; + auto executor = ioc.get_executor(); + auto& broker = asio::make_service( + ioc, executor, std::move(broker_side) + ); + + using client_type = mqtt_client; + client_type c(executor); + c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff + .async_run(asio::detached); + + c.async_publish( + topic, payload, retain_e::no, {}, + [&](error_code ec) { + BOOST_TEST(handlers_called == 0); + handlers_called++; + BOOST_TEST(!ec); + c.cancel(); + } + ); + + ioc.run_for(1s); + BOOST_TEST(handlers_called == expected_handlers_called); + BOOST_TEST(broker.received_all_expected()); +} + template void run_receive_test( test::msg_exchange broker_side, int num_of_receives, diff --git a/test/integration/send_publish.cpp b/test/integration/send_publish.cpp index 7db2b28..89cd9e0 100644 --- a/test/integration/send_publish.cpp +++ b/test/integration/send_publish.cpp @@ -533,7 +533,7 @@ BOOST_FIXTURE_TEST_CASE(cancel_resending_publish, shared_test_data) { [&handlers_called, &c](error_code ec, reason_code rc, puback_props) { ++handlers_called; - BOOST_TEST(ec = asio::error::operation_aborted); + BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST(rc == reason_codes::empty); c.cancel(); diff --git a/test/unit/default_completion_tokens.cpp b/test/unit/default_completion_tokens.cpp index 7fe382a..6f16c19 100644 --- a/test/unit/default_completion_tokens.cpp +++ b/test/unit/default_completion_tokens.cpp @@ -11,13 +11,12 @@ #ifdef BOOST_ASIO_HAS_CO_AWAIT #include -#include +#include #include #include #include #include -#include // async_teardown for asio::ssl::socket #include #include diff --git a/test/unit/logger.cpp b/test/unit/logger.cpp index ef91907..5e9dc25 100644 --- a/test/unit/logger.cpp +++ b/test/unit/logger.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include @@ -16,7 +16,6 @@ #include #include #include -#include // async_teardown specialization for websocket ssl stream #include #include #include diff --git a/test/unit/reconnect_op.cpp b/test/unit/reconnect_op.cpp index 8fe1368..79476ed 100644 --- a/test/unit/reconnect_op.cpp +++ b/test/unit/reconnect_op.cpp @@ -84,6 +84,11 @@ struct test_tcp_stream : public test::test_stream { } }; +template +void async_shutdown(test_tcp_stream&, ShutdownHandler&& handler) { + return std::move(handler)(error_code {}); +} + using underlying_stream = test_tcp_stream; using stream_context = detail::stream_context; using astream = test::test_autoconnect_stream;