diff --git a/example/cpp20_coroutines.cpp b/example/cpp20_coroutines.cpp index 86c9a3f..3cec2c8 100644 --- a/example/cpp20_coroutines.cpp +++ b/example/cpp20_coroutines.cpp @@ -165,9 +165,9 @@ int main(int argc, char** argv) { .brokers("mqtt.broker", 1883) .run(); - asio::co_spawn(ioc.get_executor(), coroutine(c), asio::detached); + co_spawn(ioc.get_executor(), coroutine(c), asio::detached); // or... - asio::co_spawn(ioc.get_executor(), nothrow_coroutine(c), asio::detached); + co_spawn(ioc.get_executor(), nothrow_coroutine(c), asio::detached); ioc.run(); } diff --git a/example/publisher.cpp b/example/publisher.cpp index 1a8a227..26d4d56 100644 --- a/example/publisher.cpp +++ b/example/publisher.cpp @@ -46,7 +46,7 @@ int main() { asio::io_context ioc; // Spawn the coroutine. - asio::co_spawn(ioc.get_executor(), client_publisher(ioc), asio::detached); + co_spawn(ioc.get_executor(), client_publisher(ioc), asio::detached); // Start the execution. ioc.run(); diff --git a/example/receiver.cpp b/example/receiver.cpp index 3f3a0ab..4404bab 100644 --- a/example/receiver.cpp +++ b/example/receiver.cpp @@ -71,7 +71,7 @@ int main() { asio::io_context ioc; // Spawn the coroutine. - asio::co_spawn(ioc, client_receiver(ioc), asio::detached); + co_spawn(ioc, client_receiver(ioc), asio::detached); // Start the execution. ioc.run(); diff --git a/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp b/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp index b87f341..c13bd58 100644 --- a/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp +++ b/include/async_mqtt5/impl/internal/codecs/message_decoders.hpp @@ -54,7 +54,8 @@ inline std::optional decode_connect( x3::big_word >> // keep_alive prop::props_; - auto vh = type_parse(it, it + remain_length, var_header_); + const byte_citer end = it + remain_length; + auto vh = type_parse(it, end, var_header_); if (!vh) return std::optional{}; @@ -75,7 +76,7 @@ inline std::optional decode_connect( basic::if_(has_uname)[basic::utf8_] >> // username basic::if_(has_pwd)[basic::utf8_]; // password - auto pload = type_parse(it, it + remain_length, payload_); + auto pload = type_parse(it, end, payload_); if (!pload) return std::optional{}; diff --git a/test/unit/test/cancellation.cpp b/test/unit/test/cancellation.cpp index 0df4920..d550c47 100644 --- a/test/unit/test/cancellation.cpp +++ b/test/unit/test/cancellation.cpp @@ -1,5 +1,8 @@ #include +#include +#include +#include #include #include #include @@ -8,6 +11,8 @@ using namespace async_mqtt5; +constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable); + namespace async_mqtt5::test { enum cancellation_type { @@ -153,7 +158,7 @@ void cancel_during_connecting() { } -BOOST_AUTO_TEST_SUITE(cancellation) +BOOST_AUTO_TEST_SUITE(cancellation/*, *boost::unit_test::disabled()*/) BOOST_AUTO_TEST_CASE(ioc_stop_async_receive) { @@ -183,64 +188,45 @@ BOOST_AUTO_TEST_CASE(client_cancel_during_connecting) { cancel_during_connecting(); } - -// last two ec checks fail, they finish with no_recovery when -// you don't call c.cancel() BOOST_AUTO_TEST_CASE(rerunning_the_client) { - constexpr int expected_handlers_called = 3; - int handlers_called = 0; - asio::io_context ioc; - using stream_type = asio::ip::tcp::socket; - using client_type = mqtt_client; - client_type c(ioc, ""); + co_spawn(ioc, + [&ioc]() -> asio::awaitable { + using stream_type = asio::ip::tcp::socket; + using client_type = mqtt_client; + client_type c(ioc, ""); - c.brokers("mqtt.mireo.local", 1883) - .run(); + c.brokers("mqtt.mireo.local", 1883) + .credentials("test-cli", "", "") + .run(); - c.async_publish( - "cancelled_topic", "cancelled_payload", retain_e::yes, {}, - [&handlers_called](error_code ec, reason_code rc, pubcomp_props) { - BOOST_CHECK_EQUAL(ec, asio::error::operation_aborted); - BOOST_CHECK_EQUAL(rc, reason_codes::empty); - handlers_called++; - } - ); + auto [ec] = co_await c.async_publish( + "t", "p", retain_e::yes, publish_props {}, use_nothrow_awaitable + ); + BOOST_CHECK(!ec); - asio::post(ioc, [&c] { c.cancel(); }); + c.cancel(); + + auto [cec] = co_await c.async_publish( + "ct", "cp", retain_e::yes, publish_props {}, use_nothrow_awaitable + ); + BOOST_CHECK(cec == asio::error::operation_aborted); - asio::steady_timer cancel_timer(c.get_executor()); - cancel_timer.expires_after(std::chrono::seconds(1)); - cancel_timer.async_wait( - [&](auto) { c.run(); - c.async_publish( - "test/mqtt-test", "payload", retain_e::yes, {}, - [&handlers_called](error_code ec) { - BOOST_CHECK(!ec); - handlers_called++; - } + auto [rec] = co_await c.async_publish( + "ct", "cp", retain_e::yes, publish_props {}, use_nothrow_awaitable ); + BOOST_CHECK(!rec); - c.async_receive([&handlers_called]( - error_code ec, std::string, std::string, publish_props - ) { - BOOST_CHECK(!ec); - handlers_called++; - }); - } + co_await c.async_disconnect(use_nothrow_awaitable); + co_return; + }, + asio::detached ); - asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::seconds(2)); - timer.async_wait([&](auto) { c.cancel(); }); - ioc.run(); - BOOST_CHECK_EQUAL( - handlers_called, expected_handlers_called - ); } BOOST_AUTO_TEST_SUITE_END(); diff --git a/test/unit/test/coroutine.cpp b/test/unit/test/coroutine.cpp index 6213de1..096537e 100644 --- a/test/unit/test/coroutine.cpp +++ b/test/unit/test/coroutine.cpp @@ -133,7 +133,7 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) { } ); - co_spawn(ioc, + co_spawn(ioc, [&]() -> asio::awaitable { co_await sanity_check(c); timer.cancel(); diff --git a/test/unit/test/serialization.cpp b/test/unit/test/serialization.cpp index b875065..dd2bdbb 100644 --- a/test/unit/test/serialization.cpp +++ b/test/unit/test/serialization.cpp @@ -143,7 +143,7 @@ BOOST_AUTO_TEST_CASE(test_puback) { BOOST_CHECK_EQUAL(*packet_id_, packet_id); const auto& [control_byte, remain_length] = *header; - auto rv = decoders::decode_puback(remain_length, it); + auto rv = decoders::decode_puback(remain_length - sizeof(uint16_t), it); BOOST_CHECK_MESSAGE(rv, "Parsing PUBACK failed."); const auto& [reason_code_, pprops] = *rv;