Gracefully shutdown Websocket and TLS streams

Summary:
Resolves #18
Ref T15241

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina, miljen

Maniphest Tasks: T15241

Differential Revision: https://repo.mireo.local/D33395
This commit is contained in:
Bruno Iljazovic
2025-01-30 18:16:30 +01:00
parent 4df9dbbb07
commit 8b41f0dd4d
23 changed files with 460 additions and 59 deletions

View File

@ -0,0 +1,36 @@
#ifndef BOOST_MQTT5_SHUTDOWN_HPP
#define BOOST_MQTT5_SHUTDOWN_HPP
#include <boost/asio/basic_stream_socket.hpp>
namespace boost::mqtt5::detail {
template <typename Stream, typename ShutdownHandler>
void async_shutdown(Stream& /* stream */, ShutdownHandler&& /* handler */) {
/*
If you are trying to use beast::websocket::stream and/or OpenSSL
and this goes off, you need to add an include for one of these
* <boost/mqtt5/websocket.hpp>
* <boost/mqtt5/ssl.hpp>
* <boost/mqtt5/websocket_ssl.hpp>
If you are trying to use mqtt_client with user-defined stream type, you must
provide an overload of async_shutdown that is discoverable via
argument-dependent lookup (ADL).
*/
static_assert(sizeof(Stream) == -1,
"Unknown Stream type in async_shutdown.");
}
template <typename P, typename E, typename ShutdownHandler>
void async_shutdown(
asio::basic_stream_socket<P, E>& socket, ShutdownHandler&& handler
) {
boost::system::error_code ec;
socket.shutdown(asio::socket_base::shutdown_both, ec);
return std::move(handler)(ec);
}
} // end namespace boost::mqtt5::detail
#endif // !BOOST_MQTT5_SHUTDOWN_HPP

View File

