remove thread mutex from async_mutex

Summary: also fixes per-operation-cancelled operations preventing unlock of mutex

Reviewers: ivica

Reviewed By: ivica

Subscribers: korina, miljen

Differential Revision: https://repo.mireo.local/D32427
This commit is contained in:
Bruno Iljazovic
2024-11-27 13:28:31 +01:00
parent 944de413a3
commit 9240c51b28
3 changed files with 11 additions and 89 deletions

View File

@ -23,7 +23,6 @@
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
#include <async_mqtt5/detail/async_traits.hpp> #include <async_mqtt5/detail/async_traits.hpp>
#include <async_mqtt5/detail/spinlock.hpp>
namespace async_mqtt5::detail { namespace async_mqtt5::detail {
@ -84,21 +83,16 @@ private:
// The helper stores queue iterator to operation since the iterator // The helper stores queue iterator to operation since the iterator
// would not be invalidated by other queue operations. // would not be invalidated by other queue operations.
class cancel_waiting_op { class cancel_waiting_op {
async_mutex& _owner;
queue_t::iterator _ihandler; queue_t::iterator _ihandler;
public: public:
cancel_waiting_op(async_mutex& owner, queue_t::iterator ih) : explicit cancel_waiting_op(queue_t::iterator ih) : _ihandler(ih) {}
_owner(owner), _ihandler(ih)
{}
void operator()(asio::cancellation_type_t type) { void operator()(asio::cancellation_type_t type) {
if (type == asio::cancellation_type_t::none) if (type == asio::cancellation_type_t::none)
return; return;
std::unique_lock l { _owner._thread_mutex };
if (*_ihandler) { if (*_ihandler) {
auto h = std::move(*_ihandler); auto h = std::move(*_ihandler);
auto ex = asio::get_associated_executor(h); auto ex = asio::get_associated_executor(h);
l.unlock();
asio::require(ex, asio::execution::blocking.possibly) asio::require(ex, asio::execution::blocking.possibly)
.execute([h = std::move(h)]() mutable { .execute([h = std::move(h)]() mutable {
std::move(h)(asio::error::operation_aborted); std::move(h)(asio::error::operation_aborted);
@ -107,8 +101,7 @@ private:
} }
}; };
spinlock _thread_mutex; bool _locked { false };
std::atomic<bool> _locked { false };
queue_t _waiting; queue_t _waiting;
executor_type _ex; executor_type _ex;
@ -128,7 +121,7 @@ public:
} }
bool is_locked() const noexcept { bool is_locked() const noexcept {
return _locked.load(std::memory_order_relaxed); return _locked;
} }
// Schedules mutex for lock operation and return immediately. // Schedules mutex for lock operation and return immediately.
@ -151,26 +144,19 @@ public:
// Next queued operation, if any, will be executed in a manner // Next queued operation, if any, will be executed in a manner
// equivalent to asio::post. // equivalent to asio::post.
void unlock() { void unlock() {
std::unique_lock l { _thread_mutex };
if (_waiting.empty()) {
_locked.store(false, std::memory_order_release);
return;
}
while (!_waiting.empty()) { while (!_waiting.empty()) {
auto op = std::move(_waiting.front()); auto op = std::move(_waiting.front());
_waiting.pop_front(); _waiting.pop_front();
if (!op) continue; if (!op) continue;
op.get_cancellation_slot().clear(); op.get_cancellation_slot().clear();
l.unlock();
execute_op(std::move(op)); execute_op(std::move(op));
break; return;
} }
_locked = false;
} }
// Cancels all outstanding operations waiting on the mutex. // Cancels all outstanding operations waiting on the mutex.
void cancel() { void cancel() {
std::unique_lock l { _thread_mutex };
while (!_waiting.empty()) { while (!_waiting.empty()) {
auto op = std::move(_waiting.front()); auto op = std::move(_waiting.front());
_waiting.pop_front(); _waiting.pop_front();
@ -211,19 +197,17 @@ private:
// to asio::post to avoid recursion. // to asio::post to avoid recursion.
template <typename Handler> template <typename Handler>
void execute_or_queue(Handler&& handler) noexcept { void execute_or_queue(Handler&& handler) noexcept {
std::unique_lock l { _thread_mutex };
tracked_op h { std::move(handler), _ex }; tracked_op h { std::move(handler), _ex };
if (_locked.load(std::memory_order_relaxed)) { if (_locked) {
_waiting.emplace_back(std::move(h)); _waiting.emplace_back(std::move(h));
auto slot = _waiting.back().get_cancellation_slot(); auto slot = _waiting.back().get_cancellation_slot();
if (slot.is_connected()) if (slot.is_connected())
slot.template emplace<cancel_waiting_op>( slot.template emplace<cancel_waiting_op>(
*this, _waiting.end() - 1 _waiting.end() - 1
); );
} }
else { else {
_locked.store(true, std::memory_order_release); _locked = true;
l.unlock();
execute_op(queued_op_t { std::move(h) }); execute_op(queued_op_t { std::move(h) });
} }
} }

View File

@ -1,65 +0,0 @@
//
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASYNC_MQTT5_SPINLOCK_HPP
#define ASYNC_MQTT5_SPINLOCK_HPP
#include <atomic>
namespace async_mqtt5::detail {
#if defined(_MSC_VER)
/* prefer using intrinsics directly instead of winnt.h macro */
/* http://software.intel.com/en-us/forums/topic/296168 */
//#include <intrin.h>
#if defined(_M_AMD64) || defined(_M_IX86)
#pragma intrinsic(_mm_pause)
#define __pause() _mm_pause()
/* (if pause not supported by older x86 assembler, "rep nop" is equivalent)*/
/*#define __pause() __asm rep nop */
#elif defined(_M_IA64)
#pragma intrinsic(__yield)
#define __pause() __yield()
#else
#define __pause() YieldProcessor()
#endif
#elif defined(__x86_64__) || defined(__i386__)
#define __pause() __asm__ __volatile__ ("pause")
#elif defined(__arm__) || defined(__arm64__) || defined(__aarch64__)
#define __pause() __asm__ __volatile__ ("yield")
#endif
// https://rigtorp.se/spinlock/
class spinlock {
std::atomic<bool> lock_ { false };
public:
void lock() noexcept {
for (;;) {
// Optimistically assume the lock is free on the first try
if (!lock_.exchange(true, std::memory_order_acquire))
return;
// Wait for lock to be released without generating cache misses
while (lock_.load(std::memory_order_relaxed)) __pause();
}
}
bool try_lock() noexcept {
// First do a relaxed load to check if lock is free in order to prevent
// unnecessary cache misses if someone does while(!try_lock())
return !lock_.load(std::memory_order_relaxed) &&
!lock_.exchange(true, std::memory_order_acquire);
}
void unlock() noexcept {
lock_.store(false, std::memory_order_release);
}
};
} // end namespace async_mqtt5::detail
#endif // !ASYNC_MQTT5_SPINLOCK_HPP

View File

@ -90,6 +90,7 @@ BOOST_AUTO_TEST_CASE(bind_executor) {
tp.wait(); tp.wait();
BOOST_TEST(handlers_called == expected_handlers_called); BOOST_TEST(handlers_called == expected_handlers_called);
BOOST_TEST(!mutex.is_locked());
} }
BOOST_AUTO_TEST_CASE(per_op_cancellation) { BOOST_AUTO_TEST_CASE(per_op_cancellation) {
@ -124,6 +125,7 @@ BOOST_AUTO_TEST_CASE(per_op_cancellation) {
ioc.run(); ioc.run();
BOOST_TEST(handlers_called == expected_handlers_called); BOOST_TEST(handlers_called == expected_handlers_called);
BOOST_TEST(!mutex.is_locked());
} }
BOOST_AUTO_TEST_CASE(cancel_ops_by_destructor) { BOOST_AUTO_TEST_CASE(cancel_ops_by_destructor) {
@ -180,6 +182,7 @@ BOOST_AUTO_TEST_CASE(cancel_ops) {
mutex.cancel(); mutex.cancel();
ioc.run(); ioc.run();
BOOST_TEST(handlers_called == expected_handlers_called); BOOST_TEST(handlers_called == expected_handlers_called);
BOOST_TEST(!mutex.is_locked());
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()