#ifndef ASYNC_MQTT5_ASYNC_MUTEX_HPP #define ASYNC_MQTT5_ASYNC_MUTEX_HPP #include #include #include #include #include #include #include #include namespace async_mqtt5::detail { namespace asio = boost::asio; using error_code = boost::system::error_code; class async_mutex { public: using executor_type = asio::any_io_executor; private: using queued_op_t = asio::any_completion_handler< void (error_code) >; using queue_t = ring_buffer; // Handler with assigned tracking executor. // Objects of this type are type-erased by any_completion_handler // and stored in the waiting queue. template class tracked_op { tracking_type _executor; std::decay_t _handler; public: tracked_op(Handler&& h) : _executor(tracking_executor(h)), _handler(std::move(h)) {} tracked_op(tracked_op&&) noexcept = default; tracked_op(const tracked_op&) = delete; using executor_type = tracking_type; executor_type get_executor() const noexcept { return _executor; } using allocator_type = asio::associated_allocator_t; allocator_type get_allocator() const noexcept { return asio::get_associated_allocator(_handler); } using cancellation_slot_type = asio::associated_cancellation_slot_t; cancellation_slot_type get_cancellation_slot() const noexcept { return asio::get_associated_cancellation_slot(_handler); } void operator()(error_code ec) { std::move(_handler)(ec); } }; // Per-operation cancellation helper. // It is safe to emit the cancellation signal from any thread // provided there are no other concurrent calls to the async_mutex. // The helper stores queue iterator to operation since the iterator // would not be invalidated by other queue operations. class cancel_waiting_op { async_mutex& _owner; queue_t::iterator _ihandler; public: cancel_waiting_op(async_mutex& owner, queue_t::iterator ih) : _owner(owner), _ihandler(ih) {} void operator()(asio::cancellation_type_t type) { if (type == asio::cancellation_type_t::none) return; std::unique_lock l { _owner._thread_mutex }; if (*_ihandler) { auto h = std::move(*_ihandler); auto ex = asio::get_associated_executor(h); l.unlock(); asio::require(ex, asio::execution::blocking.possibly) .execute([h = std::move(h)]() mutable { std::move(h)(asio::error::operation_aborted); }); } } }; spinlock _thread_mutex; std::atomic _locked { false }; queue_t _waiting; executor_type _ex; public: template async_mutex(Executor&& ex) : _ex(std::forward(ex)) {} async_mutex(const async_mutex&) = delete; async_mutex& operator=(const async_mutex&) = delete; ~async_mutex() { cancel(); } const executor_type& get_executor() const noexcept { return _ex; } bool is_locked() const noexcept { return _locked.load(std::memory_order_relaxed); } // Schedules mutex for lock operation and return immediately. // Calls given completion handler when mutex is locked. // It's the responsibility of the completion handler to unlock the mutex. template decltype(auto) lock(CompletionToken&& token) noexcept { auto initiation = [this] (auto handler) { this->execute_or_queue(std::move(handler)); }; return asio::async_initiate( std::move(initiation), token ); } // Unlocks the mutex. The mutex must be in locked state. // Next queued operation, if any, will be executed in a manner // equivalent to asio::post. void unlock() { std::unique_lock l { _thread_mutex }; if (_waiting.empty()) { _locked.store(false, std::memory_order_release); return; } while (!_waiting.empty()) { auto op = std::move(_waiting.front()); _waiting.pop_front(); if (!op) continue; op.get_cancellation_slot().clear(); l.unlock(); execute_op(std::move(op)); break; } } // Cancels all outstanding operations waiting on the mutex. void cancel() { std::unique_lock l { _thread_mutex }; while (!_waiting.empty()) { auto op = std::move(_waiting.front()); _waiting.pop_front(); if (!op) continue; op.get_cancellation_slot().clear(); auto ex = asio::get_associated_executor(op, _ex); asio::require(ex, asio::execution::blocking.possibly) .execute([op = std::move(op)]() mutable { std::move(op)(asio::error::operation_aborted); }); } } private: // Schedule operation to `opex` executor using `_ex` executor. // The operation is equivalent to asio::post(_ex, op) but // for some reason this form of execution is much faster. void execute_op(queued_op_t op) { asio::require(_ex, asio::execution::blocking.never) .execute([ex = _ex, op = std::move(op)]() mutable { auto opex = asio::get_associated_executor(op, ex); opex.execute( [op = std::move(op)]() mutable { op(error_code{}); } ); }); } // Executes operation immediatelly if mutex is not locked // or queues it for later execution otherwise. In both cases // the operation will be executed in a manner equivalent // to asio::post to avoid recursion. void execute_or_queue(auto handler) noexcept { std::unique_lock l { _thread_mutex }; tracked_op h { std::move(handler) }; if (_locked.load(std::memory_order_relaxed)) { _waiting.emplace_back(std::move(h)); auto slot = _waiting.back().get_cancellation_slot(); if (slot.is_connected()) slot.template emplace( *this, _waiting.end() - 1 ); } else { _locked.store(true, std::memory_order_release); l.unlock(); execute_op(queued_op_t { std::move(h) }); } } }; } // end namespace async_mqtt5::detail #endif // !ASYNC_MQTT5_ASYNC_MUTEX_HPP