add a notification in receive channel when session resets

Summary: resolves T13152

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Maniphest Tasks: T13152

Differential Revision: https://repo.mireo.local/D26679
This commit is contained in:
Korina Šimičević
2023-11-23 15:36:06 +01:00
parent 5034734fc4
commit b4f4cd0047
12 changed files with 189 additions and 29 deletions

View File

@ -30,6 +30,12 @@ may complete with, along with the reasons for their occurrence.
the current operation due to the exhaustion of the available identifiers.
This occurs when there are 65535 outgoing Packets awaiting their responses.
]]
[[`async_mqtt5::client::error::session_expired`][
The Client has established a successful connection with a Broker, but either the session does not exist or has expired.
In cases where the Client had previously set up subscriptions to Topics, these subscriptions are also expired.
Therefore, the Client should re-subscribe.
This error code is exclusive to completion handlers associated with [refmem mqtt_client async_receive] calls.
]]
[[`async_mqtt5::client::error::qos_not_supported`] [
The Client has attempted to publish an Application Message with __QOS__ higher
than the Maximum __QOS__ specified by the Server.

View File

@ -35,11 +35,32 @@ struct credentials {
}
};
class session_state {
uint8_t _flags = 0b00;
static constexpr uint8_t session_present_flag = 0b01;
public:
void session_present(bool present) {
return update_flag(present, session_present_flag);
}
bool session_present() const { return _flags & session_present_flag; };
private:
void update_flag(bool set, uint8_t flag) {
if (set)
_flags |= flag;
else
_flags &= ~flag;
}
};
struct mqtt_context {
credentials credentials;
std::optional<will> will;
connect_props co_props;
connack_props ca_props;
session_state session_state;
any_authenticator authenticator;
};

View File

@ -61,6 +61,9 @@ enum class error : int {
/** There are no more available Packet Identifiers to use. */
pid_overrun,
/** The Client's session does not exist or it has expired. */
session_expired,
// publish
/** The Server does not support the specified \ref qos_e. */
qos_not_supported,
@ -79,6 +82,8 @@ inline std::string client_error_to_string(error err) {
switch (err) {
case malformed_packet:
return "Malformed packet has been detected";
case session_expired:
return "The Client's session does not exist or it has expired. ";
case pid_overrun:
return "There are no more available Packet Identifiers to use.";
case qos_not_supported:
@ -401,7 +406,6 @@ constexpr reason_code session_taken_over { 0x8e };
/** The Topic Filter is not malformed, but it is not accepted. */
constexpr reason_code topic_filter_invalid { 0x8f };
/** The Topic Name is not malformed, but it is not accepted. */
constexpr reason_code topic_name_invalid { 0x90 };
@ -468,7 +472,7 @@ namespace detail {
using enum category;
template <category cat>
inline std::pair<reason_code*, size_t> valid_codes()
std::pair<reason_code*, size_t> valid_codes()
requires (cat == connack) {
static reason_code valid_codes[] = {
success, unspecified_error, malformed_packet,
@ -487,7 +491,7 @@ requires (cat == connack) {
}
template <category cat>
inline std::pair<reason_code*, size_t> valid_codes()
std::pair<reason_code*, size_t> valid_codes()
requires (cat == auth) {
static reason_code valid_codes[] = {
success, continue_authentication
@ -497,7 +501,7 @@ requires (cat == auth) {
}
template <category cat>
inline std::pair<reason_code*, size_t> valid_codes()
std::pair<reason_code*, size_t> valid_codes()
requires (cat == puback || cat == pubrec) {
static reason_code valid_codes[] = {
success, no_matching_subscribers, unspecified_error,
@ -510,7 +514,7 @@ requires (cat == puback || cat == pubrec) {
}
template <category cat>
inline std::pair<reason_code*, size_t> valid_codes()
std::pair<reason_code*, size_t> valid_codes()
requires (cat == pubrel || cat == pubcomp) {
static reason_code valid_codes[] = {
success, packet_id_not_found
@ -520,7 +524,7 @@ requires (cat == pubrel || cat == pubcomp) {
}
template <category cat>
inline std::pair<reason_code*, size_t> valid_codes()
std::pair<reason_code*, size_t> valid_codes()
requires (cat == suback) {
static reason_code valid_codes[] = {
granted_qos_0, granted_qos_1, granted_qos_2,
@ -536,7 +540,7 @@ requires (cat == suback) {
}
template <category cat>
inline std::pair<reason_code*, size_t> valid_codes()
std::pair<reason_code*, size_t> valid_codes()
requires (cat == unsuback) {
static reason_code valid_codes[] = {
success, no_subscription_existed,
@ -549,7 +553,7 @@ requires (cat == unsuback) {
}
template <category cat>
inline std::pair<reason_code*, size_t> valid_codes()
std::pair<reason_code*, size_t> valid_codes()
requires (cat == disconnect) {
static reason_code valid_codes[] = {
normal_disconnection, unspecified_error,
@ -578,7 +582,7 @@ requires (cat == disconnect) {
template <reason_codes::category cat>
inline std::optional<reason_code> to_reason_code(uint8_t code) {
std::optional<reason_code> to_reason_code(uint8_t code) {
auto [ptr, len] = reason_codes::detail::valid_codes<cat>();
auto it = std::lower_bound(ptr, ptr + len, reason_code(code));

View File

@ -125,6 +125,7 @@ public:
duration wait_for, CompletionCondition cc
) {
if (ec == asio::error::try_again) {
_svc.update_session_state();
_svc._async_sender.resend();
_data_span = { _read_buff.cend(), _read_buff.cend() };
return perform(wait_for, std::move(cc));

View File

@ -73,7 +73,7 @@ class async_sender {
serial_num_t _last_serial_num { 0 };
public:
async_sender(ClientService& svc) : _svc(svc) {}
explicit async_sender(ClientService& svc) : _svc(svc) {}
using executor_type = typename client_service::executor_type;
executor_type get_executor() const noexcept {
@ -145,6 +145,7 @@ public:
_write_in_progress = false;
if (ec == asio::error::try_again) {
_svc.update_session_state();
_write_queue.insert(
_write_queue.begin(),
std::make_move_iterator(write_queue.begin()),

View File

@ -28,7 +28,7 @@ class stream_context<StreamType, TlsContext> {
mqtt_context _mqtt_context;
tls_context_type _tls_context;
public:
stream_context(TlsContext tls_context) :
explicit stream_context(TlsContext tls_context) :
_tls_context(std::move(tls_context))
{}
@ -72,7 +72,7 @@ requires (!has_tls_layer<StreamType>)
class stream_context<StreamType, std::monostate> {
mqtt_context _mqtt_context;
public:
stream_context(std::monostate) {}
explicit stream_context(std::monostate) {}
mqtt_context& mqtt_context() {
return _mqtt_context;
@ -289,6 +289,15 @@ public:
);
}
void update_session_state() {
auto& session_state = _stream_context.mqtt_context().session_state;
if (!session_state.session_present()) {
channel_store_error(client::error::session_expired);
_replies.clear_pending_pubrels();
session_state.session_present(true);
}
}
bool channel_store(decoders::publish_message message) {
auto& [topic, packet_id, flags, props, payload] = message;
return _rec_channel.try_send(
@ -297,6 +306,10 @@ public:
);
}
bool channel_store_error(error_code ec) {
return _rec_channel.try_send(ec, std::string {}, std::string {}, publish_props {});
}
template <typename CompletionToken>
decltype(auto) async_channel_receive(CompletionToken&& token) {
// sig = void (error_code, std::string, std::string, publish_props)

View File

@ -281,6 +281,7 @@ public:
const auto& [session_present, reason_code, ca_props] = *rv;
_ctx.ca_props = ca_props;
_ctx.session_state.session_present(session_present);
// TODO: session_present logic
// Unexpected result handling:

View File

@ -155,6 +155,19 @@ public:
_fast_replies.clear();
}
void clear_pending_pubrels() {
for (auto it = _handlers.begin(); it != _handlers.end();) {
if (it->code() == control_code_e::pubrel) {
std::move(*it)(
asio::error::operation_aborted, byte_citer {}, byte_citer {}
);
it = _handlers.erase(it);
}
else
++it;
}
}
private:
handlers::iterator find_handler(control_code_e code, uint16_t packet_id) {
return std::find_if(

View File

@ -660,6 +660,7 @@ public:
* The list of all possible error codes that this operation can finish with:\n
* - `boost::system::errc::errc_t::success`\n
* - `boost::asio::error::operation_aborted`\n
* - \link async_mqtt5::client::error::session_expired \endlink
*
* Refer to the section on \__ERROR_HANDLING\__ to find the underlying causes for each error code.
*/

View File

@ -166,7 +166,7 @@ BOOST_AUTO_TEST_CASE(client_cancel_async_receive) {
}
// passes on debian, hangs on windows in io_context destructor
BOOST_AUTO_TEST_CASE(ioc_stop_async_publish) {
BOOST_AUTO_TEST_CASE(ioc_stop_async_publish, *boost::unit_test::disabled() ) {
cancel_async_publish<test::ioc_stop>();
}
@ -175,7 +175,7 @@ BOOST_AUTO_TEST_CASE(client_cancel_async_publish) {
}
// passes on debian, hangs on windows
BOOST_AUTO_TEST_CASE(ioc_stop_cancel_during_connecting) {
BOOST_AUTO_TEST_CASE(ioc_stop_cancel_during_connecting, *boost::unit_test::disabled() ) {
cancel_during_connecting<test::ioc_stop>();
}

View File

@ -1,5 +1,6 @@
#include <boost/test/unit_test.hpp>
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
@ -15,25 +16,30 @@ BOOST_AUTO_TEST_SUITE(coroutine/*, *boost::unit_test::disabled()*/)
using namespace async_mqtt5;
namespace asio = boost::asio;
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
template<typename StreamType, typename TlsContext>
asio::awaitable<void> sanity_check(mqtt_client<StreamType, TlsContext>& c) {
co_await c.template async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!", retain_e::no, publish_props{},
asio::use_awaitable
auto [ec_0] = co_await c.template async_publish<qos_e::at_most_once>(
"test/mqtt-test", "hello world with qos0!", retain_e::yes, publish_props {},
use_nothrow_awaitable
);
BOOST_CHECK(!ec_0);
auto [puback_rc, puback_props] = co_await c.template async_publish<qos_e::at_least_once>(
auto [ec_1, puback_rc, puback_props] = co_await c.template async_publish<qos_e::at_least_once>(
"test/mqtt-test", "hello world with qos1!",
retain_e::no, publish_props{},
asio::use_awaitable
retain_e::yes, publish_props {},
use_nothrow_awaitable
);
BOOST_CHECK(!ec_1);
BOOST_CHECK(!puback_rc);
auto [pubcomp_rc, pubcomp_props] = co_await c.template async_publish<qos_e::exactly_once>(
auto [ec_2, pubcomp_rc, pubcomp_props] = co_await c.template async_publish<qos_e::exactly_once>(
"test/mqtt-test", "hello world with qos2!",
retain_e::no, publish_props{},
asio::use_awaitable
retain_e::yes, publish_props {},
use_nothrow_awaitable
);
BOOST_CHECK(!ec_2);
BOOST_CHECK(!pubcomp_rc);
std::vector<subscribe_topic> topics;
@ -46,19 +52,21 @@ asio::awaitable<void> sanity_check(mqtt_client<StreamType, TlsContext>& c) {
}
});
auto [sub_codes, sub_props] = co_await c.async_subscribe(
topics, subscribe_props{}, asio::use_awaitable
auto [sub_ec, sub_codes, sub_props] = co_await c.async_subscribe(
topics, subscribe_props {}, use_nothrow_awaitable
);
BOOST_CHECK(!sub_ec);
BOOST_CHECK(!sub_codes[0]);
auto [topic, payload, publish_props] = co_await c.async_receive(asio::use_awaitable);
auto [rec, topic, payload, publish_props] = co_await c.async_receive(use_nothrow_awaitable);
auto [unsub_codes, unsub_props] = co_await c.async_unsubscribe(
auto [unsub_ec, unsub_codes, unsub_props] = co_await c.async_unsubscribe(
std::vector<std::string>{"test/mqtt-test"}, unsubscribe_props{},
asio::use_awaitable
use_nothrow_awaitable
);
BOOST_CHECK(!unsub_ec);
BOOST_CHECK(!unsub_codes[0]);
co_await c.async_disconnect(asio::use_awaitable);
co_await c.async_disconnect(use_nothrow_awaitable);
co_return;
}

View File

@ -0,0 +1,91 @@
#include <boost/test/unit_test.hpp>
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <async_mqtt5.hpp>
#include <test_common/test_service.hpp>
using namespace async_mqtt5;
constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);
BOOST_AUTO_TEST_SUITE(session/*, *boost::unit_test::disabled()*/)
BOOST_AUTO_TEST_CASE(session_state_session_present) {
detail::session_state session_state;
BOOST_CHECK_EQUAL(session_state.session_present(), false);
session_state.session_present(true);
BOOST_CHECK_EQUAL(session_state.session_present(), true);
session_state.session_present(false);
BOOST_CHECK_EQUAL(session_state.session_present(), false);
}
BOOST_AUTO_TEST_CASE(session_expired_in_channel) {
asio::io_context ioc;
using stream_type = asio::ip::tcp::socket;
using client_type = mqtt_client<stream_type>;
client_type c(ioc, "");
c.credentials("tester", "", "")
.brokers("mqtt.mireo.local", 1883)
.run();
co_spawn(ioc,
[&]() -> asio::awaitable<void> {
auto [ec, topic, payload, props] = co_await c.async_receive(use_nothrow_awaitable);
BOOST_CHECK(ec == client::error::session_expired);
BOOST_CHECK_EQUAL(topic, std::string {});
BOOST_CHECK_EQUAL(payload, std::string {});
c.cancel();
co_return;
},
asio::detached
);
ioc.run();
}
BOOST_AUTO_TEST_CASE(clear_waiting_on_pubrel) {
constexpr int expected_handlers_called = 1;
int handlers_called = 0;
asio::io_context ioc;
using client_service_type = test::test_service<asio::ip::tcp::socket>;
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
decoders::publish_message pub_msg = std::make_tuple(
"topic", 1, 0b0100, publish_props {}, "payload"
);
detail::publish_rec_op<client_service_type> { svc_ptr }.perform(pub_msg);
// let publish_rec_op reach wait_on_pubrel stage
asio::steady_timer timer(ioc.get_executor());
timer.expires_after(std::chrono::milliseconds(50));
timer.async_wait([&svc_ptr, &handlers_called](error_code) {
BOOST_CHECK_EQUAL(svc_ptr.use_count(), 2);
svc_ptr->update_session_state(); // session_present = false
// publish_rec_op should complete
BOOST_CHECK_EQUAL(svc_ptr.use_count(), 1);
svc_ptr->async_channel_receive(
[&svc_ptr, &handlers_called](error_code ec, std::string topic, std::string payload, publish_props props) {
handlers_called++;
BOOST_CHECK(ec == client::error::session_expired);
BOOST_CHECK_EQUAL(topic, std::string {});
BOOST_CHECK_EQUAL(payload, std::string {});
svc_ptr->cancel();
});
});
ioc.run();
BOOST_CHECK_EQUAL(
handlers_called, expected_handlers_called
);
}
BOOST_AUTO_TEST_SUITE_END();