diff --git a/doc/Jamfile b/doc/Jamfile
index e18ef34..4e579c2 100644
--- a/doc/Jamfile
+++ b/doc/Jamfile
@@ -39,6 +39,7 @@ make xml/index.xml
# additional dependencies
../include/async_mqtt5/error.hpp
+ ../include/async_mqtt5/logger.hpp
../include/async_mqtt5/reason_codes.hpp
../include/async_mqtt5/types.hpp
../include/async_mqtt5/mqtt_client.hpp
diff --git a/doc/qbk/reference/quickref.xml b/doc/qbk/reference/quickref.xml
index 5c338ab..d52a007 100644
--- a/doc/qbk/reference/quickref.xml
+++ b/doc/qbk/reference/quickref.xml
@@ -20,6 +20,7 @@
subscribe_options
subscribe_topic
will
+ logger
Concepts
@@ -41,6 +42,7 @@
disconnect_rc_e
qos_e
retain_e
+ log_level
Functions
diff --git a/doc/reference.dox b/doc/reference.dox
index 5df93c7..ee9fc2b 100644
--- a/doc/reference.dox
+++ b/doc/reference.dox
@@ -70,6 +70,7 @@ WARN_LOGFILE =
# configuration options related to the input files
#---------------------------------------------------------------------------
INPUT = ../include/async_mqtt5/error.hpp \
+ ../include/async_mqtt5/logger.hpp \
../include/async_mqtt5/reason_codes.hpp \
../include/async_mqtt5/types.hpp \
../include/async_mqtt5/mqtt_client.hpp
diff --git a/include/async_mqtt5.hpp b/include/async_mqtt5.hpp
index 26c1334..027dcfd 100644
--- a/include/async_mqtt5.hpp
+++ b/include/async_mqtt5.hpp
@@ -9,6 +9,7 @@
#define ASYNC_MQTT5_HPP
#include
+#include
#include
#include
#include
diff --git a/include/async_mqtt5/detail/control_packet.hpp b/include/async_mqtt5/detail/control_packet.hpp
index c7103f1..3d248cc 100644
--- a/include/async_mqtt5/detail/control_packet.hpp
+++ b/include/async_mqtt5/detail/control_packet.hpp
@@ -58,7 +58,7 @@ class control_packet {
control_packet(
const Allocator& a,
uint16_t packet_id, std::string packet
- ) noexcept :
+ ) :
_packet_id(packet_id),
_packet(boost::allocate_unique(a, std::move(packet)))
{}
diff --git a/include/async_mqtt5/detail/log_invoke.hpp b/include/async_mqtt5/detail/log_invoke.hpp
new file mode 100644
index 0000000..e8819bf
--- /dev/null
+++ b/include/async_mqtt5/detail/log_invoke.hpp
@@ -0,0 +1,147 @@
+//
+// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
+//
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef ASYNC_MQTT5_LOG_INVOKE_HPP
+#define ASYNC_MQTT5_LOG_INVOKE_HPP
+
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+namespace async_mqtt5::detail {
+
+namespace asio = boost::asio;
+using boost::system::error_code;
+
+// NOOP Logger
+class noop_logger {};
+
+// at_resolve
+
+template
+using at_resolve_sig = decltype(
+ std::declval().at_resolve(
+ std::declval(),
+ std::declval(), std::declval(),
+ std::declval()
+ )
+);
+template
+constexpr bool has_at_resolve = boost::is_detected::value;
+
+// at_tcp_connect
+
+template
+using at_tcp_connect_sig = decltype(
+ std::declval().at_tcp_connect(
+ std::declval(), std::declval()
+ )
+);
+template
+constexpr bool has_at_tcp_connect = boost::is_detected::value;
+
+// at_tls_handshake
+
+template
+using at_tls_handshake_sig = decltype(
+ std::declval().at_tls_handshake(
+ std::declval(), std::declval()
+ )
+);
+template
+constexpr bool has_at_tls_handshake = boost::is_detected::value;
+
+// at_ws_handshake
+
+template
+using at_ws_handshake_sig = decltype(
+ std::declval().at_ws_handshake(
+ std::declval(), std::declval()
+ )
+);
+template
+constexpr bool has_at_ws_handshake = boost::is_detected::value;
+
+// at_connack
+
+template
+using at_connack_sig = decltype(
+ std::declval().at_connack(
+ std::declval(),
+ std::declval(), std::declval()
+ )
+);
+template
+constexpr bool has_at_connack = boost::is_detected::value;
+
+// at_disconnect
+template
+using at_disconnect_sig = decltype(
+ std::declval().at_disconnect(
+ std::declval(), std::declval()
+ )
+);
+template
+constexpr bool has_at_disconnect = boost::is_detected::value;
+
+template
+class log_invoke {
+ LoggerType _logger;
+public:
+ explicit log_invoke(LoggerType&& logger = {}) :
+ _logger(std::forward(logger))
+ {}
+
+ void at_resolve(
+ error_code ec, std::string_view host, std::string_view port,
+ const asio::ip::tcp::resolver::results_type& eps
+ ) {
+ if constexpr (has_at_resolve)
+ _logger.at_resolve(ec, host, port, eps);
+ }
+
+ void at_tcp_connect(error_code ec, asio::ip::tcp::endpoint ep) {
+ if constexpr (has_at_tcp_connect)
+ _logger.at_tcp_connect(ec, ep);
+ }
+
+ void at_tls_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
+ if constexpr (has_at_tls_handshake)
+ _logger.at_tls_handshake(ec, ep);
+ }
+
+ void at_ws_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
+ if constexpr (has_at_ws_handshake)
+ _logger.at_ws_handshake(ec, ep);
+ }
+
+ void at_connack(
+ reason_code rc,
+ bool session_present, const connack_props& ca_props
+ ) {
+ if constexpr (has_at_connack)
+ _logger.at_connack(rc, session_present, ca_props);
+ }
+
+ void at_disconnect(reason_code rc, const disconnect_props& dc_props) {
+ if constexpr (has_at_disconnect)
+ _logger.at_disconnect(rc, dc_props);
+ }
+
+};
+
+} // end namespace async_mqtt5::detail
+
+
+#endif // !ASYNC_MQTT5_LOG_INVOKE_HPP
diff --git a/include/async_mqtt5/impl/autoconnect_stream.hpp b/include/async_mqtt5/impl/autoconnect_stream.hpp
index 06a2e16..7101395 100644
--- a/include/async_mqtt5/impl/autoconnect_stream.hpp
+++ b/include/async_mqtt5/impl/autoconnect_stream.hpp
@@ -20,6 +20,7 @@
#include
#include
+#include
#include
#include
@@ -33,13 +34,15 @@ using error_code = boost::system::error_code;
template <
typename StreamType,
- typename StreamContext = std::monostate
+ typename StreamContext = std::monostate,
+ typename LoggerType = noop_logger
>
class autoconnect_stream {
public:
- using self_type = autoconnect_stream;
+ using self_type = autoconnect_stream;
using stream_type = StreamType;
using stream_context_type = StreamContext;
+ using logger_type = LoggerType;
using executor_type = typename stream_type::executor_type;
private:
using stream_ptr = std::shared_ptr;
@@ -47,29 +50,33 @@ private:
executor_type _stream_executor;
async_mutex _conn_mtx;
asio::steady_timer _read_timer, _connect_timer;
- endpoints _endpoints;
+ endpoints _endpoints;
stream_ptr _stream_ptr;
stream_context_type& _stream_context;
+ log_invoke& _log;
+
template
friend class read_op;
template
friend class write_op;
- template
+ template
friend class reconnect_op;
public:
autoconnect_stream(
- const executor_type& ex, stream_context_type& context
+ const executor_type& ex, stream_context_type& context,
+ log_invoke& log
) :
_stream_executor(ex),
_conn_mtx(_stream_executor),
_read_timer(_stream_executor), _connect_timer(_stream_executor),
- _endpoints(_stream_executor, _connect_timer),
- _stream_context(context)
+ _endpoints(_stream_executor, _connect_timer, log),
+ _stream_context(context),
+ _log(log)
{
replace_next_layer(construct_next_layer());
}
@@ -166,6 +173,11 @@ public:
}
private:
+
+ log_invoke& log() {
+ return _log;
+ }
+
static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
error_code ec;
auto& layer = lowest_layer(*sptr);
diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp
index 4a2393f..8f12726 100644
--- a/include/async_mqtt5/impl/client_service.hpp
+++ b/include/async_mqtt5/impl/client_service.hpp
@@ -19,9 +19,8 @@
#include
#include
-#include
-
#include
+#include
#include
#include
@@ -213,18 +212,20 @@ public:
template <
typename StreamType,
- typename TlsContext = std::monostate
+ typename TlsContext = std::monostate,
+ typename LoggerType = noop_logger
>
class client_service {
- using self_type = client_service;
+ using self_type = client_service;
using stream_context_type = stream_context;
using stream_type = autoconnect_stream<
- StreamType, stream_context_type
+ StreamType, stream_context_type, LoggerType
>;
public:
using executor_type = typename stream_type::executor_type;
private:
using tls_context_type = TlsContext;
+ using logger_type = LoggerType;
using receive_channel = asio::experimental::basic_channel<
executor_type,
channel_traits<>,
@@ -251,6 +252,8 @@ private:
executor_type _executor;
+ log_invoke _log;
+
stream_context_type _stream_context;
stream_type _stream;
@@ -267,8 +270,10 @@ private:
asio::steady_timer _sentry_timer;
client_service(const client_service& other) :
- _executor(other._executor), _stream_context(other._stream_context),
- _stream(_executor, _stream_context),
+ _executor(other._executor),
+ _log(other._log),
+ _stream_context(other._stream_context),
+ _stream(_executor, _stream_context, _log),
_replies(_executor),
_async_sender(*this),
_active_span(_read_buff.cend(), _read_buff.cend()),
@@ -283,11 +288,12 @@ public:
explicit client_service(
const executor_type& ex,
- tls_context_type tls_context = {}
+ tls_context_type tls_context = {}, logger_type logger = {}
) :
_executor(ex),
+ _log(std::move(logger)),
_stream_context(std::move(tls_context)),
- _stream(ex, _stream_context),
+ _stream(ex, _stream_context, _log),
_replies(ex),
_async_sender(*this),
_active_span(_read_buff.cend(), _read_buff.cend()),
@@ -411,6 +417,10 @@ public:
_stream.close();
}
+ log_invoke& log() {
+ return _log;
+ }
+
uint16_t allocate_pid() {
return _pid_allocator.allocate();
}
diff --git a/include/async_mqtt5/impl/connect_op.hpp b/include/async_mqtt5/impl/connect_op.hpp
index de69d26..28d620b 100644
--- a/include/async_mqtt5/impl/connect_op.hpp
+++ b/include/async_mqtt5/impl/connect_op.hpp
@@ -37,13 +37,14 @@
#include
#include
#include
+#include
#include
#include
namespace async_mqtt5::detail {
-template
+template
class connect_op {
static constexpr size_t min_packet_sz = 5;
@@ -60,6 +61,7 @@ class connect_op {
Stream& _stream;
mqtt_ctx& _ctx;
+ log_invoke& _log;
using handler_type = asio::any_completion_handler;
handler_type _handler;
@@ -72,9 +74,11 @@ class connect_op {
public:
template
connect_op(
- Stream& stream, mqtt_ctx& ctx, Handler&& handler
+ Stream& stream, mqtt_ctx& ctx,
+ log_invoke& log,
+ Handler&& handler
) :
- _stream(stream), _ctx(ctx),
+ _stream(stream), _ctx(ctx), _log(log),
_handler(std::forward(handler)),
_cancellation_state(
asio::get_associated_cancellation_slot(_handler),
@@ -121,6 +125,7 @@ public:
if (is_cancelled())
return complete(asio::error::operation_aborted);
+ _log.at_tcp_connect(ec, ep);
if (ec)
return complete(ec);
@@ -153,19 +158,19 @@ public:
}
void operator()(
- on_tls_handshake, error_code ec,
- endpoint ep, authority_path ap
+ on_tls_handshake, error_code ec, endpoint ep, authority_path ap
) {
if (is_cancelled())
return complete(asio::error::operation_aborted);
+ _log.at_tls_handshake(ec, ep);
if (ec)
return complete(ec);
do_ws_handshake(std::move(ep), std::move(ap));
}
- void do_ws_handshake(endpoint, authority_path ap) {
+ void do_ws_handshake(endpoint ep, authority_path ap) {
if constexpr (has_ws_handshake) {
using namespace boost::beast;
@@ -189,17 +194,23 @@ public:
_stream.async_handshake(
ap.host + ':' + ap.port, ap.path,
- asio::prepend(std::move(*this), on_ws_handshake {})
+ asio::append(
+ asio::prepend(std::move(*this), on_ws_handshake {}),
+ ep
+ )
);
}
else
- (*this)(on_ws_handshake {}, error_code {});
+ (*this)(on_ws_handshake {}, error_code {}, ep);
}
- void operator()(on_ws_handshake, error_code ec) {
+ void operator()(on_ws_handshake, error_code ec, endpoint ep) {
if (is_cancelled())
return complete(asio::error::operation_aborted);
+ if constexpr (has_ws_handshake)
+ _log.at_ws_handshake(ec, ep);
+
if (ec)
return complete(ec);
@@ -343,6 +354,7 @@ public:
if (!rc.has_value()) // reason code not allowed in CONNACK
return complete(client::error::malformed_packet);
+ _log.at_connack(*rc, session_present, ca_props);
if (*rc)
return complete(asio::error::try_again);
@@ -401,7 +413,7 @@ public:
detail::async_write(
_stream, asio::buffer(wire_data),
asio::consign(
- asio::prepend(std::move(*this), on_send_auth{}),
+ asio::prepend(std::move(*this), on_send_auth {}),
std::move(packet)
)
);
diff --git a/include/async_mqtt5/impl/endpoints.hpp b/include/async_mqtt5/impl/endpoints.hpp
index 6094ed6..a96875e 100644
--- a/include/async_mqtt5/impl/endpoints.hpp
+++ b/include/async_mqtt5/impl/endpoints.hpp
@@ -27,6 +27,7 @@
#include
#include
+#include
namespace async_mqtt5::detail {
@@ -107,11 +108,13 @@ public:
error_code timer_ec, authority_path ap
) {
if (
- ord[0] == 0 && resolve_ec == asio::error::operation_aborted ||
- ord[0] == 1 && timer_ec == asio::error::operation_aborted
+ (ord[0] == 0 && resolve_ec == asio::error::operation_aborted) ||
+ (ord[0] == 1 && timer_ec == asio::error::operation_aborted)
)
return complete(asio::error::operation_aborted, {}, {});
+ resolve_ec = timer_ec ? resolve_ec : asio::error::timed_out;
+ _owner._log.at_resolve(resolve_ec, ap.host, ap.port, epts);
if (!resolve_ec)
return complete(error_code {}, std::move(epts), std::move(ap));
@@ -136,7 +139,10 @@ private:
};
+template
class endpoints {
+ using logger_type = LoggerType;
+
asio::ip::tcp::resolver _resolver;
asio::steady_timer& _connect_timer;
@@ -144,6 +150,8 @@ class endpoints {
int _current_host { -1 };
+ log_invoke& _log;
+
template
friend class resolve_op;
@@ -159,8 +167,12 @@ class endpoints {
public:
template
- endpoints(Executor ex, asio::steady_timer& timer) :
- _resolver(ex), _connect_timer(timer)
+ endpoints(
+ Executor ex, asio::steady_timer& timer,
+ log_invoke& log
+ ) :
+ _resolver(std::move(ex)), _connect_timer(timer),
+ _log(log)
{}
endpoints(const endpoints&) = delete;
diff --git a/include/async_mqtt5/impl/read_message_op.hpp b/include/async_mqtt5/impl/read_message_op.hpp
index 06ffe0e..53a24fe 100644
--- a/include/async_mqtt5/impl/read_message_op.hpp
+++ b/include/async_mqtt5/impl/read_message_op.hpp
@@ -16,6 +16,7 @@
#include
#include
+#include
#include
@@ -114,6 +115,20 @@ private:
}
break;
case control_code_e::disconnect: {
+ auto rv = decoders::decode_disconnect(
+ static_cast(std::distance(first, last)), first
+ );
+ if (!rv.has_value())
+ return on_malformed_packet(
+ "Malformed DISCONNECT received: cannot decode"
+ );
+
+ const auto& [rc, props] = *rv;
+ _svc_ptr->log().at_disconnect(
+ to_reason_code(rc)
+ .value_or(reason_codes::unspecified_error),
+ props
+ );
_svc_ptr->close_stream();
_svc_ptr->open_stream();
}
diff --git a/include/async_mqtt5/impl/reconnect_op.hpp b/include/async_mqtt5/impl/reconnect_op.hpp
index 2b00769..38341b3 100644
--- a/include/async_mqtt5/impl/reconnect_op.hpp
+++ b/include/async_mqtt5/impl/reconnect_op.hpp
@@ -182,9 +182,10 @@ public:
auto init_connect = [](
auto handler, typename Owner::stream_type& stream,
- mqtt_ctx& context, endpoint ep, authority_path ap
+ mqtt_ctx& context, log_invoke& log,
+ endpoint ep, authority_path ap
) {
- connect_op { stream, context, std::move(handler) }
+ connect_op { stream, context, log, std::move(handler) }
.perform(ep, std::move(ap));
};
@@ -192,6 +193,7 @@ public:
asio::async_initiate(
init_connect, asio::deferred, std::ref(*sptr),
std::ref(_owner._stream_context.mqtt_context()),
+ std::ref(_owner.log()),
ep, ap
),
_owner._connect_timer.async_wait(asio::deferred)
diff --git a/include/async_mqtt5/logger.hpp b/include/async_mqtt5/logger.hpp
new file mode 100644
index 0000000..8040761
--- /dev/null
+++ b/include/async_mqtt5/logger.hpp
@@ -0,0 +1,202 @@
+//
+// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
+//
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef ASYNC_MQTT5_LOGGER_HPP
+#define ASYNC_MQTT5_LOGGER_HPP
+
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+namespace async_mqtt5 {
+
+namespace asio = boost::asio;
+using error_code = boost::system::error_code;
+
+/**
+ * \brief Represents the severity level of log messages.
+ */
+enum class log_level : uint8_t {
+ /** Error messages that indicate serious issues. **/
+ error = 1,
+
+ /** Warnings that indicate potential problems or non-critical issues. **/
+ warning,
+
+ /** Informational messages that highlight normal application behaviour and events. **/
+ info,
+
+ /** Detailed messages useful for diagnosing issues. **/
+ debug
+};
+
+/**
+ * \brief A logger class that can used by the \ref mqtt_client to output
+ * the results of operations to stderr.
+ *
+ * \details All functions are invoked directly within the \ref mqtt_client using
+ * its default executor. If the \ref mqtt_client is initialized with an explicit or
+ * implicit strand, none of the functions will be invoked concurrently.
+ *
+ * \par Thread safety
+ * ['Distinct objects]: unsafe. \n
+ * ['Shared objects]: unsafe. \n
+ * This class is [*not thread-safe].
+*/
+class logger {
+ constexpr static auto prefix = "[Async.MQTT5]";
+
+ log_level _level;
+public:
+
+ /**
+ * \brief Constructs a logger that filters log messages based on the specified log level.
+ *
+ * \param level Messages with a log level higher than the given log level will be suppressed.
+ */
+ logger(log_level level = log_level::warning) : _level(level) {}
+
+ /**
+ * \brief Outputs the results of the resolve operation.
+ *
+ * \param ec Error code returned by the resolve operation.
+ * \param host Hostname used in the resolve operation.
+ * \param port Port used in the resolve operation.
+ * \param eps Endpoints returned by the resolve operation.
+ */
+ void at_resolve(
+ error_code ec, std::string_view host, std::string_view port,
+ const asio::ip::tcp::resolver::results_type& eps
+ ) {
+ if (!ec && _level < log_level::info)
+ return;
+
+ write_prefix();
+ std::clog
+ << "resolve: "
+ << host << ":" << port;
+ std::clog << " - " << ec.message();
+
+ if (_level == log_level::debug) {
+ std::clog << " [";
+ for (auto it = eps.begin(); it != eps.end();) {
+ std::clog << it->endpoint().address().to_string();
+ if (++it != eps.end())
+ std::clog << ",";
+ }
+ std::clog << "]" << std::endl;
+ }
+ }
+
+ /**
+ * \brief Outputs the results of the TCP connect operation.
+ *
+ * \param ec Error code returned by the TCP connect operation.
+ * \param ep The TCP endpoint used to establish the TCP connection.
+ */
+ void at_tcp_connect(error_code ec, asio::ip::tcp::endpoint ep) {
+ if (!ec && _level < log_level::info)
+ return;
+
+ write_prefix();
+ std::clog
+ << "connect: "
+ << ep.address().to_string() << ":" << ep.port()
+ << " - " << ec.message()
+ << std::endl;
+ }
+
+ /**
+ * \brief Outputs the results of the TLS handshake operation.
+ *
+ * \param ec Error code returned by the TLS handshake operation.
+ * \param ep The TCP endpoint used to establish the TLS handshake.
+ */
+ void at_tls_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
+ if (!ec && _level < log_level::info)
+ return;
+
+ write_prefix();
+ std::clog
+ << "tls handshake: "
+ << ep.address().to_string() << ":" << ep.port()
+ << " - " << ec.message()
+ << std::endl;
+ }
+
+ /**
+ * \brief Outputs the results of the WebSocket handshake operation.
+ *
+ * \param ec Error code returned by the WebSocket handshake operation.
+ * \param ep The TCP endpoint used to establish the WebSocket handshake.
+ */
+ void at_ws_handshake(error_code ec, asio::ip::tcp::endpoint ep) {
+ if (!ec && _level < log_level::info)
+ return;
+
+ write_prefix();
+ std::clog
+ << "ws handshake: "
+ << ep.address().to_string() << ":" << ep.port()
+ << " - " << ec.message()
+ << std::endl;
+ }
+
+ /**
+ * \brief Outputs the contents of the \__CONNACK\__ packet sent by the Broker.
+ *
+ * \param rc Reason Code in the received \__CONNACK\__ packet indicating
+ * the result of the MQTT handshake.
+ * \param session_present A flag indicating whether the Broker already has a session associated
+ * with this connection.
+ * \param ca_props \__CONNACK_PROPS\__ received in the \__CONNACK\__ packet.
+ */
+ void at_connack(
+ reason_code rc,
+ bool /* session_present */, const connack_props& /* ca_props */
+ ) {
+ if (!rc && _level < log_level::info)
+ return;
+
+ write_prefix();
+ std::clog << "connack: " << rc.message() << ".";
+ std::clog << std::endl;
+ }
+
+ /**
+ * \brief Outputs the contents of the \__DISCONNECT\__ packet sent by the Broker.
+ *
+ * \param rc Reason Code in the received \__DISCONNECT\__ packet indicating
+ * the reason behind the disconnection.
+ * \param dc_props \__DISCONNECT_PROPS\__ received in the \__DISCONNECT\__ packet.
+ */
+ void at_disconnect(reason_code rc, const disconnect_props& dc_props) {
+ write_prefix();
+ std::clog << "disconnect: " << rc.message() << ".";
+ if (dc_props[prop::reason_string].has_value())
+ std::clog << " Reason string: " << * dc_props[prop::reason_string];
+ std::clog << std::endl;
+ }
+
+private:
+ void write_prefix() {
+ std::clog << prefix << " ";
+ }
+};
+
+} // end namespace async_mqtt5
+
+
+#endif // !ASYNC_MQTT5_LOGGER_HPP
diff --git a/include/async_mqtt5/mqtt_client.hpp b/include/async_mqtt5/mqtt_client.hpp
index adaf7c4..67be565 100644
--- a/include/async_mqtt5/mqtt_client.hpp
+++ b/include/async_mqtt5/mqtt_client.hpp
@@ -20,6 +20,7 @@
#include
#include
+#include
#include
#include
@@ -49,7 +50,8 @@ namespace asio = boost::asio;
*/
template <
typename StreamType,
- typename TlsContext = std::monostate
+ typename TlsContext = std::monostate,
+ typename LoggerType = detail::noop_logger
>
class mqtt_client {
public:
@@ -69,9 +71,10 @@ public:
private:
using stream_type = StreamType;
using tls_context_type = TlsContext;
+ using logger_type = LoggerType;
using client_service_type = detail::client_service<
- stream_type, tls_context_type
+ stream_type, tls_context_type, logger_type
>;
using impl_type = std::shared_ptr;
impl_type _impl;
@@ -81,14 +84,15 @@ public:
* \brief Constructs a Client with given parameters.
*
* \param ex An executor that will be associated with the Client.
- * \param tls_context A context object used in TLS/SLL connection.
+ * \param tls_context A context object used in TLS/SSL connection.
+ * \param logger An object used to log events within the Client.
*/
explicit mqtt_client(
const executor_type& ex,
- TlsContext tls_context = {}
+ tls_context_type tls_context = {}, logger_type logger = {}
) :
_impl(std::make_shared(
- ex, std::move(tls_context)
+ ex, std::move(tls_context), std::move(logger)
))
{}
@@ -97,7 +101,8 @@ public:
*
* \tparam \__ExecutionContext\__ Type of a concrete execution context.
* \param context Execution context whose executor will be associated with the Client.
- * \param tls_context A context object used in TLS/SLL connection.
+ * \param tls_context A context object used in TLS/SSL connection.
+ * \param logger An object used to log events within the Client.
*
* \par Precondition
* \code
@@ -113,9 +118,12 @@ public:
>
explicit mqtt_client(
ExecutionContext& context,
- TlsContext tls_context = {}
+ tls_context_type tls_context = {}, logger_type logger = {}
) :
- mqtt_client(context.get_executor(), std::move(tls_context))
+ mqtt_client(
+ context.get_executor(),
+ std::move(tls_context), std::move(logger)
+ )
{}
/**
@@ -171,8 +179,7 @@ public:
typename Ctx = TlsContext,
std::enable_if_t, bool> = true
>
- decltype(auto) tls_context()
- {
+ decltype(auto) tls_context() {
return _impl->tls_context();
}
@@ -973,7 +980,6 @@ public:
};
-
} // end namespace async_mqtt5
#endif // !ASYNC_MQTT5_MQTT_CLIENT_HPP
diff --git a/test/include/test_common/test_autoconnect_stream.hpp b/test/include/test_common/test_autoconnect_stream.hpp
index 7e4fb4b..9b8a1a5 100644
--- a/test/include/test_common/test_autoconnect_stream.hpp
+++ b/test/include/test_common/test_autoconnect_stream.hpp
@@ -18,6 +18,7 @@
#include
#include
+#include
#include
@@ -28,7 +29,8 @@ using error_code = boost::system::error_code;
template <
typename StreamType,
- typename StreamContext = std::monostate
+ typename StreamContext = std::monostate,
+ typename LoggerType = async_mqtt5::detail::noop_logger
>
class test_autoconnect_stream {
public:
@@ -36,28 +38,33 @@ public:
using stream_ptr = std::shared_ptr;
using stream_context_type = StreamContext;
using executor_type = typename stream_type::executor_type;
-
+ using logger_type = LoggerType;
private:
executor_type _stream_executor;
detail::async_mutex _conn_mtx;
asio::steady_timer _connect_timer;
- detail::endpoints _endpoints;
+ detail::endpoints _endpoints;
stream_ptr _stream_ptr;
stream_context_type& _stream_context;
+ detail::log_invoke _log;
+
template
friend class async_mqtt5::detail::reconnect_op;
public:
test_autoconnect_stream(
- const executor_type& ex, stream_context_type& context
+ const executor_type& ex,
+ stream_context_type& context,
+ detail::log_invoke& log
) :
_stream_executor(ex),
_conn_mtx(_stream_executor),
_connect_timer(_stream_executor),
- _endpoints(_stream_executor, _connect_timer),
- _stream_context(context)
+ _endpoints(_stream_executor, _connect_timer, log),
+ _stream_context(context),
+ _log(log)
{
replace_next_layer(construct_next_layer());
open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
@@ -115,6 +122,11 @@ public:
close();
std::exchange(_stream_ptr, std::move(sptr));
}
+
+private:
+ detail::log_invoke& log() {
+ return _log;
+ }
};
} // end namespace async_mqtt5::test
diff --git a/test/unit/connect_op.cpp b/test/unit/connect_op.cpp
index 560cbce..239e03c 100644
--- a/test/unit/connect_op.cpp
+++ b/test/unit/connect_op.cpp
@@ -17,6 +17,7 @@
#include
#include
+#include
#include
#include
@@ -64,8 +65,9 @@ void run_unit_test(
std::move(h)(ec);
};
- detail::connect_op(
- stream, mqtt_ctx, std::move(handler)
+ detail::log_invoke d;
+ detail::connect_op(
+ stream, mqtt_ctx, d, std::move(handler)
).perform(*std::begin(eps), std::move(ap));
ioc.run_for(1s);
diff --git a/test/unit/logger.cpp b/test/unit/logger.cpp
new file mode 100644
index 0000000..8f88d05
--- /dev/null
+++ b/test/unit/logger.cpp
@@ -0,0 +1,198 @@
+//
+// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
+//
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+#include // async_teardown specialization for websocket ssl stream
+
+#include
+
+#include
+#include
+
+#include "test_common/message_exchange.hpp"
+#include "test_common/test_service.hpp"
+#include "test_common/test_stream.hpp"
+
+using namespace async_mqtt5;
+namespace asio = boost::asio;
+
+namespace async_mqtt5 {
+
+template
+struct tls_handshake_type> {
+ static constexpr auto client = asio::ssl::stream_base::client;
+ static constexpr auto server = asio::ssl::stream_base::server;
+};
+
+template
+void assign_tls_sni(
+ const authority_path& ap,
+ asio::ssl::context& /* ctx */,
+ asio::ssl::stream& stream
+) {
+ SSL_set_tlsext_host_name(stream.native_handle(), ap.host.c_str());
+}
+
+} // end namespace async_mqtt5
+
+
+void logger_test() {
+ BOOST_STATIC_ASSERT(detail::has_at_resolve);
+ BOOST_STATIC_ASSERT(detail::has_at_tcp_connect);
+ BOOST_STATIC_ASSERT(detail::has_at_tls_handshake);
+ BOOST_STATIC_ASSERT(detail::has_at_ws_handshake);
+ BOOST_STATIC_ASSERT(detail::has_at_connack);
+ BOOST_STATIC_ASSERT(detail::has_at_disconnect);
+}
+
+using stream_type = boost::beast::websocket::stream<
+ asio::ssl::stream
+>;
+using context_type = asio::ssl::context;
+using logger_type = logger;
+using client_type = mqtt_client;
+
+BOOST_AUTO_TEST_SUITE(logger_tests)
+
+class clog_redirect {
+ std::streambuf* _old_buffer;
+
+public:
+ clog_redirect(
+ std::streambuf* new_buffer
+ ) :
+ _old_buffer(std::clog.rdbuf(new_buffer))
+ {}
+
+ ~clog_redirect() {
+ std::clog.rdbuf(_old_buffer);
+ }
+
+};
+
+bool contains(const std::string& str, const std::string& substr) {
+ return str.find(substr) != std::string::npos;
+}
+
+BOOST_AUTO_TEST_CASE(successful_connect_debug) {
+ boost::test_tools::output_test_stream output;
+
+ {
+ clog_redirect guard(output.rdbuf());
+ asio::io_context ioc;
+
+ asio::ssl::context tls_context(asio::ssl::context::tls_client);
+ client_type c(
+ ioc, std::move(tls_context), logger(log_level::debug)
+ );
+
+ c.brokers("broker.hivemq.com/mqtt", 8884)
+ .async_run(asio::detached);
+
+ c.async_disconnect([](error_code) {});
+
+ ioc.run();
+ }
+
+ std::string log = output.rdbuf()->str();
+ BOOST_TEST_MESSAGE(log);
+ BOOST_TEST_WARN(contains(log, "resolve"));
+ BOOST_TEST_WARN(contains(log, "connect"));
+ BOOST_TEST_WARN(contains(log, "tls handshake"));
+ BOOST_TEST_WARN(contains(log, "ws handshake"));
+ BOOST_TEST_WARN(contains(log, "connack"));
+}
+
+BOOST_AUTO_TEST_CASE(successful_connect_warning) {
+ boost::test_tools::output_test_stream output;
+
+ {
+ clog_redirect guard(output.rdbuf());
+
+ asio::io_context ioc;
+ asio::ssl::context tls_context(asio::ssl::context::tls_client);
+ client_type c(
+ ioc, std::move(tls_context), logger(log_level::warning)
+ );
+
+ c.brokers("broker.hivemq.com/mqtt", 8884)
+ .async_run(asio::detached);
+
+ c.async_disconnect([](error_code) {});
+
+ ioc.run();
+ }
+
+ // If connection is successful, nothing should be printed.
+ // However if the Broker is down or overloaded, this will cause logs to be printed.
+ // We should not fail the test because of it.
+ BOOST_TEST_WARN(output.is_empty());
+}
+
+BOOST_AUTO_TEST_CASE(disconnect) {
+ using test::after;
+ using namespace std::chrono_literals;
+
+ boost::test_tools::output_test_stream output;
+ {
+ clog_redirect guard(output.rdbuf());
+
+ // packets
+ auto connect = encoders::encode_connect(
+ "", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
+ );
+ auto connack = encoders::encode_connack(false, uint8_t(0x00), {});
+
+ disconnect_props dc_props;
+ dc_props[prop::reason_string] = "No reason.";
+ auto disconnect = encoders::encode_disconnect(0x00, dc_props);
+
+ test::msg_exchange broker_side;
+ broker_side
+ .expect(connect)
+ .complete_with(error_code{}, after(0ms))
+ .reply_with(connack, after(0ms))
+ .send(disconnect, after(50ms))
+ .expect(connect);
+
+ asio::io_context ioc;
+ auto executor = ioc.get_executor();
+ auto& broker = asio::make_service(
+ ioc, executor, std::move(broker_side)
+ );
+
+ mqtt_client c(executor);
+ c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
+ .async_run(asio::detached);
+
+ asio::steady_timer timer(c.get_executor());
+ timer.expires_after(100ms);
+ timer.async_wait([&c](error_code) { c.cancel(); });
+
+ ioc.run();
+ BOOST_TEST(broker.received_all_expected());
+ }
+
+ std::string log = output.rdbuf()->str();
+ BOOST_TEST_MESSAGE(log);
+ BOOST_TEST(contains(log, "disconnect"));
+}
+
+BOOST_AUTO_TEST_SUITE_END();
diff --git a/test/unit/reconnect_op.cpp b/test/unit/reconnect_op.cpp
index 1af4409..b072307 100644
--- a/test/unit/reconnect_op.cpp
+++ b/test/unit/reconnect_op.cpp
@@ -14,6 +14,8 @@
#include
#include
+#include
+
#include
#include
@@ -111,7 +113,8 @@ void run_connect_to_localhost_test(int succeed_after) {
);
auto stream_ctx = stream_context(std::monostate {});
- auto auto_stream = astream(ioc.get_executor(), stream_ctx);
+ auto log = detail::log_invoke();
+ auto auto_stream = astream(ioc.get_executor(), stream_ctx, log);
auto_stream.brokers("localhost", 1883);
auto handler = [&handlers_called](error_code ec) {
@@ -144,7 +147,8 @@ BOOST_AUTO_TEST_CASE(no_servers) {
asio::io_context ioc;
auto stream_ctx = stream_context(std::monostate{});
- auto auto_stream = astream(ioc.get_executor(), stream_ctx);
+ auto log = detail::log_invoke();
+ auto auto_stream = astream(ioc.get_executor(), stream_ctx, log);
auto_stream.brokers("", 1883);
auto handler = [&handlers_called](error_code ec) {