forked from Kistler-Group/sdbus-cpp
Introduce support for cancellable async calls
This commit is contained in:
committed by
Stanislav Angelovič
parent
e91bedd4cb
commit
00d0837d98
@@ -93,16 +93,20 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
|
||||
return sendMethodCallMessageAndWaitForReply(message, timeout);
|
||||
}
|
||||
|
||||
void Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout)
|
||||
PendingAsyncCall Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout)
|
||||
{
|
||||
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL);
|
||||
|
||||
auto callback = (void*)&Proxy::sdbus_async_reply_handler;
|
||||
auto callData = std::make_unique<AsyncCalls::CallData>(AsyncCalls::CallData{*this, std::move(asyncReplyCallback), {}});
|
||||
auto callData = std::make_shared<AsyncCalls::CallData>(AsyncCalls::CallData{*this, std::move(asyncReplyCallback), {}});
|
||||
auto weakData = std::weak_ptr{callData};
|
||||
|
||||
callData->slot = message.send(callback, callData.get(), timeout);
|
||||
|
||||
pendingAsyncCalls_.addCall(callData->slot.get(), std::move(callData));
|
||||
auto slotPtr = callData->slot.get();
|
||||
pendingAsyncCalls_.addCall(slotPtr, std::move(callData));
|
||||
|
||||
return {weakData};
|
||||
}
|
||||
|
||||
MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout)
|
||||
@@ -124,7 +128,7 @@ MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& messag
|
||||
void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error)
|
||||
{
|
||||
SCOPE_EXIT{ cond_.notify_one(); };
|
||||
std::unique_lock<std::mutex> lock{mutex_};
|
||||
std::unique_lock lock{mutex_};
|
||||
SCOPE_EXIT{ arrived_ = true; };
|
||||
|
||||
//error_ = nullptr; // Necessary if SyncCallReplyData instance is thread_local
|
||||
@@ -137,7 +141,7 @@ void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply
|
||||
|
||||
MethodReply Proxy::SyncCallReplyData::waitForMethodReply()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock{mutex_};
|
||||
std::unique_lock lock{mutex_};
|
||||
cond_.wait(lock, [this](){ return arrived_; });
|
||||
|
||||
//arrived_ = false; // Necessary if SyncCallReplyData instance is thread_local
|
||||
@@ -202,11 +206,12 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
|
||||
auto& proxy = asyncCallData->proxy;
|
||||
auto slot = asyncCallData->slot.get();
|
||||
|
||||
// We are removing the CallData item at the complete scope exit, after the callback has been invoked.
|
||||
// We can't do it earlier (before callback invocation for example), because CallBack data (slot release)
|
||||
// is the synchronization point between callback invocation and Proxy::unregister.
|
||||
SCOPE_EXIT
|
||||
{
|
||||
// Slot will be nullptr in case of synchronous call. In that case, the call data lives on the call stack,
|
||||
// in another thread. But that thread may have already been woken up by now and cleared its call stack,
|
||||
// so we can't access asyncCallData here. Hence we save the slot pointer at the beginning of this function.
|
||||
// Remove call meta-data if it's a real async call (a sync call done in terms of async has slot == nullptr)
|
||||
if (slot)
|
||||
proxy.pendingAsyncCalls_.removeCall(slot);
|
||||
};
|
||||
@@ -247,6 +252,34 @@ int Proxy::sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd
|
||||
|
||||
namespace sdbus {
|
||||
|
||||
PendingAsyncCall::PendingAsyncCall(std::weak_ptr<void> callData)
|
||||
: callData_(std::move(callData))
|
||||
{
|
||||
}
|
||||
|
||||
void PendingAsyncCall::cancel()
|
||||
{
|
||||
if (auto ptr = callData_.lock(); ptr != nullptr)
|
||||
{
|
||||
auto* callData = static_cast<internal::Proxy::AsyncCalls::CallData*>(ptr.get());
|
||||
callData->proxy.pendingAsyncCalls_.removeCall(callData->slot.get());
|
||||
|
||||
// At this point, the callData item is being deleted, leading to the release of the
|
||||
// sd-bus slot pointer. This release locks the global sd-bus mutex. If the async
|
||||
// callback is currently being processed, the sd-bus mutex is locked by the event
|
||||
// loop thread, thus access to the callData item is synchronized and thread-safe.
|
||||
}
|
||||
}
|
||||
|
||||
bool PendingAsyncCall::isPending() const
|
||||
{
|
||||
return !callData_.expired();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace sdbus {
|
||||
|
||||
std::unique_ptr<sdbus::IProxy> createProxy( IConnection& connection
|
||||
, std::string destination
|
||||
, std::string objectPath )
|
||||
|
||||
Reference in New Issue
Block a user