@ -15,6 +15,7 @@
#include <boost/mqtt5/impl/endpoints.hpp>
#include <boost/mqtt5/impl/read_op.hpp>
#include <boost/mqtt5/impl/reconnect_op.hpp>
#include <boost/mqtt5/impl/shutdown_op.hpp>
#include <boost/mqtt5/impl/write_op.hpp>
#include <boost/asio/async_result.hpp>
@ -66,6 +67,9 @@ private:
template <typename Owner>
friend class reconnect_op;
template <typename Owner>
friend class shutdown_op;
public:
autoconnect_stream(
const executor_type& ex, stream_context_type& context,
@ -114,21 +118,26 @@ public:
}
void cancel() {
error_code ec;
lowest_layer(*_stream_ptr).cancel(ec);
_conn_mtx.cancel();
_connect_timer.cancel();
}
void close() {
error_code ec;
shutdown(asio::ip::tcp::socket::shutdown_both);
lowest_layer(*_stream_ptr).close(ec);
}
void shutdown(asio::ip::tcp::socket::shutdown_type what) {
error_code ec;
lowest_layer(*_stream_ptr).shutdown(what, ec);
template <typename CompletionToken>
void async_shutdown(CompletionToken&& token) {
using Signature = void (error_code);
auto initiation = [](auto handler, self_type& self) {
shutdown_op { self, std::move(handler) }.perform();
};
return asio::async_initiate<CompletionToken, Signature>(
initiation, token, std::ref(*this)
);
}
bool was_connected() const {

View File

@ -400,8 +400,9 @@ public:
return _stream.is_open();
}
void close_stream() {
_stream.close();
template <typename CompletionToken>
decltype(auto) async_shutdown(CompletionToken&& token) {
return _stream.async_shutdown(std::forward<CompletionToken>(token));
}
void cancel() {

View File

@ -15,6 +15,7 @@
#include <boost/mqtt5/detail/control_packet.hpp>
#include <boost/mqtt5/detail/internal_types.hpp>
#include <boost/mqtt5/detail/log_invoke.hpp>
#include <boost/mqtt5/detail/shutdown.hpp>
#include <boost/mqtt5/impl/codecs/message_decoders.hpp>
#include <boost/mqtt5/impl/codecs/message_encoders.hpp>
@ -54,6 +55,7 @@ class connect_op {
struct on_auth_data {};
struct on_send_auth {};
struct on_complete_auth {};
struct on_shutdown {};
Stream& _stream;
mqtt_ctx& _ctx;
@ -94,8 +96,7 @@ public:
return asio::get_associated_allocator(_handler);
}
using cancellation_slot_type =
asio::associated_cancellation_slot_t<handler_type>;
using cancellation_slot_type = asio::cancellation_slot;
cancellation_slot_type get_cancellation_slot() const noexcept {
return _cancellation_state.slot();
}
@ -207,7 +208,7 @@ public:
return complete(asio::error::operation_aborted);
if (ec)
return complete(asio::error::try_again);
return do_shutdown(asio::error::try_again);
_ctx.co_props[prop::authentication_data] = std::move(data);
send_connect();
@ -238,7 +239,7 @@ public:
return complete(asio::error::operation_aborted);
if (ec)
return complete(ec);
return do_shutdown(ec);
_buffer_ptr = std::make_unique<std::string>(min_packet_sz, char(0));
@ -256,12 +257,12 @@ public:
return complete(asio::error::operation_aborted);
if (ec)
return complete(ec);
return do_shutdown(ec);
auto code = control_code_e((*_buffer_ptr)[0] & 0b11110000);
if (code != control_code_e::auth && code != control_code_e::connack)
return complete(asio::error::try_again);
return do_shutdown(asio::error::try_again);
auto varlen_ptr = _buffer_ptr->cbegin() + 1;
auto varlen = decoders::type_parse(
@ -269,7 +270,7 @@ public:
);
if (!varlen)
return complete(asio::error::try_again);
return do_shutdown(asio::error::try_again);
auto varlen_sz = std::distance(_buffer_ptr->cbegin() + 1, varlen_ptr);
auto remain_len = *varlen -
@ -299,13 +300,13 @@ public:
return complete(asio::error::operation_aborted);
if (ec)
return complete(ec);
return do_shutdown(ec);
if (code == control_code_e::connack)
return on_connack(first, last);
if (!_ctx.co_props[prop::authentication_method].has_value())
return complete(client::error::malformed_packet);
return do_shutdown(client::error::malformed_packet);
on_auth(first, last);
}
@ -314,7 +315,7 @@ public:
auto packet_length = static_cast<uint32_t>(std::distance(first, last));
auto rv = decoders::decode_connack(packet_length, first);
if (!rv.has_value())
return complete(client::error::malformed_packet);
return do_shutdown(client::error::malformed_packet);
const auto& [session_present, reason_code, ca_props] = *rv;
_ctx.ca_props = ca_props;
@ -328,11 +329,11 @@ public:
auto rc = to_reason_code<reason_codes::category::connack>(reason_code);
if (!rc.has_value()) // reason code not allowed in CONNACK
return complete(client::error::malformed_packet);
return do_shutdown(client::error::malformed_packet);
_log.at_connack(*rc, session_present, ca_props);
if (*rc)
return complete(asio::error::try_again);
return do_shutdown(asio::error::try_again);
if (_ctx.co_props[prop::authentication_method].has_value())
return _ctx.authenticator.async_auth(
@ -348,7 +349,7 @@ public:
auto packet_length = static_cast<uint32_t>(std::distance(first, last));
auto rv = decoders::decode_auth(packet_length, first);
if (!rv.has_value())
return complete(client::error::malformed_packet);
return do_shutdown(client::error::malformed_packet);
const auto& [reason_code, auth_props] = *rv;
auto rc = to_reason_code<reason_codes::category::auth>(reason_code);
@ -357,7 +358,7 @@ public:
auth_props[prop::authentication_method]
!= _ctx.co_props[prop::authentication_method]
)
return complete(client::error::malformed_packet);
return do_shutdown(client::error::malformed_packet);
_ctx.authenticator.async_auth(
auth_step_e::server_challenge,
@ -371,7 +372,7 @@ public:
return complete(asio::error::operation_aborted);
if (ec)
return complete(asio::error::try_again);
return do_shutdown(asio::error::try_again);
auth_props props;
props[prop::authentication_method] =
@ -400,7 +401,7 @@ public:
return complete(asio::error::operation_aborted);
if (ec)
return complete(ec);
return do_shutdown(ec);
auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
asio::async_read(
@ -414,18 +415,34 @@ public:
return complete(asio::error::operation_aborted);
if (ec)
return complete(asio::error::try_again);
return do_shutdown(asio::error::try_again);
complete(error_code {});
}
void do_shutdown(error_code connect_ec) {
auto init_shutdown = [&stream = _stream](auto handler) {
async_shutdown(stream, std::move(handler));
};
auto token = asio::prepend(std::move(*this), on_shutdown{}, connect_ec);
return asio::async_initiate<decltype(token), void(error_code)>(
init_shutdown, token
);
}
void operator()(on_shutdown, error_code connect_ec, error_code) {
// ignore shutdown error_code
complete(connect_ec);
}
private:
bool is_cancelled() const {
return _cancellation_state.cancelled() != asio::cancellation_type::none;
}
void complete(error_code ec) {
_cancellation_state.slot().clear();
asio::get_associated_cancellation_slot(_handler).clear();
std::move(_handler)(ec);
}
};

View File

@ -45,6 +45,7 @@ class disconnect_op {
using client_service = ClientService;
struct on_disconnect {};
struct on_shutdown {};
std::shared_ptr<client_service> _svc_ptr;
DisconnectContext _context;
@ -143,15 +144,15 @@ public:
return complete(error_code {});
}
if (_context.terminal) {
return _svc_ptr->async_shutdown(
asio::prepend(std::move(*this), on_shutdown {})
);
}
void operator()(on_shutdown, error_code ec) {
if (_context.terminal)
_svc_ptr->cancel();
return complete(error_code {});
}
_svc_ptr->close_stream();
_svc_ptr->open_stream();
complete(error_code {});
complete(ec);
}
private:

View File

@ -18,6 +18,7 @@
#include <boost/mqtt5/impl/publish_rec_op.hpp>
#include <boost/mqtt5/impl/re_auth_op.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/asio/recycling_allocator.hpp>
@ -128,8 +129,9 @@ private:
.value_or(reason_codes::unspecified_error),
props
);
_svc_ptr->close_stream();
_svc_ptr->open_stream();
return _svc_ptr->async_shutdown(
asio::prepend(std::move(*this), on_disconnect {})
);
}
break;
case control_code_e::auth: {

View File

@ -0,0 +1,160 @@
//
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_MQTT5_SHUTDOWN_OP_HPP
#define BOOST_MQTT5_SHUTDOWN_OP_HPP
#include <boost/mqtt5/types.hpp>
#include <boost/mqtt5/detail/async_traits.hpp>
#include <boost/mqtt5/detail/shutdown.hpp>
#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/prepend.hpp>
#include <array>
#include <chrono>
namespace boost::mqtt5::detail {
template <typename>
constexpr bool is_basic_socket = false;
template <typename P, typename E>
constexpr bool is_basic_socket<asio::basic_stream_socket<P, E>> = true;
namespace asio = boost::asio;
template <typename Owner>
class shutdown_op {
struct on_locked {};
struct on_shutdown {};
Owner& _owner;
using handler_type = asio::any_completion_handler<void (error_code)>;
handler_type _handler;
public:
template <typename Handler>
shutdown_op(Owner& owner, Handler&& handler) :
_owner(owner), _handler(std::move(handler))
{}
shutdown_op(shutdown_op&&) = default;
shutdown_op(const shutdown_op&) = delete;
shutdown_op& operator=(shutdown_op&&) = default;
shutdown_op& operator=(const shutdown_op&) = delete;
using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
}
using cancellation_slot_type =
asio::associated_cancellation_slot_t<handler_type>;
cancellation_slot_type get_cancellation_slot() const noexcept {
return asio::get_associated_cancellation_slot(_handler);
}
using executor_type = typename Owner::executor_type;
executor_type get_executor() const noexcept {
return _owner.get_executor();
}
void perform() {
if constexpr (is_basic_socket<typename Owner::stream_type>) {
error_code ec;
_owner._stream_ptr->shutdown(asio::socket_base::shutdown_both, ec);
return std::move(_handler)(error_code {});
}
else {
if (_owner._conn_mtx.is_locked())
return std::move(_handler)(error_code{});
auto s = std::move(_owner._stream_ptr);
_owner.replace_next_layer(_owner.construct_next_layer());
_owner.open();
_owner._conn_mtx.lock(
asio::prepend(std::move(*this), on_locked {}, std::move(s))
);
}
}
void operator()(on_locked, typename Owner::stream_ptr s, error_code ec) {
if (ec == asio::error::operation_aborted)
return complete(s, asio::error::operation_aborted);
if (!_owner.is_open()) {
_owner._conn_mtx.unlock();
return complete(s, asio::error::operation_aborted);
}
namespace asioex = boost::asio::experimental;
// wait max 5 seconds for the shutdown op to finish
_owner._connect_timer.expires_after(std::chrono::seconds(5));
auto init_shutdown = [](
auto handler, typename Owner::stream_type& stream
) {
async_shutdown(stream, std::move(handler));
};
auto timed_shutdown = asioex::make_parallel_group(
asio::async_initiate<const asio::deferred_t, void(error_code)>(
init_shutdown, asio::deferred, std::ref(*s)
),
_owner._connect_timer.async_wait(asio::deferred)
);
timed_shutdown.async_wait(
asioex::wait_for_one(),
asio::prepend(
std::move(*this), on_shutdown {},
std::move(s)
)
);
}
void operator()(
on_shutdown, typename Owner::stream_ptr sptr,
std::array<std::size_t, 2> /* ord */,
error_code /* shutdown_ec */, error_code /* timer_ec */
) {
_owner._conn_mtx.unlock();
if (!_owner.is_open())
return complete(sptr, asio::error::operation_aborted);
// ignore shutdown error_code
complete(sptr, error_code {});
}
private:
void complete(const typename Owner::stream_ptr& sptr, error_code ec) {
asio::get_associated_cancellation_slot(_handler).clear();
error_code close_ec;
lowest_layer(*sptr).close(close_ec);
std::move(_handler)(ec);
}
};
} // end namespace boost::mqtt5::detail
#endif // !BOOST_MQTT5_SHUTDOWN_OP_HPP

