mirror of
https://github.com/boostorg/mqtt5.git
synced 2025-07-30 20:47:37 +02:00
Do not store ec=session_expired if the client never subscribed
Summary: related to T13152 Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D26803
This commit is contained in:
@ -280,6 +280,6 @@ void run_openssl_tls_examples() {
|
||||
publish_qos0_openssl_tls();
|
||||
publish_qos1_openssl_tls();
|
||||
publish_qos2_openssl_tls();
|
||||
subscribe_and_receive_openssl_tls(2);
|
||||
subscribe_and_receive_openssl_tls(1);
|
||||
test_coro();
|
||||
}
|
||||
|
@ -7,9 +7,9 @@ void run_websocket_tls_examples();
|
||||
int main(int argc, char* argv[]) {
|
||||
|
||||
run_tcp_examples();
|
||||
// run_openssl_tls_examples();
|
||||
// run_websocket_tcp_examples();
|
||||
// run_websocket_tls_examples();
|
||||
run_openssl_tls_examples();
|
||||
run_websocket_tcp_examples();
|
||||
run_websocket_tls_examples();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -142,5 +142,5 @@ void run_tcp_examples() {
|
||||
publish_qos0_tcp();
|
||||
publish_qos1_tcp();
|
||||
publish_qos2_tcp();
|
||||
subscribe_and_receive_tcp(2);
|
||||
subscribe_and_receive_tcp(1);
|
||||
}
|
||||
|
@ -168,5 +168,5 @@ void run_websocket_tcp_examples() {
|
||||
publish_qos0_websocket_tcp();
|
||||
publish_qos1_websocket_tcp();
|
||||
publish_qos2_websocket_tcp();
|
||||
subscribe_and_receive_websocket_tcp(2);
|
||||
subscribe_and_receive_websocket_tcp(1);
|
||||
}
|
||||
|
@ -245,5 +245,5 @@ void run_websocket_tls_examples() {
|
||||
publish_qos0_websocket_tls();
|
||||
publish_qos1_websocket_tls();
|
||||
publish_qos2_websocket_tls();
|
||||
subscribe_and_receive_websocket_tls(2);
|
||||
subscribe_and_receive_websocket_tls(1);
|
||||
}
|
||||
|
@ -39,12 +39,23 @@ class session_state {
|
||||
uint8_t _flags = 0b00;
|
||||
|
||||
static constexpr uint8_t session_present_flag = 0b01;
|
||||
static constexpr uint8_t subscriptions_present_flag = 0b10;
|
||||
public:
|
||||
void session_present(bool present) {
|
||||
return update_flag(present, session_present_flag);
|
||||
}
|
||||
|
||||
bool session_present() const { return _flags & session_present_flag; };
|
||||
bool session_present() const {
|
||||
return _flags & session_present_flag;
|
||||
}
|
||||
|
||||
void subscriptions_present(bool present) {
|
||||
return update_flag(present, subscriptions_present_flag);
|
||||
}
|
||||
|
||||
bool subscriptions_present() const {
|
||||
return _flags & subscriptions_present_flag;
|
||||
}
|
||||
|
||||
private:
|
||||
void update_flag(bool set, uint8_t flag) {
|
||||
|
@ -3,8 +3,8 @@
|
||||
|
||||
#include <boost/asio/experimental/basic_concurrent_channel.hpp>
|
||||
|
||||
#include <async_mqtt5/detail/internal_types.hpp>
|
||||
#include <async_mqtt5/detail/channel_traits.hpp>
|
||||
#include <async_mqtt5/detail/internal_types.hpp>
|
||||
|
||||
#include <async_mqtt5/impl/assemble_op.hpp>
|
||||
#include <async_mqtt5/impl/async_sender.hpp>
|
||||
@ -40,6 +40,14 @@ public:
|
||||
return _tls_context;
|
||||
}
|
||||
|
||||
auto& session_state() {
|
||||
return _mqtt_context.session_state;
|
||||
}
|
||||
|
||||
const auto& session_state() const {
|
||||
return _mqtt_context.session_state;
|
||||
}
|
||||
|
||||
void will(will will) {
|
||||
_mqtt_context.will = std::move(will);
|
||||
}
|
||||
@ -78,6 +86,14 @@ public:
|
||||
return _mqtt_context;
|
||||
}
|
||||
|
||||
auto& session_state() {
|
||||
return _mqtt_context.session_state;
|
||||
}
|
||||
|
||||
const auto& session_state() const {
|
||||
return _mqtt_context.session_state;
|
||||
}
|
||||
|
||||
void will(will will) {
|
||||
_mqtt_context.will = std::move(will);
|
||||
}
|
||||
@ -289,12 +305,25 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
bool subscriptions_present() const {
|
||||
return _stream_context.session_state().subscriptions_present();
|
||||
}
|
||||
|
||||
void subscriptions_present(bool present) {
|
||||
_stream_context.session_state().subscriptions_present(present);
|
||||
}
|
||||
|
||||
void update_session_state() {
|
||||
auto& session_state = _stream_context.mqtt_context().session_state;
|
||||
auto& session_state = _stream_context.session_state();
|
||||
|
||||
if (!session_state.session_present()) {
|
||||
channel_store_error(client::error::session_expired);
|
||||
_replies.clear_pending_pubrels();
|
||||
session_state.session_present(true);
|
||||
|
||||
if (session_state.subscriptions_present()) {
|
||||
channel_store_error(client::error::session_expired);
|
||||
session_state.subscriptions_present(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
#ifndef ASYNC_MQTT5_SUBSCRIBE_OP_HPP
|
||||
#define ASYNC_MQTT5_SUBSCRIBE_OP_HPP
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include <boost/asio/detached.hpp>
|
||||
|
||||
#include <async_mqtt5/error.hpp>
|
||||
@ -173,6 +175,15 @@ private:
|
||||
error_code ec, uint16_t packet_id,
|
||||
std::vector<reason_code> reason_codes, suback_props props
|
||||
) {
|
||||
if (!_svc_ptr->subscriptions_present()) {
|
||||
bool has_success_rc = std::any_of(
|
||||
reason_codes.cbegin(), reason_codes.cend(),
|
||||
[](const reason_code& rc) { return !rc; }
|
||||
);
|
||||
if (has_success_rc)
|
||||
_svc_ptr->subscriptions_present(true);
|
||||
}
|
||||
|
||||
_svc_ptr->free_pid(packet_id);
|
||||
_handler.complete(ec, std::move(reason_codes), std::move(props));
|
||||
}
|
||||
|
@ -15,43 +15,21 @@ BOOST_AUTO_TEST_SUITE(session/*, *boost::unit_test::disabled()*/)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(session_state_session_present) {
|
||||
detail::session_state session_state;
|
||||
|
||||
BOOST_CHECK_EQUAL(session_state.session_present(), false);
|
||||
session_state.session_present(true);
|
||||
BOOST_CHECK_EQUAL(session_state.session_present(), true);
|
||||
session_state.session_present(false);
|
||||
BOOST_CHECK_EQUAL(session_state.session_present(), false);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(session_expired_in_channel) {
|
||||
asio::io_context ioc;
|
||||
|
||||
using stream_type = asio::ip::tcp::socket;
|
||||
using client_type = mqtt_client<stream_type>;
|
||||
client_type c(ioc, "");
|
||||
|
||||
c.credentials("tester", "", "")
|
||||
.brokers("mqtt.mireo.local", 1883)
|
||||
.run();
|
||||
|
||||
co_spawn(ioc,
|
||||
[&]() -> asio::awaitable<void> {
|
||||
auto [ec, topic, payload, props] = co_await c.async_receive(use_nothrow_awaitable);
|
||||
BOOST_CHECK(ec == client::error::session_expired);
|
||||
BOOST_CHECK_EQUAL(topic, std::string {});
|
||||
BOOST_CHECK_EQUAL(payload, std::string {});
|
||||
c.cancel();
|
||||
co_return;
|
||||
},
|
||||
asio::detached
|
||||
);
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(session_state.subscriptions_present(), false);
|
||||
session_state.subscriptions_present(true);
|
||||
BOOST_CHECK_EQUAL(session_state.subscriptions_present(), true);
|
||||
session_state.subscriptions_present(false);
|
||||
BOOST_CHECK_EQUAL(session_state.subscriptions_present(), false);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(clear_waiting_on_pubrel) {
|
||||
constexpr int expected_handlers_called = 1;
|
||||
int handlers_called = 0;
|
||||
|
||||
asio::io_context ioc;
|
||||
using client_service_type = test::test_service<asio::ip::tcp::socket>;
|
||||
auto svc_ptr = std::make_shared<client_service_type>(ioc.get_executor());
|
||||
@ -65,24 +43,14 @@ BOOST_AUTO_TEST_CASE(clear_waiting_on_pubrel) {
|
||||
// let publish_rec_op reach wait_on_pubrel stage
|
||||
asio::steady_timer timer(ioc.get_executor());
|
||||
timer.expires_after(std::chrono::milliseconds(50));
|
||||
timer.async_wait([&svc_ptr, &handlers_called](error_code) {
|
||||
timer.async_wait([&svc_ptr](error_code) {
|
||||
BOOST_CHECK_EQUAL(svc_ptr.use_count(), 2);
|
||||
svc_ptr->update_session_state(); // session_present = false
|
||||
// publish_rec_op should complete
|
||||
BOOST_CHECK_EQUAL(svc_ptr.use_count(), 1);
|
||||
|
||||
svc_ptr->async_channel_receive(
|
||||
[&svc_ptr, &handlers_called](error_code ec, std::string topic, std::string payload, publish_props props) {
|
||||
handlers_called++;
|
||||
BOOST_CHECK(ec == client::error::session_expired);
|
||||
BOOST_CHECK_EQUAL(topic, std::string {});
|
||||
BOOST_CHECK_EQUAL(payload, std::string {});
|
||||
svc_ptr->cancel();
|
||||
});
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user