forked from boostorg/mqtt5
Re-create client_service after stopping client's runloop and let it destroy asynchronously
Summary: related to T11798, #5 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27882
This commit is contained in:
@ -84,7 +84,7 @@ public:
|
|||||||
|
|
||||||
class any_authenticator {
|
class any_authenticator {
|
||||||
std::string _method;
|
std::string _method;
|
||||||
std::unique_ptr<detail::auth_fun_base> _auth_fun;
|
std::shared_ptr<detail::auth_fun_base> _auth_fun;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
any_authenticator() = default;
|
any_authenticator() = default;
|
||||||
|
@ -75,6 +75,15 @@ struct mqtt_ctx {
|
|||||||
connack_props ca_props;
|
connack_props ca_props;
|
||||||
session_state state;
|
session_state state;
|
||||||
any_authenticator authenticator;
|
any_authenticator authenticator;
|
||||||
|
|
||||||
|
mqtt_ctx() = default;
|
||||||
|
|
||||||
|
mqtt_ctx(const mqtt_ctx& other) :
|
||||||
|
creds(other.creds), will_msg(other.will_msg),
|
||||||
|
keep_alive(other.keep_alive), co_props(other.co_props),
|
||||||
|
ca_props {}, state {},
|
||||||
|
authenticator(other.authenticator)
|
||||||
|
{}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct disconnect_ctx {
|
struct disconnect_ctx {
|
||||||
|
@ -81,6 +81,10 @@ public:
|
|||||||
_endpoints.brokers(std::move(hosts), default_port);
|
_endpoints.brokers(std::move(hosts), default_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void clone_endpoints(const autoconnect_stream& other) {
|
||||||
|
_endpoints.clone_servers(other._endpoints);
|
||||||
|
}
|
||||||
|
|
||||||
bool is_open() const noexcept {
|
bool is_open() const noexcept {
|
||||||
return lowest_layer(*_stream_ptr).is_open();
|
return lowest_layer(*_stream_ptr).is_open();
|
||||||
}
|
}
|
||||||
|
@ -36,10 +36,15 @@ class stream_context<
|
|||||||
using tls_context_type = TlsContext;
|
using tls_context_type = TlsContext;
|
||||||
|
|
||||||
mqtt_ctx _mqtt_context;
|
mqtt_ctx _mqtt_context;
|
||||||
tls_context_type _tls_context;
|
std::shared_ptr<tls_context_type> _tls_context_ptr;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit stream_context(TlsContext tls_context) :
|
explicit stream_context(TlsContext tls_context) :
|
||||||
_tls_context(std::move(tls_context))
|
_tls_context_ptr(std::make_shared<tls_context_type>(std::move(tls_context)))
|
||||||
|
{}
|
||||||
|
|
||||||
|
stream_context(const stream_context& other) :
|
||||||
|
_mqtt_context(other._mqtt_context), _tls_context_ptr(other._tls_context_ptr)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
auto& mqtt_context() {
|
auto& mqtt_context() {
|
||||||
@ -51,7 +56,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto& tls_context() {
|
auto& tls_context() {
|
||||||
return _tls_context;
|
return *_tls_context_ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& session_state() {
|
auto& session_state() {
|
||||||
@ -115,6 +120,9 @@ class stream_context<
|
|||||||
mqtt_ctx _mqtt_context;
|
mqtt_ctx _mqtt_context;
|
||||||
public:
|
public:
|
||||||
explicit stream_context(std::monostate) {}
|
explicit stream_context(std::monostate) {}
|
||||||
|
stream_context(const stream_context& other) :
|
||||||
|
_mqtt_context(other._mqtt_context)
|
||||||
|
{}
|
||||||
|
|
||||||
auto& mqtt_context() {
|
auto& mqtt_context() {
|
||||||
return _mqtt_context;
|
return _mqtt_context;
|
||||||
@ -231,6 +239,17 @@ private:
|
|||||||
|
|
||||||
asio::any_completion_handler<void(error_code)> _run_handler;
|
asio::any_completion_handler<void(error_code)> _run_handler;
|
||||||
|
|
||||||
|
client_service(const client_service& other) :
|
||||||
|
_executor(other._executor), _stream_context(other._stream_context),
|
||||||
|
_stream(_executor, _stream_context),
|
||||||
|
_replies(_executor),
|
||||||
|
_async_sender(*this),
|
||||||
|
_active_span(_read_buff.cend(), _read_buff.cend()),
|
||||||
|
_rec_channel(_executor, std::numeric_limits<size_t>::max())
|
||||||
|
{
|
||||||
|
_stream.clone_endpoints(other._stream);
|
||||||
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
client_service(
|
client_service(
|
||||||
@ -251,6 +270,10 @@ public:
|
|||||||
return _executor;
|
return _executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto dup() const {
|
||||||
|
return std::shared_ptr<client_service>(new client_service(*this));
|
||||||
|
}
|
||||||
|
|
||||||
template <
|
template <
|
||||||
typename Ctx = TlsContext,
|
typename Ctx = TlsContext,
|
||||||
std::enable_if_t<!std::is_same_v<Ctx, std::monostate>, bool> = true
|
std::enable_if_t<!std::is_same_v<Ctx, std::monostate>, bool> = true
|
||||||
|
@ -149,6 +149,10 @@ public:
|
|||||||
: _resolver(ex), _connect_timer(timer)
|
: _resolver(ex), _connect_timer(timer)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
void clone_servers(const endpoints& other) {
|
||||||
|
_servers = other._servers;
|
||||||
|
}
|
||||||
|
|
||||||
using executor_type = asio::ip::tcp::resolver::executor_type;
|
using executor_type = asio::ip::tcp::resolver::executor_type;
|
||||||
// NOTE: asio::ip::basic_resolver returns executor by value
|
// NOTE: asio::ip::basic_resolver returns executor by value
|
||||||
executor_type get_executor() {
|
executor_type get_executor() {
|
||||||
|
@ -46,8 +46,8 @@ private:
|
|||||||
using client_service_type = detail::client_service<
|
using client_service_type = detail::client_service<
|
||||||
stream_type, tls_context_type
|
stream_type, tls_context_type
|
||||||
>;
|
>;
|
||||||
using clisvc_ptr = std::shared_ptr<client_service_type>;
|
using impl_type = std::shared_ptr<client_service_type>;
|
||||||
clisvc_ptr _svc_ptr;
|
impl_type _impl;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
@ -62,7 +62,7 @@ public:
|
|||||||
const std::string& cnf,
|
const std::string& cnf,
|
||||||
TlsContext tls_context = {}
|
TlsContext tls_context = {}
|
||||||
) :
|
) :
|
||||||
_svc_ptr(std::make_shared<client_service_type>(
|
_impl(std::make_shared<client_service_type>(
|
||||||
ex, cnf, std::move(tls_context)
|
ex, cnf, std::move(tls_context)
|
||||||
))
|
))
|
||||||
{}
|
{}
|
||||||
@ -109,8 +109,8 @@ public:
|
|||||||
* \details Cancels this client first. Moved-from client can only be destructed.
|
* \details Cancels this client first. Moved-from client can only be destructed.
|
||||||
*/
|
*/
|
||||||
mqtt_client& operator=(mqtt_client&& other) noexcept {
|
mqtt_client& operator=(mqtt_client&& other) noexcept {
|
||||||
cancel();
|
_impl->cancel();
|
||||||
_svc_ptr = std::move(other._svc_ptr);
|
_impl = std::move(other._impl);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,14 +120,15 @@ public:
|
|||||||
* \details Automatically calls \ref mqtt_client::cancel.
|
* \details Automatically calls \ref mqtt_client::cancel.
|
||||||
*/
|
*/
|
||||||
~mqtt_client() {
|
~mqtt_client() {
|
||||||
if (_svc_ptr) cancel();
|
if (_impl)
|
||||||
|
_impl->cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Get the executor associated with the object.
|
* \brief Get the executor associated with the object.
|
||||||
*/
|
*/
|
||||||
executor_type get_executor() const noexcept {
|
executor_type get_executor() const noexcept {
|
||||||
return _svc_ptr->get_executor();
|
return _impl->get_executor();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -149,7 +150,7 @@ public:
|
|||||||
>
|
>
|
||||||
decltype(auto) tls_context()
|
decltype(auto) tls_context()
|
||||||
{
|
{
|
||||||
return _svc_ptr->tls_context();
|
return _impl->tls_context();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -178,16 +179,16 @@ public:
|
|||||||
decltype(auto) async_run(CompletionToken&& token) {
|
decltype(auto) async_run(CompletionToken&& token) {
|
||||||
using Signature = void(error_code);
|
using Signature = void(error_code);
|
||||||
|
|
||||||
auto initiation = [] (auto handler, const clisvc_ptr& svc_ptr) {
|
auto initiation = [] (auto handler, const impl_type& impl) {
|
||||||
svc_ptr->run(std::move(handler));
|
impl->run(std::move(handler));
|
||||||
|
|
||||||
detail::ping_op { svc_ptr }.perform();
|
detail::ping_op { impl }.perform();
|
||||||
detail::read_message_op { svc_ptr }.perform();
|
detail::read_message_op { impl }.perform();
|
||||||
detail::sentry_op { svc_ptr }.perform();
|
detail::sentry_op { impl }.perform();
|
||||||
};
|
};
|
||||||
|
|
||||||
return asio::async_initiate<CompletionToken, Signature>(
|
return asio::async_initiate<CompletionToken, Signature>(
|
||||||
initiation, token, _svc_ptr
|
initiation, token, _impl
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -201,7 +202,9 @@ public:
|
|||||||
* The Client cannot be used before calling \ref mqtt_client::async_run again.
|
* The Client cannot be used before calling \ref mqtt_client::async_run again.
|
||||||
*/
|
*/
|
||||||
void cancel() {
|
void cancel() {
|
||||||
_svc_ptr->cancel();
|
auto impl = _impl;
|
||||||
|
_impl = impl->dup();
|
||||||
|
impl->cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -217,7 +220,7 @@ public:
|
|||||||
* before the \ref async_run function is invoked again.
|
* before the \ref async_run function is invoked again.
|
||||||
*/
|
*/
|
||||||
mqtt_client& will(will will) {
|
mqtt_client& will(will will) {
|
||||||
_svc_ptr->will(std::move(will));
|
_impl->will(std::move(will));
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,7 +239,7 @@ public:
|
|||||||
std::string client_id,
|
std::string client_id,
|
||||||
std::string username = "", std::string password = ""
|
std::string username = "", std::string password = ""
|
||||||
) {
|
) {
|
||||||
_svc_ptr->credentials(
|
_impl->credentials(
|
||||||
std::move(client_id),
|
std::move(client_id),
|
||||||
std::move(username), std::move(password)
|
std::move(username), std::move(password)
|
||||||
);
|
);
|
||||||
@ -271,7 +274,7 @@ public:
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
mqtt_client& brokers(std::string hosts, uint16_t default_port = 1883) {
|
mqtt_client& brokers(std::string hosts, uint16_t default_port = 1883) {
|
||||||
_svc_ptr->brokers(std::move(hosts), default_port);
|
_impl->brokers(std::move(hosts), default_port);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,7 +297,7 @@ public:
|
|||||||
std::enable_if_t<detail::is_authenticator<Authenticator>, bool> = true
|
std::enable_if_t<detail::is_authenticator<Authenticator>, bool> = true
|
||||||
>
|
>
|
||||||
mqtt_client& authenticator(Authenticator&& authenticator) {
|
mqtt_client& authenticator(Authenticator&& authenticator) {
|
||||||
_svc_ptr->authenticator(std::forward<Authenticator>(authenticator));
|
_impl->authenticator(std::forward<Authenticator>(authenticator));
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -318,7 +321,7 @@ public:
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
mqtt_client& keep_alive(uint16_t seconds) {
|
mqtt_client& keep_alive(uint16_t seconds) {
|
||||||
_svc_ptr->keep_alive(seconds);
|
_impl->keep_alive(seconds);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -328,7 +331,7 @@ public:
|
|||||||
* \see See \__CONNECT_PROPS\__ for all eligible properties.
|
* \see See \__CONNECT_PROPS\__ for all eligible properties.
|
||||||
*/
|
*/
|
||||||
mqtt_client& connect_properties(connect_props props) {
|
mqtt_client& connect_properties(connect_props props) {
|
||||||
_svc_ptr->connect_properties(std::move(props));
|
_impl->connect_properties(std::move(props));
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -350,7 +353,7 @@ public:
|
|||||||
std::integral_constant<prop::property_type, p> prop,
|
std::integral_constant<prop::property_type, p> prop,
|
||||||
prop::value_type_t<p> value
|
prop::value_type_t<p> value
|
||||||
) {
|
) {
|
||||||
_svc_ptr->connect_property(prop) = value;
|
_impl->connect_property(prop) = value;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -361,7 +364,7 @@ public:
|
|||||||
* \note If \ref authenticator was not called, this method does nothing.
|
* \note If \ref authenticator was not called, this method does nothing.
|
||||||
*/
|
*/
|
||||||
void re_authenticate() {
|
void re_authenticate() {
|
||||||
detail::re_auth_op { _svc_ptr }.perform();
|
detail::re_auth_op { _impl }.perform();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -385,7 +388,7 @@ public:
|
|||||||
const auto& connack_property(
|
const auto& connack_property(
|
||||||
std::integral_constant<prop::property_type, p> prop
|
std::integral_constant<prop::property_type, p> prop
|
||||||
) const {
|
) const {
|
||||||
return _svc_ptr->connack_property(prop);
|
return _impl->connack_property(prop);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -394,7 +397,7 @@ public:
|
|||||||
* \see See \__CONNACK_PROPS\__ for all eligible properties.
|
* \see See \__CONNACK_PROPS\__ for all eligible properties.
|
||||||
*/
|
*/
|
||||||
const connack_props& connack_properties() const {
|
const connack_props& connack_properties() const {
|
||||||
return _svc_ptr->connack_properties();
|
return _impl->connack_properties();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -477,11 +480,11 @@ public:
|
|||||||
auto initiation = [] (
|
auto initiation = [] (
|
||||||
auto handler, std::string topic, std::string payload,
|
auto handler, std::string topic, std::string payload,
|
||||||
retain_e retain, const publish_props& props,
|
retain_e retain, const publish_props& props,
|
||||||
const clisvc_ptr& svc_ptr
|
const impl_type& impl
|
||||||
) {
|
) {
|
||||||
detail::publish_send_op<
|
detail::publish_send_op<
|
||||||
client_service_type, decltype(handler), qos_type
|
client_service_type, decltype(handler), qos_type
|
||||||
> { svc_ptr, std::move(handler) }
|
> { impl, std::move(handler) }
|
||||||
.perform(
|
.perform(
|
||||||
std::move(topic), std::move(payload),
|
std::move(topic), std::move(payload),
|
||||||
retain, props
|
retain, props
|
||||||
@ -490,7 +493,7 @@ public:
|
|||||||
|
|
||||||
return asio::async_initiate<CompletionToken, Signature>(
|
return asio::async_initiate<CompletionToken, Signature>(
|
||||||
initiation, token,
|
initiation, token,
|
||||||
std::move(topic), std::move(payload), retain, props, _svc_ptr
|
std::move(topic), std::move(payload), retain, props, _impl
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -555,14 +558,14 @@ public:
|
|||||||
|
|
||||||
auto initiation = [] (
|
auto initiation = [] (
|
||||||
auto handler, const std::vector<subscribe_topic>& topics,
|
auto handler, const std::vector<subscribe_topic>& topics,
|
||||||
const subscribe_props& props, const clisvc_ptr& impl
|
const subscribe_props& props, const impl_type& impl
|
||||||
) {
|
) {
|
||||||
detail::subscribe_op { impl, std::move(handler) }
|
detail::subscribe_op { impl, std::move(handler) }
|
||||||
.perform(topics, props);
|
.perform(topics, props);
|
||||||
};
|
};
|
||||||
|
|
||||||
return asio::async_initiate<CompletionToken, Signature>(
|
return asio::async_initiate<CompletionToken, Signature>(
|
||||||
initiation, token, topics, props, _svc_ptr
|
initiation, token, topics, props, _impl
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -684,14 +687,14 @@ public:
|
|||||||
auto initiation = [](
|
auto initiation = [](
|
||||||
auto handler,
|
auto handler,
|
||||||
const std::vector<std::string>& topics,
|
const std::vector<std::string>& topics,
|
||||||
const unsubscribe_props& props, const clisvc_ptr& impl
|
const unsubscribe_props& props, const impl_type& impl
|
||||||
) {
|
) {
|
||||||
detail::unsubscribe_op { impl, std::move(handler) }
|
detail::unsubscribe_op { impl, std::move(handler) }
|
||||||
.perform(topics, props);
|
.perform(topics, props);
|
||||||
};
|
};
|
||||||
|
|
||||||
return asio::async_initiate<CompletionToken, Signature>(
|
return asio::async_initiate<CompletionToken, Signature>(
|
||||||
initiation, token, topics, props, _svc_ptr
|
initiation, token, topics, props, _impl
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -797,7 +800,7 @@ public:
|
|||||||
template <typename CompletionToken>
|
template <typename CompletionToken>
|
||||||
decltype(auto) async_receive(CompletionToken&& token) {
|
decltype(auto) async_receive(CompletionToken&& token) {
|
||||||
// Sig = void (error_code, std::string, std::string, publish_props)
|
// Sig = void (error_code, std::string, std::string, publish_props)
|
||||||
return _svc_ptr->async_channel_receive(
|
return _impl->async_channel_receive(
|
||||||
std::forward<CompletionToken>(token)
|
std::forward<CompletionToken>(token)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -851,9 +854,11 @@ public:
|
|||||||
disconnect_rc_e reason_code, const disconnect_props& props,
|
disconnect_rc_e reason_code, const disconnect_props& props,
|
||||||
CompletionToken&& token
|
CompletionToken&& token
|
||||||
) {
|
) {
|
||||||
|
auto impl = _impl;
|
||||||
|
_impl = impl->dup();
|
||||||
return detail::async_disconnect(
|
return detail::async_disconnect(
|
||||||
detail::disconnect_rc_e(static_cast<uint8_t>(reason_code)),
|
detail::disconnect_rc_e(static_cast<uint8_t>(reason_code)),
|
||||||
props, true, _svc_ptr,
|
props, true, impl,
|
||||||
std::forward<CompletionToken>(token)
|
std::forward<CompletionToken>(token)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -323,9 +323,8 @@ public:
|
|||||||
_qos(qos), _retain(retain)
|
_qos(qos), _retain(retain)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
// just to make sure that we don't accidentally make a copy
|
|
||||||
/// Copy constructor.
|
/// Copy constructor.
|
||||||
will(const will&) = delete;
|
will(const will&) = default;
|
||||||
|
|
||||||
/// Move constructor.
|
/// Move constructor.
|
||||||
will(will&&) noexcept = default;
|
will(will&&) noexcept = default;
|
||||||
|
@ -243,7 +243,7 @@ using namespace std::chrono;
|
|||||||
|
|
||||||
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
|
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
|
||||||
|
|
||||||
BOOST_FIXTURE_TEST_CASE(rerunning_the_client, shared_test_data, *boost::unit_test::disabled()) {
|
BOOST_FIXTURE_TEST_CASE(rerunning_the_client, shared_test_data) {
|
||||||
// packets
|
// packets
|
||||||
auto disconnect = encoders::encode_disconnect(uint8_t(0x00), {});
|
auto disconnect = encoders::encode_disconnect(uint8_t(0x00), {});
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user