forked from Kistler-Group/sdbus-cpp
fix: race condition in async Proxy::callMethod
Signed-off-by: Anthony Brandon <anthony@amarulasolutions.com>
This commit is contained in:
committed by
Stanislav Angelovič
parent
737f04abc7
commit
c39bc637b8
@ -125,8 +125,7 @@ PendingAsyncCall Proxy::callMethod(const MethodCall& message, async_reply_handle
|
|||||||
|
|
||||||
callData->slot = message.send(callback, callData.get(), timeout);
|
callData->slot = message.send(callback, callData.get(), timeout);
|
||||||
|
|
||||||
auto slotPtr = callData->slot.get();
|
pendingAsyncCalls_.addCall(std::move(callData));
|
||||||
pendingAsyncCalls_.addCall(slotPtr, std::move(callData));
|
|
||||||
|
|
||||||
return {weakData};
|
return {weakData};
|
||||||
}
|
}
|
||||||
@ -276,7 +275,6 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
|
|||||||
assert(asyncCallData != nullptr);
|
assert(asyncCallData != nullptr);
|
||||||
assert(asyncCallData->callback);
|
assert(asyncCallData->callback);
|
||||||
auto& proxy = asyncCallData->proxy;
|
auto& proxy = asyncCallData->proxy;
|
||||||
auto slot = asyncCallData->slot.get();
|
|
||||||
|
|
||||||
// We are removing the CallData item at the complete scope exit, after the callback has been invoked.
|
// We are removing the CallData item at the complete scope exit, after the callback has been invoked.
|
||||||
// We can't do it earlier (before callback invocation for example), because CallBack data (slot release)
|
// We can't do it earlier (before callback invocation for example), because CallBack data (slot release)
|
||||||
@ -284,8 +282,7 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
|
|||||||
SCOPE_EXIT
|
SCOPE_EXIT
|
||||||
{
|
{
|
||||||
// Remove call meta-data if it's a real async call (a sync call done in terms of async has slot == nullptr)
|
// Remove call meta-data if it's a real async call (a sync call done in terms of async has slot == nullptr)
|
||||||
if (slot)
|
proxy.pendingAsyncCalls_.removeCall(asyncCallData);
|
||||||
proxy.pendingAsyncCalls_.removeCall(slot);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
auto message = Message::Factory::create<MethodReply>(sdbusMessage, &proxy.connection_->getSdBusInterface());
|
auto message = Message::Factory::create<MethodReply>(sdbusMessage, &proxy.connection_->getSdBusInterface());
|
||||||
@ -357,7 +354,7 @@ void PendingAsyncCall::cancel()
|
|||||||
if (auto ptr = callData_.lock(); ptr != nullptr)
|
if (auto ptr = callData_.lock(); ptr != nullptr)
|
||||||
{
|
{
|
||||||
auto* callData = static_cast<internal::Proxy::AsyncCalls::CallData*>(ptr.get());
|
auto* callData = static_cast<internal::Proxy::AsyncCalls::CallData*>(ptr.get());
|
||||||
callData->proxy.pendingAsyncCalls_.removeCall(callData->slot.get());
|
callData->proxy.pendingAsyncCalls_.removeCall(callData);
|
||||||
|
|
||||||
// At this point, the callData item is being deleted, leading to the release of the
|
// At this point, the callData item is being deleted, leading to the release of the
|
||||||
// sd-bus slot pointer. This release locks the global sd-bus mutex. If the async
|
// sd-bus slot pointer. This release locks the global sd-bus mutex. If the async
|
||||||
|
17
src/Proxy.h
17
src/Proxy.h
@ -33,7 +33,7 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <unordered_map>
|
#include <deque>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
@ -138,6 +138,7 @@ namespace sdbus::internal {
|
|||||||
Proxy& proxy;
|
Proxy& proxy;
|
||||||
async_reply_handler callback;
|
async_reply_handler callback;
|
||||||
Slot slot;
|
Slot slot;
|
||||||
|
bool finished { false };
|
||||||
};
|
};
|
||||||
|
|
||||||
~AsyncCalls()
|
~AsyncCalls()
|
||||||
@ -145,18 +146,20 @@ namespace sdbus::internal {
|
|||||||
clear();
|
clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool addCall(void* slot, std::shared_ptr<CallData> asyncCallData)
|
void addCall(std::shared_ptr<CallData> asyncCallData)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex_);
|
std::lock_guard lock(mutex_);
|
||||||
return calls_.emplace(slot, std::move(asyncCallData)).second;
|
if (!asyncCallData->finished) // The call may have finished in the mean time
|
||||||
|
calls_.emplace_back(std::move(asyncCallData));
|
||||||
}
|
}
|
||||||
|
|
||||||
void removeCall(void* slot)
|
void removeCall(CallData* data)
|
||||||
{
|
{
|
||||||
std::unique_lock lock(mutex_);
|
std::unique_lock lock(mutex_);
|
||||||
if (auto it = calls_.find(slot); it != calls_.end())
|
data->finished = true;
|
||||||
|
if (auto it = std::find_if(calls_.begin(), calls_.end(), [data](auto const& entry){ return entry.get() == data; }); it != calls_.end())
|
||||||
{
|
{
|
||||||
auto callData = std::move(it->second);
|
auto callData = std::move(*it);
|
||||||
calls_.erase(it);
|
calls_.erase(it);
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
@ -182,7 +185,7 @@ namespace sdbus::internal {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
std::mutex mutex_;
|
std::mutex mutex_;
|
||||||
std::unordered_map<void*, std::shared_ptr<CallData>> calls_;
|
std::deque<std::shared_ptr<CallData>> calls_;
|
||||||
} pendingAsyncCalls_;
|
} pendingAsyncCalls_;
|
||||||
|
|
||||||
std::atomic<const Message*> m_CurrentlyProcessedMessage{nullptr};
|
std::atomic<const Message*> m_CurrentlyProcessedMessage{nullptr};
|
||||||
|
Reference in New Issue
Block a user