View File

@ -0,0 +1,34 @@
//
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_MQTT5_SSL_HPP
#define BOOST_MQTT5_SSL_HPP
#include <boost/mqtt5/detail/async_traits.hpp>
#include <boost/mqtt5/detail/shutdown.hpp>
#include <boost/mqtt5/types.hpp>
#include <boost/asio/ssl.hpp>
namespace boost::mqtt5 {
namespace detail {
// in namespace boost::mqtt5::detail to enable ADL
template <typename Stream, typename ShutdownHandler>
void async_shutdown(
boost::asio::ssl::stream<Stream>& stream, ShutdownHandler&& handler
) {
stream.async_shutdown(std::move(handler));
}
} // end namespace detail
} // end namespace boost::mqtt5
#endif // !BOOST_MQTT5_SSL_HPP

View File

@ -8,6 +8,9 @@
#ifndef BOOST_MQTT5_WEBSOCKET_HPP
#define BOOST_MQTT5_WEBSOCKET_HPP
#include <boost/mqtt5/detail/async_traits.hpp>
#include <boost/mqtt5/detail/shutdown.hpp>
#include <boost/mqtt5/types.hpp>
#include <boost/beast/http/field.hpp>
@ -49,6 +52,21 @@ struct ws_handshake_traits<boost::beast::websocket::stream<Stream>> {
}
};
namespace detail {
// in namespace boost::mqtt5::detail to enable ADL
template <typename Stream, typename ShutdownHandler>
void async_shutdown(
boost::beast::websocket::stream<Stream>& stream, ShutdownHandler&& handler
) {
stream.async_close(
beast::websocket::close_code::normal,
std::move(handler)
);
}
} // end namespace detail
} // end namespace boost::mqtt5
#endif // !BOOST_MQTT5_WEBSOCKET_HPP

View File

@ -0,0 +1,16 @@
//
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_MQTT5_WEBSOCKET_SSL_HPP
#define BOOST_MQTT5_WEBSOCKET_SSL_HPP
#include <boost/mqtt5/ssl.hpp>
#include <boost/mqtt5/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#endif // !BOOST_MQTT5_WEBSOCKET_SSL_HPP