mirror of
https://github.com/boostorg/mqtt5.git
synced 2025-07-30 04:27:34 +02:00
Add log invoke and logger
Summary: related to T15252, #24 Reviewers: ivica Reviewed By: ivica Subscribers: iljazovic, miljen Maniphest Tasks: T15252 Differential Revision: https://repo.mireo.local/D32382
This commit is contained in:
@ -39,6 +39,7 @@ make xml/index.xml
|
|||||||
|
|
||||||
# additional dependencies
|
# additional dependencies
|
||||||
../include/async_mqtt5/error.hpp
|
../include/async_mqtt5/error.hpp
|
||||||
|
../include/async_mqtt5/logger.hpp
|
||||||
../include/async_mqtt5/reason_codes.hpp
|
../include/async_mqtt5/reason_codes.hpp
|
||||||
../include/async_mqtt5/types.hpp
|
../include/async_mqtt5/types.hpp
|
||||||
../include/async_mqtt5/mqtt_client.hpp
|
../include/async_mqtt5/mqtt_client.hpp
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
<member><link linkend="async_mqtt5.ref.subscribe_options">subscribe_options</link></member>
|
<member><link linkend="async_mqtt5.ref.subscribe_options">subscribe_options</link></member>
|
||||||
<member><link linkend="async_mqtt5.ref.subscribe_topic">subscribe_topic</link></member>
|
<member><link linkend="async_mqtt5.ref.subscribe_topic">subscribe_topic</link></member>
|
||||||
<member><link linkend="async_mqtt5.ref.will">will</link></member>
|
<member><link linkend="async_mqtt5.ref.will">will</link></member>
|
||||||
|
<member><link linkend="async_mqtt5.ref.logger">logger</link></member>
|
||||||
</simplelist>
|
</simplelist>
|
||||||
<bridgehead renderas="sect3">Concepts</bridgehead>
|
<bridgehead renderas="sect3">Concepts</bridgehead>
|
||||||
<simplelist type="vert" columns="1">
|
<simplelist type="vert" columns="1">
|
||||||
@ -41,6 +42,7 @@
|
|||||||
<member><link linkend="async_mqtt5.ref.disconnect_rc_e">disconnect_rc_e</link></member>
|
<member><link linkend="async_mqtt5.ref.disconnect_rc_e">disconnect_rc_e</link></member>
|
||||||
<member><link linkend="async_mqtt5.ref.qos_e">qos_e</link></member>
|
<member><link linkend="async_mqtt5.ref.qos_e">qos_e</link></member>
|
||||||
<member><link linkend="async_mqtt5.ref.retain_e">retain_e</link></member>
|
<member><link linkend="async_mqtt5.ref.retain_e">retain_e</link></member>
|
||||||
|
<member><link linkend="async_mqtt5.ref.log_level">log_level</link></member>
|
||||||
</simplelist>
|
</simplelist>
|
||||||
<bridgehead renderas="sect3">Functions</bridgehead>
|
<bridgehead renderas="sect3">Functions</bridgehead>
|
||||||
<simplelist type="vert" columns="1">
|
<simplelist type="vert" columns="1">
|
||||||
|
@ -70,6 +70,7 @@ WARN_LOGFILE =
|
|||||||
# configuration options related to the input files
|
# configuration options related to the input files
|
||||||
#---------------------------------------------------------------------------
|
#---------------------------------------------------------------------------
|
||||||
INPUT = ../include/async_mqtt5/error.hpp \
|
INPUT = ../include/async_mqtt5/error.hpp \
|
||||||
|
../include/async_mqtt5/logger.hpp \
|
||||||
../include/async_mqtt5/reason_codes.hpp \
|
../include/async_mqtt5/reason_codes.hpp \
|
||||||
../include/async_mqtt5/types.hpp \
|
../include/async_mqtt5/types.hpp \
|
||||||
../include/async_mqtt5/mqtt_client.hpp
|
../include/async_mqtt5/mqtt_client.hpp
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
#define ASYNC_MQTT5_HPP
|
#define ASYNC_MQTT5_HPP
|
||||||
|
|
||||||
#include <async_mqtt5/error.hpp>
|
#include <async_mqtt5/error.hpp>
|
||||||
|
#include <async_mqtt5/logger.hpp>
|
||||||
#include <async_mqtt5/mqtt_client.hpp>
|
#include <async_mqtt5/mqtt_client.hpp>
|
||||||
#include <async_mqtt5/property_types.hpp>
|
#include <async_mqtt5/property_types.hpp>
|
||||||
#include <async_mqtt5/reason_codes.hpp>
|
#include <async_mqtt5/reason_codes.hpp>
|
||||||
|
@ -58,7 +58,7 @@ class control_packet {
|
|||||||
control_packet(
|
control_packet(
|
||||||
const Allocator& a,
|
const Allocator& a,
|
||||||
uint16_t packet_id, std::string packet
|
uint16_t packet_id, std::string packet
|
||||||
) noexcept :
|
) :
|
||||||
_packet_id(packet_id),
|
_packet_id(packet_id),
|
||||||
_packet(boost::allocate_unique<std::string>(a, std::move(packet)))
|
_packet(boost::allocate_unique<std::string>(a, std::move(packet)))
|
||||||
{}
|
{}
|
||||||
|
147
include/async_mqtt5/detail/log_invoke.hpp
Normal file
147
include/async_mqtt5/detail/log_invoke.hpp
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
//
|
||||||
|
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0.
|
||||||
|
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef ASYNC_MQTT5_LOG_INVOKE_HPP
|
||||||
|
#define ASYNC_MQTT5_LOG_INVOKE_HPP
|
||||||
|
|
||||||
|
#include <string_view>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
|
#include <boost/system/error_code.hpp>
|
||||||
|
#include <boost/type_traits/is_detected.hpp>
|
||||||
|
|
||||||
|
#include <async_mqtt5/reason_codes.hpp>
|
||||||
|
#include <async_mqtt5/property_types.hpp>
|
||||||
|
#include <async_mqtt5/types.hpp>
|
||||||
|
|
||||||
|
namespace async_mqtt5::detail {
|
||||||
|
|
||||||
|
namespace asio = boost::asio;
|
||||||
|
using boost::system::error_code;
|
||||||
|
|
||||||
|
// NOOP Logger
|
||||||
|
class noop_logger {};
|
||||||
|
|
||||||
|
// at_resolve
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
using at_resolve_sig = decltype(
|
||||||
|
std::declval<T&>().at_resolve(
|
||||||
|
std::declval<error_code>(),
|
||||||
|
std::declval<std::string_view>(), std::declval<std::string_view>(),
|
||||||
|
std::declval<const asio::ip::tcp::resolver::results_type&>()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
template <typename T>
|
||||||
|
constexpr bool has_at_resolve = boost::is_detected<at_resolve_sig, T>::value;
|
||||||
|
|
||||||
|
// at_tcp_connect
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
using at_tcp_connect_sig = decltype(
|
||||||
|
std::declval<T&>().at_tcp_connect(
|
||||||
|
std::declval<error_code>(), std::declval<asio::ip::tcp::endpoint>()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
template <typename T>
|
||||||
|
constexpr bool has_at_tcp_connect = boost::is_detected<at_tcp_connect_sig, T>::value;
|
||||||
|
|
||||||
|
// at_tls_handshake
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
using at_tls_handshake_sig = decltype(
|
||||||
|
std::declval<T&>().at_tls_handshake(
|
||||||
|
std::declval<error_code>(), std::declval<asio::ip::tcp::endpoint>()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
template <typename T>
|
||||||
|
constexpr bool has_at_tls_handshake = boost::is_detected<at_tls_handshake_sig, T>::value;
|
||||||
|
|
||||||
|
// at_ws_handshake
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
using at_ws_handshake_sig = decltype(
|
||||||
|
std::declval<T&>().at_ws_handshake(
|
||||||
|
std::declval<error_code>(), std::declval<asio::ip::tcp::endpoint>()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
template <typename T>
|
||||||
|
constexpr bool has_at_ws_handshake = boost::is_detected<at_ws_handshake_sig, T>::value;
|
||||||
|
|
||||||
|
// at_connack
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
using at_connack_sig = decltype(
|
||||||
|
std::declval<T&>().at_connack(
|
||||||
|
std::declval<reason_code>(),
|
||||||
|
std::declval<bool>(), std::declval<const connack_props&>()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
template <typename T>
|
||||||
|
constexpr bool has_at_connack = boost::is_detected<at_connack_sig, T>::value;
|
||||||
|
|
||||||
|
// at_disconnect
|
||||||
|
template <typename T>
|
||||||
|
using at_disconnect_sig = decltype(
|
||||||
|
std::declval<T&>().at_disconnect(
|
||||||
|
std::declval<reason_code>(), std::declval<const disconnect_props&>()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
template <typename T>
|
||||||
|
constexpr bool has_at_disconnect = boost::is_detected<at_disconnect_sig, T>::value;
|
||||||
|
|
||||||
|
template <typename LoggerType = noop_logger>
|
||||||
|
class log_invoke {
|
||||||
|
LoggerType _logger;
|
||||||
|
public:
|
||||||
|
explicit log_invoke(LoggerType&& logger = {}) :
|
||||||
|
_logger(std::forward<LoggerType>(logger))
|
||||||
|
{}
|
||||||
|
|
||||||
|
void at_resolve(
|
||||||
|
error_code ec, std::string_view host, std::string_view port,
|
||||||
|
const asio::ip::tcp::resolver::results_type& eps
|
||||||
|
) {
|
||||||
|
if constexpr (has_at_resolve<LoggerType>)
|
||||||
|
_logger.at_resolve(ec, host, port, eps);
|
||||||
|
}
|
||||||
|
|
||||||
|
void at_tcp_connect(error_code ec, asio::ip::tcp::endpoint ep) {
|
||||||
|
if constexpr (has_at_tcp_connect<LoggerType>)
|
||||||
|
_logger.at_tcp_connect(ec, ep);
|
||||||
|
}
|
||||||
|
|
||||||
|
void at_tls_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
|
||||||
|
if constexpr (has_at_tls_handshake<LoggerType>)
|
||||||
|
_logger.at_tls_handshake(ec, ep);
|
||||||
|
}
|
||||||
|
|
||||||
|
void at_ws_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
|
||||||
|
if constexpr (has_at_ws_handshake<LoggerType>)
|
||||||
|
_logger.at_ws_handshake(ec, ep);
|
||||||
|
}
|
||||||
|
|
||||||
|
void at_connack(
|
||||||
|
reason_code rc,
|
||||||
|
bool session_present, const connack_props& ca_props
|
||||||
|
) {
|
||||||
|
if constexpr (has_at_connack<LoggerType>)
|
||||||
|
_logger.at_connack(rc, session_present, ca_props);
|
||||||
|
}
|
||||||
|
|
||||||
|
void at_disconnect(reason_code rc, const disconnect_props& dc_props) {
|
||||||
|
if constexpr (has_at_disconnect<LoggerType>)
|
||||||
|
_logger.at_disconnect(rc, dc_props);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
} // end namespace async_mqtt5::detail
|
||||||
|
|
||||||
|
|
||||||
|
#endif // !ASYNC_MQTT5_LOG_INVOKE_HPP
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
#include <async_mqtt5/detail/async_mutex.hpp>
|
#include <async_mqtt5/detail/async_mutex.hpp>
|
||||||
#include <async_mqtt5/detail/async_traits.hpp>
|
#include <async_mqtt5/detail/async_traits.hpp>
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/impl/endpoints.hpp>
|
#include <async_mqtt5/impl/endpoints.hpp>
|
||||||
#include <async_mqtt5/impl/read_op.hpp>
|
#include <async_mqtt5/impl/read_op.hpp>
|
||||||
@ -33,13 +34,15 @@ using error_code = boost::system::error_code;
|
|||||||
|
|
||||||
template <
|
template <
|
||||||
typename StreamType,
|
typename StreamType,
|
||||||
typename StreamContext = std::monostate
|
typename StreamContext = std::monostate,
|
||||||
|
typename LoggerType = noop_logger
|
||||||
>
|
>
|
||||||
class autoconnect_stream {
|
class autoconnect_stream {
|
||||||
public:
|
public:
|
||||||
using self_type = autoconnect_stream<StreamType, StreamContext>;
|
using self_type = autoconnect_stream<StreamType, StreamContext, LoggerType>;
|
||||||
using stream_type = StreamType;
|
using stream_type = StreamType;
|
||||||
using stream_context_type = StreamContext;
|
using stream_context_type = StreamContext;
|
||||||
|
using logger_type = LoggerType;
|
||||||
using executor_type = typename stream_type::executor_type;
|
using executor_type = typename stream_type::executor_type;
|
||||||
private:
|
private:
|
||||||
using stream_ptr = std::shared_ptr<stream_type>;
|
using stream_ptr = std::shared_ptr<stream_type>;
|
||||||
@ -47,29 +50,33 @@ private:
|
|||||||
executor_type _stream_executor;
|
executor_type _stream_executor;
|
||||||
async_mutex _conn_mtx;
|
async_mutex _conn_mtx;
|
||||||
asio::steady_timer _read_timer, _connect_timer;
|
asio::steady_timer _read_timer, _connect_timer;
|
||||||
endpoints _endpoints;
|
endpoints<logger_type> _endpoints;
|
||||||
|
|
||||||
stream_ptr _stream_ptr;
|
stream_ptr _stream_ptr;
|
||||||
stream_context_type& _stream_context;
|
stream_context_type& _stream_context;
|
||||||
|
|
||||||
|
log_invoke<logger_type>& _log;
|
||||||
|
|
||||||
template <typename Owner, typename Handler>
|
template <typename Owner, typename Handler>
|
||||||
friend class read_op;
|
friend class read_op;
|
||||||
|
|
||||||
template <typename Owner, typename Handler>
|
template <typename Owner, typename Handler>
|
||||||
friend class write_op;
|
friend class write_op;
|
||||||
|
|
||||||
template <typename Stream>
|
template <typename Owner>
|
||||||
friend class reconnect_op;
|
friend class reconnect_op;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
autoconnect_stream(
|
autoconnect_stream(
|
||||||
const executor_type& ex, stream_context_type& context
|
const executor_type& ex, stream_context_type& context,
|
||||||
|
log_invoke<logger_type>& log
|
||||||
) :
|
) :
|
||||||
_stream_executor(ex),
|
_stream_executor(ex),
|
||||||
_conn_mtx(_stream_executor),
|
_conn_mtx(_stream_executor),
|
||||||
_read_timer(_stream_executor), _connect_timer(_stream_executor),
|
_read_timer(_stream_executor), _connect_timer(_stream_executor),
|
||||||
_endpoints(_stream_executor, _connect_timer),
|
_endpoints(_stream_executor, _connect_timer, log),
|
||||||
_stream_context(context)
|
_stream_context(context),
|
||||||
|
_log(log)
|
||||||
{
|
{
|
||||||
replace_next_layer(construct_next_layer());
|
replace_next_layer(construct_next_layer());
|
||||||
}
|
}
|
||||||
@ -166,6 +173,11 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
log_invoke<logger_type>& log() {
|
||||||
|
return _log;
|
||||||
|
}
|
||||||
|
|
||||||
static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
|
static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
|
||||||
error_code ec;
|
error_code ec;
|
||||||
auto& layer = lowest_layer(*sptr);
|
auto& layer = lowest_layer(*sptr);
|
||||||
|
@ -19,9 +19,8 @@
|
|||||||
#include <boost/asio/prepend.hpp>
|
#include <boost/asio/prepend.hpp>
|
||||||
#include <boost/asio/experimental/basic_channel.hpp>
|
#include <boost/asio/experimental/basic_channel.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/types.hpp>
|
|
||||||
|
|
||||||
#include <async_mqtt5/detail/channel_traits.hpp>
|
#include <async_mqtt5/detail/channel_traits.hpp>
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
#include <async_mqtt5/detail/internal_types.hpp>
|
#include <async_mqtt5/detail/internal_types.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/impl/assemble_op.hpp>
|
#include <async_mqtt5/impl/assemble_op.hpp>
|
||||||
@ -213,18 +212,20 @@ public:
|
|||||||
|
|
||||||
template <
|
template <
|
||||||
typename StreamType,
|
typename StreamType,
|
||||||
typename TlsContext = std::monostate
|
typename TlsContext = std::monostate,
|
||||||
|
typename LoggerType = noop_logger
|
||||||
>
|
>
|
||||||
class client_service {
|
class client_service {
|
||||||
using self_type = client_service<StreamType, TlsContext>;
|
using self_type = client_service<StreamType, TlsContext, LoggerType>;
|
||||||
using stream_context_type = stream_context<StreamType, TlsContext>;
|
using stream_context_type = stream_context<StreamType, TlsContext>;
|
||||||
using stream_type = autoconnect_stream<
|
using stream_type = autoconnect_stream<
|
||||||
StreamType, stream_context_type
|
StreamType, stream_context_type, LoggerType
|
||||||
>;
|
>;
|
||||||
public:
|
public:
|
||||||
using executor_type = typename stream_type::executor_type;
|
using executor_type = typename stream_type::executor_type;
|
||||||
private:
|
private:
|
||||||
using tls_context_type = TlsContext;
|
using tls_context_type = TlsContext;
|
||||||
|
using logger_type = LoggerType;
|
||||||
using receive_channel = asio::experimental::basic_channel<
|
using receive_channel = asio::experimental::basic_channel<
|
||||||
executor_type,
|
executor_type,
|
||||||
channel_traits<>,
|
channel_traits<>,
|
||||||
@ -251,6 +252,8 @@ private:
|
|||||||
|
|
||||||
executor_type _executor;
|
executor_type _executor;
|
||||||
|
|
||||||
|
log_invoke<logger_type> _log;
|
||||||
|
|
||||||
stream_context_type _stream_context;
|
stream_context_type _stream_context;
|
||||||
stream_type _stream;
|
stream_type _stream;
|
||||||
|
|
||||||
@ -267,8 +270,10 @@ private:
|
|||||||
asio::steady_timer _sentry_timer;
|
asio::steady_timer _sentry_timer;
|
||||||
|
|
||||||
client_service(const client_service& other) :
|
client_service(const client_service& other) :
|
||||||
_executor(other._executor), _stream_context(other._stream_context),
|
_executor(other._executor),
|
||||||
_stream(_executor, _stream_context),
|
_log(other._log),
|
||||||
|
_stream_context(other._stream_context),
|
||||||
|
_stream(_executor, _stream_context, _log),
|
||||||
_replies(_executor),
|
_replies(_executor),
|
||||||
_async_sender(*this),
|
_async_sender(*this),
|
||||||
_active_span(_read_buff.cend(), _read_buff.cend()),
|
_active_span(_read_buff.cend(), _read_buff.cend()),
|
||||||
@ -283,11 +288,12 @@ public:
|
|||||||
|
|
||||||
explicit client_service(
|
explicit client_service(
|
||||||
const executor_type& ex,
|
const executor_type& ex,
|
||||||
tls_context_type tls_context = {}
|
tls_context_type tls_context = {}, logger_type logger = {}
|
||||||
) :
|
) :
|
||||||
_executor(ex),
|
_executor(ex),
|
||||||
|
_log(std::move(logger)),
|
||||||
_stream_context(std::move(tls_context)),
|
_stream_context(std::move(tls_context)),
|
||||||
_stream(ex, _stream_context),
|
_stream(ex, _stream_context, _log),
|
||||||
_replies(ex),
|
_replies(ex),
|
||||||
_async_sender(*this),
|
_async_sender(*this),
|
||||||
_active_span(_read_buff.cend(), _read_buff.cend()),
|
_active_span(_read_buff.cend(), _read_buff.cend()),
|
||||||
@ -411,6 +417,10 @@ public:
|
|||||||
_stream.close();
|
_stream.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log_invoke<LoggerType>& log() {
|
||||||
|
return _log;
|
||||||
|
}
|
||||||
|
|
||||||
uint16_t allocate_pid() {
|
uint16_t allocate_pid() {
|
||||||
return _pid_allocator.allocate();
|
return _pid_allocator.allocate();
|
||||||
}
|
}
|
||||||
|
@ -37,13 +37,14 @@
|
|||||||
#include <async_mqtt5/detail/async_traits.hpp>
|
#include <async_mqtt5/detail/async_traits.hpp>
|
||||||
#include <async_mqtt5/detail/control_packet.hpp>
|
#include <async_mqtt5/detail/control_packet.hpp>
|
||||||
#include <async_mqtt5/detail/internal_types.hpp>
|
#include <async_mqtt5/detail/internal_types.hpp>
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/impl/codecs/message_decoders.hpp>
|
#include <async_mqtt5/impl/codecs/message_decoders.hpp>
|
||||||
#include <async_mqtt5/impl/codecs/message_encoders.hpp>
|
#include <async_mqtt5/impl/codecs/message_encoders.hpp>
|
||||||
|
|
||||||
namespace async_mqtt5::detail {
|
namespace async_mqtt5::detail {
|
||||||
|
|
||||||
template <typename Stream>
|
template <typename Stream, typename LoggerType>
|
||||||
class connect_op {
|
class connect_op {
|
||||||
static constexpr size_t min_packet_sz = 5;
|
static constexpr size_t min_packet_sz = 5;
|
||||||
|
|
||||||
@ -60,6 +61,7 @@ class connect_op {
|
|||||||
|
|
||||||
Stream& _stream;
|
Stream& _stream;
|
||||||
mqtt_ctx& _ctx;
|
mqtt_ctx& _ctx;
|
||||||
|
log_invoke<LoggerType>& _log;
|
||||||
|
|
||||||
using handler_type = asio::any_completion_handler<void (error_code)>;
|
using handler_type = asio::any_completion_handler<void (error_code)>;
|
||||||
handler_type _handler;
|
handler_type _handler;
|
||||||
@ -72,9 +74,11 @@ class connect_op {
|
|||||||
public:
|
public:
|
||||||
template <typename Handler>
|
template <typename Handler>
|
||||||
connect_op(
|
connect_op(
|
||||||
Stream& stream, mqtt_ctx& ctx, Handler&& handler
|
Stream& stream, mqtt_ctx& ctx,
|
||||||
|
log_invoke<LoggerType>& log,
|
||||||
|
Handler&& handler
|
||||||
) :
|
) :
|
||||||
_stream(stream), _ctx(ctx),
|
_stream(stream), _ctx(ctx), _log(log),
|
||||||
_handler(std::forward<Handler>(handler)),
|
_handler(std::forward<Handler>(handler)),
|
||||||
_cancellation_state(
|
_cancellation_state(
|
||||||
asio::get_associated_cancellation_slot(_handler),
|
asio::get_associated_cancellation_slot(_handler),
|
||||||
@ -121,6 +125,7 @@ public:
|
|||||||
if (is_cancelled())
|
if (is_cancelled())
|
||||||
return complete(asio::error::operation_aborted);
|
return complete(asio::error::operation_aborted);
|
||||||
|
|
||||||
|
_log.at_tcp_connect(ec, ep);
|
||||||
if (ec)
|
if (ec)
|
||||||
return complete(ec);
|
return complete(ec);
|
||||||
|
|
||||||
@ -153,19 +158,19 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void operator()(
|
void operator()(
|
||||||
on_tls_handshake, error_code ec,
|
on_tls_handshake, error_code ec, endpoint ep, authority_path ap
|
||||||
endpoint ep, authority_path ap
|
|
||||||
) {
|
) {
|
||||||
if (is_cancelled())
|
if (is_cancelled())
|
||||||
return complete(asio::error::operation_aborted);
|
return complete(asio::error::operation_aborted);
|
||||||
|
|
||||||
|
_log.at_tls_handshake(ec, ep);
|
||||||
if (ec)
|
if (ec)
|
||||||
return complete(ec);
|
return complete(ec);
|
||||||
|
|
||||||
do_ws_handshake(std::move(ep), std::move(ap));
|
do_ws_handshake(std::move(ep), std::move(ap));
|
||||||
}
|
}
|
||||||
|
|
||||||
void do_ws_handshake(endpoint, authority_path ap) {
|
void do_ws_handshake(endpoint ep, authority_path ap) {
|
||||||
if constexpr (has_ws_handshake<Stream>) {
|
if constexpr (has_ws_handshake<Stream>) {
|
||||||
using namespace boost::beast;
|
using namespace boost::beast;
|
||||||
|
|
||||||
@ -189,17 +194,23 @@ public:
|
|||||||
|
|
||||||
_stream.async_handshake(
|
_stream.async_handshake(
|
||||||
ap.host + ':' + ap.port, ap.path,
|
ap.host + ':' + ap.port, ap.path,
|
||||||
asio::prepend(std::move(*this), on_ws_handshake {})
|
asio::append(
|
||||||
|
asio::prepend(std::move(*this), on_ws_handshake {}),
|
||||||
|
ep
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
(*this)(on_ws_handshake {}, error_code {});
|
(*this)(on_ws_handshake {}, error_code {}, ep);
|
||||||
}
|
}
|
||||||
|
|
||||||
void operator()(on_ws_handshake, error_code ec) {
|
void operator()(on_ws_handshake, error_code ec, endpoint ep) {
|
||||||
if (is_cancelled())
|
if (is_cancelled())
|
||||||
return complete(asio::error::operation_aborted);
|
return complete(asio::error::operation_aborted);
|
||||||
|
|
||||||
|
if constexpr (has_ws_handshake<Stream>)
|
||||||
|
_log.at_ws_handshake(ec, ep);
|
||||||
|
|
||||||
if (ec)
|
if (ec)
|
||||||
return complete(ec);
|
return complete(ec);
|
||||||
|
|
||||||
@ -343,6 +354,7 @@ public:
|
|||||||
if (!rc.has_value()) // reason code not allowed in CONNACK
|
if (!rc.has_value()) // reason code not allowed in CONNACK
|
||||||
return complete(client::error::malformed_packet);
|
return complete(client::error::malformed_packet);
|
||||||
|
|
||||||
|
_log.at_connack(*rc, session_present, ca_props);
|
||||||
if (*rc)
|
if (*rc)
|
||||||
return complete(asio::error::try_again);
|
return complete(asio::error::try_again);
|
||||||
|
|
||||||
@ -401,7 +413,7 @@ public:
|
|||||||
detail::async_write(
|
detail::async_write(
|
||||||
_stream, asio::buffer(wire_data),
|
_stream, asio::buffer(wire_data),
|
||||||
asio::consign(
|
asio::consign(
|
||||||
asio::prepend(std::move(*this), on_send_auth{}),
|
asio::prepend(std::move(*this), on_send_auth {}),
|
||||||
std::move(packet)
|
std::move(packet)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
#include <boost/spirit/home/x3.hpp>
|
#include <boost/spirit/home/x3.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/types.hpp>
|
#include <async_mqtt5/types.hpp>
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
|
|
||||||
namespace async_mqtt5::detail {
|
namespace async_mqtt5::detail {
|
||||||
|
|
||||||
@ -107,11 +108,13 @@ public:
|
|||||||
error_code timer_ec, authority_path ap
|
error_code timer_ec, authority_path ap
|
||||||
) {
|
) {
|
||||||
if (
|
if (
|
||||||
ord[0] == 0 && resolve_ec == asio::error::operation_aborted ||
|
(ord[0] == 0 && resolve_ec == asio::error::operation_aborted) ||
|
||||||
ord[0] == 1 && timer_ec == asio::error::operation_aborted
|
(ord[0] == 1 && timer_ec == asio::error::operation_aborted)
|
||||||
)
|
)
|
||||||
return complete(asio::error::operation_aborted, {}, {});
|
return complete(asio::error::operation_aborted, {}, {});
|
||||||
|
|
||||||
|
resolve_ec = timer_ec ? resolve_ec : asio::error::timed_out;
|
||||||
|
_owner._log.at_resolve(resolve_ec, ap.host, ap.port, epts);
|
||||||
if (!resolve_ec)
|
if (!resolve_ec)
|
||||||
return complete(error_code {}, std::move(epts), std::move(ap));
|
return complete(error_code {}, std::move(epts), std::move(ap));
|
||||||
|
|
||||||
@ -136,7 +139,10 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
template <typename LoggerType>
|
||||||
class endpoints {
|
class endpoints {
|
||||||
|
using logger_type = LoggerType;
|
||||||
|
|
||||||
asio::ip::tcp::resolver _resolver;
|
asio::ip::tcp::resolver _resolver;
|
||||||
asio::steady_timer& _connect_timer;
|
asio::steady_timer& _connect_timer;
|
||||||
|
|
||||||
@ -144,6 +150,8 @@ class endpoints {
|
|||||||
|
|
||||||
int _current_host { -1 };
|
int _current_host { -1 };
|
||||||
|
|
||||||
|
log_invoke<logger_type>& _log;
|
||||||
|
|
||||||
template <typename Owner, typename Handler>
|
template <typename Owner, typename Handler>
|
||||||
friend class resolve_op;
|
friend class resolve_op;
|
||||||
|
|
||||||
@ -159,8 +167,12 @@ class endpoints {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
template <typename Executor>
|
template <typename Executor>
|
||||||
endpoints(Executor ex, asio::steady_timer& timer) :
|
endpoints(
|
||||||
_resolver(ex), _connect_timer(timer)
|
Executor ex, asio::steady_timer& timer,
|
||||||
|
log_invoke<logger_type>& log
|
||||||
|
) :
|
||||||
|
_resolver(std::move(ex)), _connect_timer(timer),
|
||||||
|
_log(log)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
endpoints(const endpoints&) = delete;
|
endpoints(const endpoints&) = delete;
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
#include <boost/asio/recycling_allocator.hpp>
|
#include <boost/asio/recycling_allocator.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/types.hpp>
|
#include <async_mqtt5/types.hpp>
|
||||||
|
#include <async_mqtt5/reason_codes.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/detail/control_packet.hpp>
|
#include <async_mqtt5/detail/control_packet.hpp>
|
||||||
|
|
||||||
@ -114,6 +115,20 @@ private:
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case control_code_e::disconnect: {
|
case control_code_e::disconnect: {
|
||||||
|
auto rv = decoders::decode_disconnect(
|
||||||
|
static_cast<uint32_t>(std::distance(first, last)), first
|
||||||
|
);
|
||||||
|
if (!rv.has_value())
|
||||||
|
return on_malformed_packet(
|
||||||
|
"Malformed DISCONNECT received: cannot decode"
|
||||||
|
);
|
||||||
|
|
||||||
|
const auto& [rc, props] = *rv;
|
||||||
|
_svc_ptr->log().at_disconnect(
|
||||||
|
to_reason_code<reason_codes::category::disconnect>(rc)
|
||||||
|
.value_or(reason_codes::unspecified_error),
|
||||||
|
props
|
||||||
|
);
|
||||||
_svc_ptr->close_stream();
|
_svc_ptr->close_stream();
|
||||||
_svc_ptr->open_stream();
|
_svc_ptr->open_stream();
|
||||||
}
|
}
|
||||||
|
@ -182,9 +182,10 @@ public:
|
|||||||
|
|
||||||
auto init_connect = [](
|
auto init_connect = [](
|
||||||
auto handler, typename Owner::stream_type& stream,
|
auto handler, typename Owner::stream_type& stream,
|
||||||
mqtt_ctx& context, endpoint ep, authority_path ap
|
mqtt_ctx& context, log_invoke<typename Owner::logger_type>& log,
|
||||||
|
endpoint ep, authority_path ap
|
||||||
) {
|
) {
|
||||||
connect_op { stream, context, std::move(handler) }
|
connect_op { stream, context, log, std::move(handler) }
|
||||||
.perform(ep, std::move(ap));
|
.perform(ep, std::move(ap));
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -192,6 +193,7 @@ public:
|
|||||||
asio::async_initiate<const asio::deferred_t, void(error_code)>(
|
asio::async_initiate<const asio::deferred_t, void(error_code)>(
|
||||||
init_connect, asio::deferred, std::ref(*sptr),
|
init_connect, asio::deferred, std::ref(*sptr),
|
||||||
std::ref(_owner._stream_context.mqtt_context()),
|
std::ref(_owner._stream_context.mqtt_context()),
|
||||||
|
std::ref(_owner.log()),
|
||||||
ep, ap
|
ep, ap
|
||||||
),
|
),
|
||||||
_owner._connect_timer.async_wait(asio::deferred)
|
_owner._connect_timer.async_wait(asio::deferred)
|
||||||
|
202
include/async_mqtt5/logger.hpp
Normal file
202
include/async_mqtt5/logger.hpp
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
//
|
||||||
|
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0.
|
||||||
|
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef ASYNC_MQTT5_LOGGER_HPP
|
||||||
|
#define ASYNC_MQTT5_LOGGER_HPP
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <string_view>
|
||||||
|
|
||||||
|
#include <boost/system/error_code.hpp>
|
||||||
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
|
|
||||||
|
#include <async_mqtt5/reason_codes.hpp>
|
||||||
|
#include <async_mqtt5/property_types.hpp>
|
||||||
|
#include <async_mqtt5/types.hpp>
|
||||||
|
|
||||||
|
#include <async_mqtt5/impl/codecs/traits.hpp>
|
||||||
|
|
||||||
|
namespace async_mqtt5 {
|
||||||
|
|
||||||
|
namespace asio = boost::asio;
|
||||||
|
using error_code = boost::system::error_code;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Represents the severity level of log messages.
|
||||||
|
*/
|
||||||
|
enum class log_level : uint8_t {
|
||||||
|
/** Error messages that indicate serious issues. **/
|
||||||
|
error = 1,
|
||||||
|
|
||||||
|
/** Warnings that indicate potential problems or non-critical issues. **/
|
||||||
|
warning,
|
||||||
|
|
||||||
|
/** Informational messages that highlight normal application behaviour and events. **/
|
||||||
|
info,
|
||||||
|
|
||||||
|
/** Detailed messages useful for diagnosing issues. **/
|
||||||
|
debug
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief A logger class that can used by the \ref mqtt_client to output
|
||||||
|
* the results of operations to stderr.
|
||||||
|
*
|
||||||
|
* \details All functions are invoked directly within the \ref mqtt_client using
|
||||||
|
* its default executor. If the \ref mqtt_client is initialized with an explicit or
|
||||||
|
* implicit strand, none of the functions will be invoked concurrently.
|
||||||
|
*
|
||||||
|
* \par Thread safety
|
||||||
|
* ['Distinct objects]: unsafe. \n
|
||||||
|
* ['Shared objects]: unsafe. \n
|
||||||
|
* This class is [*not thread-safe].
|
||||||
|
*/
|
||||||
|
class logger {
|
||||||
|
constexpr static auto prefix = "[Async.MQTT5]";
|
||||||
|
|
||||||
|
log_level _level;
|
||||||
|
public:
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Constructs a logger that filters log messages based on the specified log level.
|
||||||
|
*
|
||||||
|
* \param level Messages with a log level higher than the given log level will be suppressed.
|
||||||
|
*/
|
||||||
|
logger(log_level level = log_level::warning) : _level(level) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Outputs the results of the resolve operation.
|
||||||
|
*
|
||||||
|
* \param ec Error code returned by the resolve operation.
|
||||||
|
* \param host Hostname used in the resolve operation.
|
||||||
|
* \param port Port used in the resolve operation.
|
||||||
|
* \param eps Endpoints returned by the resolve operation.
|
||||||
|
*/
|
||||||
|
void at_resolve(
|
||||||
|
error_code ec, std::string_view host, std::string_view port,
|
||||||
|
const asio::ip::tcp::resolver::results_type& eps
|
||||||
|
) {
|
||||||
|
if (!ec && _level < log_level::info)
|
||||||
|
return;
|
||||||
|
|
||||||
|
write_prefix();
|
||||||
|
std::clog
|
||||||
|
<< "resolve: "
|
||||||
|
<< host << ":" << port;
|
||||||
|
std::clog << " - " << ec.message();
|
||||||
|
|
||||||
|
if (_level == log_level::debug) {
|
||||||
|
std::clog << " [";
|
||||||
|
for (auto it = eps.begin(); it != eps.end();) {
|
||||||
|
std::clog << it->endpoint().address().to_string();
|
||||||
|
if (++it != eps.end())
|
||||||
|
std::clog << ",";
|
||||||
|
}
|
||||||
|
std::clog << "]" << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Outputs the results of the TCP connect operation.
|
||||||
|
*
|
||||||
|
* \param ec Error code returned by the TCP connect operation.
|
||||||
|
* \param ep The TCP endpoint used to establish the TCP connection.
|
||||||
|
*/
|
||||||
|
void at_tcp_connect(error_code ec, asio::ip::tcp::endpoint ep) {
|
||||||
|
if (!ec && _level < log_level::info)
|
||||||
|
return;
|
||||||
|
|
||||||
|
write_prefix();
|
||||||
|
std::clog
|
||||||
|
<< "connect: "
|
||||||
|
<< ep.address().to_string() << ":" << ep.port()
|
||||||
|
<< " - " << ec.message()
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Outputs the results of the TLS handshake operation.
|
||||||
|
*
|
||||||
|
* \param ec Error code returned by the TLS handshake operation.
|
||||||
|
* \param ep The TCP endpoint used to establish the TLS handshake.
|
||||||
|
*/
|
||||||
|
void at_tls_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
|
||||||
|
if (!ec && _level < log_level::info)
|
||||||
|
return;
|
||||||
|
|
||||||
|
write_prefix();
|
||||||
|
std::clog
|
||||||
|
<< "tls handshake: "
|
||||||
|
<< ep.address().to_string() << ":" << ep.port()
|
||||||
|
<< " - " << ec.message()
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Outputs the results of the WebSocket handshake operation.
|
||||||
|
*
|
||||||
|
* \param ec Error code returned by the WebSocket handshake operation.
|
||||||
|
* \param ep The TCP endpoint used to establish the WebSocket handshake.
|
||||||
|
*/
|
||||||
|
void at_ws_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
|
||||||
|
if (!ec && _level < log_level::info)
|
||||||
|
return;
|
||||||
|
|
||||||
|
write_prefix();
|
||||||
|
std::clog
|
||||||
|
<< "ws handshake: "
|
||||||
|
<< ep.address().to_string() << ":" << ep.port()
|
||||||
|
<< " - " << ec.message()
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Outputs the contents of the \__CONNACK\__ packet sent by the Broker.
|
||||||
|
*
|
||||||
|
* \param rc Reason Code in the received \__CONNACK\__ packet indicating
|
||||||
|
* the result of the MQTT handshake.
|
||||||
|
* \param session_present A flag indicating whether the Broker already has a session associated
|
||||||
|
* with this connection.
|
||||||
|
* \param ca_props \__CONNACK_PROPS\__ received in the \__CONNACK\__ packet.
|
||||||
|
*/
|
||||||
|
void at_connack(
|
||||||
|
reason_code rc,
|
||||||
|
bool /* session_present */, const connack_props& /* ca_props */
|
||||||
|
) {
|
||||||
|
if (!rc && _level < log_level::info)
|
||||||
|
return;
|
||||||
|
|
||||||
|
write_prefix();
|
||||||
|
std::clog << "connack: " << rc.message() << ".";
|
||||||
|
std::clog << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Outputs the contents of the \__DISCONNECT\__ packet sent by the Broker.
|
||||||
|
*
|
||||||
|
* \param rc Reason Code in the received \__DISCONNECT\__ packet indicating
|
||||||
|
* the reason behind the disconnection.
|
||||||
|
* \param dc_props \__DISCONNECT_PROPS\__ received in the \__DISCONNECT\__ packet.
|
||||||
|
*/
|
||||||
|
void at_disconnect(reason_code rc, const disconnect_props& dc_props) {
|
||||||
|
write_prefix();
|
||||||
|
std::clog << "disconnect: " << rc.message() << ".";
|
||||||
|
if (dc_props[prop::reason_string].has_value())
|
||||||
|
std::clog << " Reason string: " << * dc_props[prop::reason_string];
|
||||||
|
std::clog << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void write_prefix() {
|
||||||
|
std::clog << prefix << " ";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // end namespace async_mqtt5
|
||||||
|
|
||||||
|
|
||||||
|
#endif // !ASYNC_MQTT5_LOGGER_HPP
|
@ -20,6 +20,7 @@
|
|||||||
#include <async_mqtt5/error.hpp>
|
#include <async_mqtt5/error.hpp>
|
||||||
#include <async_mqtt5/types.hpp>
|
#include <async_mqtt5/types.hpp>
|
||||||
|
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
#include <async_mqtt5/detail/rebind_executor.hpp>
|
#include <async_mqtt5/detail/rebind_executor.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/impl/client_service.hpp>
|
#include <async_mqtt5/impl/client_service.hpp>
|
||||||
@ -49,7 +50,8 @@ namespace asio = boost::asio;
|
|||||||
*/
|
*/
|
||||||
template <
|
template <
|
||||||
typename StreamType,
|
typename StreamType,
|
||||||
typename TlsContext = std::monostate
|
typename TlsContext = std::monostate,
|
||||||
|
typename LoggerType = detail::noop_logger
|
||||||
>
|
>
|
||||||
class mqtt_client {
|
class mqtt_client {
|
||||||
public:
|
public:
|
||||||
@ -69,9 +71,10 @@ public:
|
|||||||
private:
|
private:
|
||||||
using stream_type = StreamType;
|
using stream_type = StreamType;
|
||||||
using tls_context_type = TlsContext;
|
using tls_context_type = TlsContext;
|
||||||
|
using logger_type = LoggerType;
|
||||||
|
|
||||||
using client_service_type = detail::client_service<
|
using client_service_type = detail::client_service<
|
||||||
stream_type, tls_context_type
|
stream_type, tls_context_type, logger_type
|
||||||
>;
|
>;
|
||||||
using impl_type = std::shared_ptr<client_service_type>;
|
using impl_type = std::shared_ptr<client_service_type>;
|
||||||
impl_type _impl;
|
impl_type _impl;
|
||||||
@ -81,14 +84,15 @@ public:
|
|||||||
* \brief Constructs a Client with given parameters.
|
* \brief Constructs a Client with given parameters.
|
||||||
*
|
*
|
||||||
* \param ex An executor that will be associated with the Client.
|
* \param ex An executor that will be associated with the Client.
|
||||||
* \param tls_context A context object used in TLS/SLL connection.
|
* \param tls_context A context object used in TLS/SSL connection.
|
||||||
|
* \param logger An object used to log events within the Client.
|
||||||
*/
|
*/
|
||||||
explicit mqtt_client(
|
explicit mqtt_client(
|
||||||
const executor_type& ex,
|
const executor_type& ex,
|
||||||
TlsContext tls_context = {}
|
tls_context_type tls_context = {}, logger_type logger = {}
|
||||||
) :
|
) :
|
||||||
_impl(std::make_shared<client_service_type>(
|
_impl(std::make_shared<client_service_type>(
|
||||||
ex, std::move(tls_context)
|
ex, std::move(tls_context), std::move(logger)
|
||||||
))
|
))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
@ -97,7 +101,8 @@ public:
|
|||||||
*
|
*
|
||||||
* \tparam \__ExecutionContext\__ Type of a concrete execution context.
|
* \tparam \__ExecutionContext\__ Type of a concrete execution context.
|
||||||
* \param context Execution context whose executor will be associated with the Client.
|
* \param context Execution context whose executor will be associated with the Client.
|
||||||
* \param tls_context A context object used in TLS/SLL connection.
|
* \param tls_context A context object used in TLS/SSL connection.
|
||||||
|
* \param logger An object used to log events within the Client.
|
||||||
*
|
*
|
||||||
* \par Precondition
|
* \par Precondition
|
||||||
* \code
|
* \code
|
||||||
@ -113,9 +118,12 @@ public:
|
|||||||
>
|
>
|
||||||
explicit mqtt_client(
|
explicit mqtt_client(
|
||||||
ExecutionContext& context,
|
ExecutionContext& context,
|
||||||
TlsContext tls_context = {}
|
tls_context_type tls_context = {}, logger_type logger = {}
|
||||||
) :
|
) :
|
||||||
mqtt_client(context.get_executor(), std::move(tls_context))
|
mqtt_client(
|
||||||
|
context.get_executor(),
|
||||||
|
std::move(tls_context), std::move(logger)
|
||||||
|
)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -171,8 +179,7 @@ public:
|
|||||||
typename Ctx = TlsContext,
|
typename Ctx = TlsContext,
|
||||||
std::enable_if_t<!std::is_same_v<Ctx, std::monostate>, bool> = true
|
std::enable_if_t<!std::is_same_v<Ctx, std::monostate>, bool> = true
|
||||||
>
|
>
|
||||||
decltype(auto) tls_context()
|
decltype(auto) tls_context() {
|
||||||
{
|
|
||||||
return _impl->tls_context();
|
return _impl->tls_context();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -973,7 +980,6 @@ public:
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
} // end namespace async_mqtt5
|
} // end namespace async_mqtt5
|
||||||
|
|
||||||
#endif // !ASYNC_MQTT5_MQTT_CLIENT_HPP
|
#endif // !ASYNC_MQTT5_MQTT_CLIENT_HPP
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
#include <async_mqtt5/detail/async_mutex.hpp>
|
#include <async_mqtt5/detail/async_mutex.hpp>
|
||||||
#include <async_mqtt5/detail/async_traits.hpp>
|
#include <async_mqtt5/detail/async_traits.hpp>
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/impl/endpoints.hpp>
|
#include <async_mqtt5/impl/endpoints.hpp>
|
||||||
|
|
||||||
@ -28,7 +29,8 @@ using error_code = boost::system::error_code;
|
|||||||
|
|
||||||
template <
|
template <
|
||||||
typename StreamType,
|
typename StreamType,
|
||||||
typename StreamContext = std::monostate
|
typename StreamContext = std::monostate,
|
||||||
|
typename LoggerType = async_mqtt5::detail::noop_logger
|
||||||
>
|
>
|
||||||
class test_autoconnect_stream {
|
class test_autoconnect_stream {
|
||||||
public:
|
public:
|
||||||
@ -36,28 +38,33 @@ public:
|
|||||||
using stream_ptr = std::shared_ptr<stream_type>;
|
using stream_ptr = std::shared_ptr<stream_type>;
|
||||||
using stream_context_type = StreamContext;
|
using stream_context_type = StreamContext;
|
||||||
using executor_type = typename stream_type::executor_type;
|
using executor_type = typename stream_type::executor_type;
|
||||||
|
using logger_type = LoggerType;
|
||||||
private:
|
private:
|
||||||
executor_type _stream_executor;
|
executor_type _stream_executor;
|
||||||
detail::async_mutex _conn_mtx;
|
detail::async_mutex _conn_mtx;
|
||||||
asio::steady_timer _connect_timer;
|
asio::steady_timer _connect_timer;
|
||||||
detail::endpoints _endpoints;
|
detail::endpoints<logger_type> _endpoints;
|
||||||
|
|
||||||
stream_ptr _stream_ptr;
|
stream_ptr _stream_ptr;
|
||||||
stream_context_type& _stream_context;
|
stream_context_type& _stream_context;
|
||||||
|
|
||||||
|
detail::log_invoke<logger_type> _log;
|
||||||
|
|
||||||
template <typename Stream>
|
template <typename Stream>
|
||||||
friend class async_mqtt5::detail::reconnect_op;
|
friend class async_mqtt5::detail::reconnect_op;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
test_autoconnect_stream(
|
test_autoconnect_stream(
|
||||||
const executor_type& ex, stream_context_type& context
|
const executor_type& ex,
|
||||||
|
stream_context_type& context,
|
||||||
|
detail::log_invoke<logger_type>& log
|
||||||
) :
|
) :
|
||||||
_stream_executor(ex),
|
_stream_executor(ex),
|
||||||
_conn_mtx(_stream_executor),
|
_conn_mtx(_stream_executor),
|
||||||
_connect_timer(_stream_executor),
|
_connect_timer(_stream_executor),
|
||||||
_endpoints(_stream_executor, _connect_timer),
|
_endpoints(_stream_executor, _connect_timer, log),
|
||||||
_stream_context(context)
|
_stream_context(context),
|
||||||
|
_log(log)
|
||||||
{
|
{
|
||||||
replace_next_layer(construct_next_layer());
|
replace_next_layer(construct_next_layer());
|
||||||
open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
|
open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
|
||||||
@ -115,6 +122,11 @@ public:
|
|||||||
close();
|
close();
|
||||||
std::exchange(_stream_ptr, std::move(sptr));
|
std::exchange(_stream_ptr, std::move(sptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
detail::log_invoke<logger_type>& log() {
|
||||||
|
return _log;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end namespace async_mqtt5::test
|
} // end namespace async_mqtt5::test
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
#include <boost/asio/ip/tcp.hpp>
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/types.hpp>
|
#include <async_mqtt5/types.hpp>
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
#include <async_mqtt5/detail/internal_types.hpp>
|
#include <async_mqtt5/detail/internal_types.hpp>
|
||||||
#include <async_mqtt5/impl/connect_op.hpp>
|
#include <async_mqtt5/impl/connect_op.hpp>
|
||||||
|
|
||||||
@ -64,8 +65,9 @@ void run_unit_test(
|
|||||||
std::move(h)(ec);
|
std::move(h)(ec);
|
||||||
};
|
};
|
||||||
|
|
||||||
detail::connect_op<test::test_stream>(
|
detail::log_invoke d;
|
||||||
stream, mqtt_ctx, std::move(handler)
|
detail::connect_op<test::test_stream, detail::noop_logger>(
|
||||||
|
stream, mqtt_ctx, d, std::move(handler)
|
||||||
).perform(*std::begin(eps), std::move(ap));
|
).perform(*std::begin(eps), std::move(ap));
|
||||||
|
|
||||||
ioc.run_for(1s);
|
ioc.run_for(1s);
|
||||||
|
198
test/unit/logger.cpp
Normal file
198
test/unit/logger.cpp
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
//
|
||||||
|
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0.
|
||||||
|
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <boost/test/unit_test.hpp>
|
||||||
|
#include <boost/test/tools/output_test_stream.hpp>
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
#include <boost/asio/detached.hpp>
|
||||||
|
#include <boost/asio/io_context.hpp>
|
||||||
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
|
#include <boost/asio/ssl/stream.hpp>
|
||||||
|
|
||||||
|
#include <boost/beast/websocket/stream.hpp>
|
||||||
|
#include <boost/beast/ssl/ssl_stream.hpp> // async_teardown specialization for websocket ssl stream
|
||||||
|
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
|
|
||||||
|
#include <async_mqtt5/mqtt_client.hpp>
|
||||||
|
#include <async_mqtt5/logger.hpp>
|
||||||
|
|
||||||
|
#include "test_common/message_exchange.hpp"
|
||||||
|
#include "test_common/test_service.hpp"
|
||||||
|
#include "test_common/test_stream.hpp"
|
||||||
|
|
||||||
|
using namespace async_mqtt5;
|
||||||
|
namespace asio = boost::asio;
|
||||||
|
|
||||||
|
namespace async_mqtt5 {
|
||||||
|
|
||||||
|
template <typename StreamBase>
|
||||||
|
struct tls_handshake_type<asio::ssl::stream<StreamBase>> {
|
||||||
|
static constexpr auto client = asio::ssl::stream_base::client;
|
||||||
|
static constexpr auto server = asio::ssl::stream_base::server;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename StreamBase>
|
||||||
|
void assign_tls_sni(
|
||||||
|
const authority_path& ap,
|
||||||
|
asio::ssl::context& /* ctx */,
|
||||||
|
asio::ssl::stream<StreamBase>& stream
|
||||||
|
) {
|
||||||
|
SSL_set_tlsext_host_name(stream.native_handle(), ap.host.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end namespace async_mqtt5
|
||||||
|
|
||||||
|
|
||||||
|
void logger_test() {
|
||||||
|
BOOST_STATIC_ASSERT(detail::has_at_resolve<logger>);
|
||||||
|
BOOST_STATIC_ASSERT(detail::has_at_tcp_connect<logger>);
|
||||||
|
BOOST_STATIC_ASSERT(detail::has_at_tls_handshake<logger>);
|
||||||
|
BOOST_STATIC_ASSERT(detail::has_at_ws_handshake<logger>);
|
||||||
|
BOOST_STATIC_ASSERT(detail::has_at_connack<logger>);
|
||||||
|
BOOST_STATIC_ASSERT(detail::has_at_disconnect<logger>);
|
||||||
|
}
|
||||||
|
|
||||||
|
using stream_type = boost::beast::websocket::stream<
|
||||||
|
asio::ssl::stream<asio::ip::tcp::socket>
|
||||||
|
>;
|
||||||
|
using context_type = asio::ssl::context;
|
||||||
|
using logger_type = logger;
|
||||||
|
using client_type = mqtt_client<stream_type, context_type, logger_type>;
|
||||||
|
|
||||||
|
BOOST_AUTO_TEST_SUITE(logger_tests)
|
||||||
|
|
||||||
|
class clog_redirect {
|
||||||
|
std::streambuf* _old_buffer;
|
||||||
|
|
||||||
|
public:
|
||||||
|
clog_redirect(
|
||||||
|
std::streambuf* new_buffer
|
||||||
|
) :
|
||||||
|
_old_buffer(std::clog.rdbuf(new_buffer))
|
||||||
|
{}
|
||||||
|
|
||||||
|
~clog_redirect() {
|
||||||
|
std::clog.rdbuf(_old_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
bool contains(const std::string& str, const std::string& substr) {
|
||||||
|
return str.find(substr) != std::string::npos;
|
||||||
|
}
|
||||||
|
|
||||||
|
BOOST_AUTO_TEST_CASE(successful_connect_debug) {
|
||||||
|
boost::test_tools::output_test_stream output;
|
||||||
|
|
||||||
|
{
|
||||||
|
clog_redirect guard(output.rdbuf());
|
||||||
|
asio::io_context ioc;
|
||||||
|
|
||||||
|
asio::ssl::context tls_context(asio::ssl::context::tls_client);
|
||||||
|
client_type c(
|
||||||
|
ioc, std::move(tls_context), logger(log_level::debug)
|
||||||
|
);
|
||||||
|
|
||||||
|
c.brokers("broker.hivemq.com/mqtt", 8884)
|
||||||
|
.async_run(asio::detached);
|
||||||
|
|
||||||
|
c.async_disconnect([](error_code) {});
|
||||||
|
|
||||||
|
ioc.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string log = output.rdbuf()->str();
|
||||||
|
BOOST_TEST_MESSAGE(log);
|
||||||
|
BOOST_TEST_WARN(contains(log, "resolve"));
|
||||||
|
BOOST_TEST_WARN(contains(log, "connect"));
|
||||||
|
BOOST_TEST_WARN(contains(log, "tls handshake"));
|
||||||
|
BOOST_TEST_WARN(contains(log, "ws handshake"));
|
||||||
|
BOOST_TEST_WARN(contains(log, "connack"));
|
||||||
|
}
|
||||||
|
|
||||||
|
BOOST_AUTO_TEST_CASE(successful_connect_warning) {
|
||||||
|
boost::test_tools::output_test_stream output;
|
||||||
|
|
||||||
|
{
|
||||||
|
clog_redirect guard(output.rdbuf());
|
||||||
|
|
||||||
|
asio::io_context ioc;
|
||||||
|
asio::ssl::context tls_context(asio::ssl::context::tls_client);
|
||||||
|
client_type c(
|
||||||
|
ioc, std::move(tls_context), logger(log_level::warning)
|
||||||
|
);
|
||||||
|
|
||||||
|
c.brokers("broker.hivemq.com/mqtt", 8884)
|
||||||
|
.async_run(asio::detached);
|
||||||
|
|
||||||
|
c.async_disconnect([](error_code) {});
|
||||||
|
|
||||||
|
ioc.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
// If connection is successful, nothing should be printed.
|
||||||
|
// However if the Broker is down or overloaded, this will cause logs to be printed.
|
||||||
|
// We should not fail the test because of it.
|
||||||
|
BOOST_TEST_WARN(output.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
BOOST_AUTO_TEST_CASE(disconnect) {
|
||||||
|
using test::after;
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
|
boost::test_tools::output_test_stream output;
|
||||||
|
{
|
||||||
|
clog_redirect guard(output.rdbuf());
|
||||||
|
|
||||||
|
// packets
|
||||||
|
auto connect = encoders::encode_connect(
|
||||||
|
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
|
||||||
|
);
|
||||||
|
auto connack = encoders::encode_connack(false, uint8_t(0x00), {});
|
||||||
|
|
||||||
|
disconnect_props dc_props;
|
||||||
|
dc_props[prop::reason_string] = "No reason.";
|
||||||
|
auto disconnect = encoders::encode_disconnect(0x00, dc_props);
|
||||||
|
|
||||||
|
test::msg_exchange broker_side;
|
||||||
|
broker_side
|
||||||
|
.expect(connect)
|
||||||
|
.complete_with(error_code{}, after(0ms))
|
||||||
|
.reply_with(connack, after(0ms))
|
||||||
|
.send(disconnect, after(50ms))
|
||||||
|
.expect(connect);
|
||||||
|
|
||||||
|
asio::io_context ioc;
|
||||||
|
auto executor = ioc.get_executor();
|
||||||
|
auto& broker = asio::make_service<test::test_broker>(
|
||||||
|
ioc, executor, std::move(broker_side)
|
||||||
|
);
|
||||||
|
|
||||||
|
mqtt_client<test::test_stream, std::monostate, logger> c(executor);
|
||||||
|
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
|
||||||
|
.async_run(asio::detached);
|
||||||
|
|
||||||
|
asio::steady_timer timer(c.get_executor());
|
||||||
|
timer.expires_after(100ms);
|
||||||
|
timer.async_wait([&c](error_code) { c.cancel(); });
|
||||||
|
|
||||||
|
ioc.run();
|
||||||
|
BOOST_TEST(broker.received_all_expected());
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string log = output.rdbuf()->str();
|
||||||
|
BOOST_TEST_MESSAGE(log);
|
||||||
|
BOOST_TEST(contains(log, "disconnect"));
|
||||||
|
}
|
||||||
|
|
||||||
|
BOOST_AUTO_TEST_SUITE_END();
|
@ -14,6 +14,8 @@
|
|||||||
#include <boost/asio/post.hpp>
|
#include <boost/asio/post.hpp>
|
||||||
#include <boost/asio/prepend.hpp>
|
#include <boost/asio/prepend.hpp>
|
||||||
|
|
||||||
|
#include <async_mqtt5/detail/log_invoke.hpp>
|
||||||
|
|
||||||
#include <async_mqtt5/impl/client_service.hpp>
|
#include <async_mqtt5/impl/client_service.hpp>
|
||||||
#include <async_mqtt5/impl/reconnect_op.hpp>
|
#include <async_mqtt5/impl/reconnect_op.hpp>
|
||||||
|
|
||||||
@ -111,7 +113,8 @@ void run_connect_to_localhost_test(int succeed_after) {
|
|||||||
);
|
);
|
||||||
|
|
||||||
auto stream_ctx = stream_context(std::monostate {});
|
auto stream_ctx = stream_context(std::monostate {});
|
||||||
auto auto_stream = astream(ioc.get_executor(), stream_ctx);
|
auto log = detail::log_invoke();
|
||||||
|
auto auto_stream = astream(ioc.get_executor(), stream_ctx, log);
|
||||||
auto_stream.brokers("localhost", 1883);
|
auto_stream.brokers("localhost", 1883);
|
||||||
|
|
||||||
auto handler = [&handlers_called](error_code ec) {
|
auto handler = [&handlers_called](error_code ec) {
|
||||||
@ -144,7 +147,8 @@ BOOST_AUTO_TEST_CASE(no_servers) {
|
|||||||
|
|
||||||
asio::io_context ioc;
|
asio::io_context ioc;
|
||||||
auto stream_ctx = stream_context(std::monostate{});
|
auto stream_ctx = stream_context(std::monostate{});
|
||||||
auto auto_stream = astream(ioc.get_executor(), stream_ctx);
|
auto log = detail::log_invoke();
|
||||||
|
auto auto_stream = astream(ioc.get_executor(), stream_ctx, log);
|
||||||
auto_stream.brokers("", 1883);
|
auto_stream.brokers("", 1883);
|
||||||
|
|
||||||
auto handler = [&handlers_called](error_code ec) {
|
auto handler = [&handlers_called](error_code ec) {
|
||||||
|
Reference in New Issue
Block a user