forked from Kistler-Group/sdbus-cpp
feat: add Slot-returning overloads of async method calls (#433)
This commit is contained in:
113
src/Proxy.cpp
113
src/Proxy.cpp
@@ -94,6 +94,11 @@ MethodCall Proxy::createMethodCall(const char* interfaceName, const char* method
|
||||
return connection_->createMethodCall(destination_.c_str(), objectPath_.c_str(), interfaceName, methodName);
|
||||
}
|
||||
|
||||
MethodReply Proxy::callMethod(const MethodCall& message)
|
||||
{
|
||||
return Proxy::callMethod(message, /*timeout*/ 0);
|
||||
}
|
||||
|
||||
MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
|
||||
{
|
||||
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid method call message provided", EINVAL);
|
||||
@@ -101,19 +106,44 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
|
||||
return connection_->callMethod(message, timeout);
|
||||
}
|
||||
|
||||
PendingAsyncCall Proxy::callMethodAsync(const MethodCall& message, async_reply_handler asyncReplyCallback)
|
||||
{
|
||||
return Proxy::callMethodAsync(message, std::move(asyncReplyCallback), /*timeout*/ 0);
|
||||
}
|
||||
|
||||
Slot Proxy::callMethodAsync(const MethodCall& message, async_reply_handler asyncReplyCallback, return_slot_t)
|
||||
{
|
||||
return Proxy::callMethodAsync(message, std::move(asyncReplyCallback), /*timeout*/ 0, return_slot);
|
||||
}
|
||||
|
||||
PendingAsyncCall Proxy::callMethodAsync(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout)
|
||||
{
|
||||
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL);
|
||||
|
||||
auto callback = (void*)&Proxy::sdbus_async_reply_handler;
|
||||
auto callData = std::make_shared<AsyncCalls::CallData>(AsyncCalls::CallData{*this, std::move(asyncReplyCallback)});
|
||||
auto weakData = std::weak_ptr<AsyncCalls::CallData>{callData};
|
||||
auto asyncCallInfo = std::make_shared<AsyncCallInfo>(AsyncCallInfo{ .callback = std::move(asyncReplyCallback)
|
||||
, .proxy = *this
|
||||
, .floating = false });
|
||||
|
||||
callData->slot = connection_->callMethod(message, callback, callData.get(), timeout);
|
||||
asyncCallInfo->slot = connection_->callMethod(message, (void*)&Proxy::sdbus_async_reply_handler, asyncCallInfo.get(), timeout);
|
||||
|
||||
pendingAsyncCalls_.addCall(std::move(callData));
|
||||
auto asyncCallInfoWeakPtr = std::weak_ptr{asyncCallInfo};
|
||||
|
||||
return {weakData};
|
||||
floatingAsyncCallSlots_.push_back(std::move(asyncCallInfo));
|
||||
|
||||
return {asyncCallInfoWeakPtr};
|
||||
}
|
||||
|
||||
Slot Proxy::callMethodAsync(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout, return_slot_t)
|
||||
{
|
||||
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL);
|
||||
|
||||
auto asyncCallInfo = std::make_unique<AsyncCallInfo>(AsyncCallInfo{ .callback = std::move(asyncReplyCallback)
|
||||
, .proxy = *this
|
||||
, .floating = true });
|
||||
|
||||
asyncCallInfo->slot = connection_->callMethod(message, (void*)&Proxy::sdbus_async_reply_handler, asyncCallInfo.get(), timeout);
|
||||
|
||||
return {asyncCallInfo.release(), [](void *ptr){ delete static_cast<AsyncCallInfo*>(ptr); }};
|
||||
}
|
||||
|
||||
std::future<MethodReply> Proxy::callMethodAsync(const MethodCall& message, with_future_t)
|
||||
@@ -186,7 +216,7 @@ Slot Proxy::registerSignalHandler( const char* interfaceName
|
||||
|
||||
void Proxy::unregister()
|
||||
{
|
||||
pendingAsyncCalls_.clear();
|
||||
floatingAsyncCallSlots_.clear();
|
||||
floatingSignalSlots_.clear();
|
||||
}
|
||||
|
||||
@@ -207,17 +237,17 @@ Message Proxy::getCurrentlyProcessedMessage() const
|
||||
|
||||
int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError)
|
||||
{
|
||||
auto* asyncCallData = static_cast<AsyncCalls::CallData*>(userData);
|
||||
assert(asyncCallData != nullptr);
|
||||
assert(asyncCallData->callback);
|
||||
auto& proxy = asyncCallData->proxy;
|
||||
auto* asyncCallInfo = static_cast<AsyncCallInfo*>(userData);
|
||||
assert(asyncCallInfo != nullptr);
|
||||
assert(asyncCallInfo->callback);
|
||||
auto& proxy = asyncCallInfo->proxy;
|
||||
|
||||
// We are removing the CallData item at the complete scope exit, after the callback has been invoked.
|
||||
// We can't do it earlier (before callback invocation for example), because CallBack data (slot release)
|
||||
// is the synchronization point between callback invocation and Proxy::unregister.
|
||||
SCOPE_EXIT
|
||||
{
|
||||
proxy.pendingAsyncCalls_.removeCall(asyncCallData);
|
||||
proxy.floatingAsyncCallSlots_.erase(asyncCallInfo);
|
||||
};
|
||||
|
||||
auto message = Message::Factory::create<MethodReply>(sdbusMessage, &proxy.connection_->getSdBusInterface());
|
||||
@@ -227,12 +257,12 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
|
||||
const auto* error = sd_bus_message_get_error(sdbusMessage);
|
||||
if (error == nullptr)
|
||||
{
|
||||
asyncCallData->callback(std::move(message), {});
|
||||
asyncCallInfo->callback(std::move(message), {});
|
||||
}
|
||||
else
|
||||
{
|
||||
Error exception(Error::Name{error->name}, error->message);
|
||||
asyncCallData->callback(std::move(message), std::move(exception));
|
||||
asyncCallInfo->callback(std::move(message), std::move(exception));
|
||||
}
|
||||
}, retError);
|
||||
|
||||
@@ -253,21 +283,64 @@ int Proxy::sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd
|
||||
return ok ? 0 : -1;
|
||||
}
|
||||
|
||||
Proxy::FloatingAsyncCallSlots::~FloatingAsyncCallSlots()
|
||||
{
|
||||
clear();
|
||||
}
|
||||
|
||||
void Proxy::FloatingAsyncCallSlots::push_back(std::shared_ptr<AsyncCallInfo> asyncCallInfo)
|
||||
{
|
||||
std::lock_guard lock(mutex_);
|
||||
if (!asyncCallInfo->finished) // The call may have finished in the meantime
|
||||
slots_.emplace_back(std::move(asyncCallInfo));
|
||||
}
|
||||
|
||||
void Proxy::FloatingAsyncCallSlots::erase(AsyncCallInfo* info)
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
info->finished = true;
|
||||
auto it = std::find_if(slots_.begin(), slots_.end(), [info](auto const& entry){ return entry.get() == info; });
|
||||
if (it != slots_.end())
|
||||
{
|
||||
auto callInfo = std::move(*it);
|
||||
slots_.erase(it);
|
||||
lock.unlock();
|
||||
|
||||
// Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release
|
||||
// out of the `mutex_' critical section here, because if the `removeCall` is called by some
|
||||
// thread and at the same time Proxy's async reply handler (which already holds global sd-bus
|
||||
// mutex) is in progress in a different thread, we get double-mutex deadlock.
|
||||
}
|
||||
}
|
||||
|
||||
void Proxy::FloatingAsyncCallSlots::clear()
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
auto asyncCallSlots = std::move(slots_);
|
||||
slots_ = {};
|
||||
lock.unlock();
|
||||
|
||||
// Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release
|
||||
// out of the `mutex_' critical section here, because if the `clear` is called by some thread
|
||||
// and at the same time Proxy's async reply handler (which already holds global sd-bus
|
||||
// mutex) is in progress in a different thread, we get double-mutex deadlock.
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace sdbus {
|
||||
|
||||
PendingAsyncCall::PendingAsyncCall(std::weak_ptr<void> callData)
|
||||
: callData_(std::move(callData))
|
||||
PendingAsyncCall::PendingAsyncCall(std::weak_ptr<void> callInfo)
|
||||
: callInfo_(std::move(callInfo))
|
||||
{
|
||||
}
|
||||
|
||||
void PendingAsyncCall::cancel()
|
||||
{
|
||||
if (auto ptr = callData_.lock(); ptr != nullptr)
|
||||
if (auto ptr = callInfo_.lock(); ptr != nullptr)
|
||||
{
|
||||
auto* callData = static_cast<internal::Proxy::AsyncCalls::CallData*>(ptr.get());
|
||||
callData->proxy.pendingAsyncCalls_.removeCall(callData);
|
||||
auto* asyncCallInfo = static_cast<internal::Proxy::AsyncCallInfo*>(ptr.get());
|
||||
asyncCallInfo->proxy.floatingAsyncCallSlots_.erase(asyncCallInfo);
|
||||
|
||||
// At this point, the callData item is being deleted, leading to the release of the
|
||||
// sd-bus slot pointer. This release locks the global sd-bus mutex. If the async
|
||||
@@ -278,7 +351,7 @@ void PendingAsyncCall::cancel()
|
||||
|
||||
bool PendingAsyncCall::isPending() const
|
||||
{
|
||||
return !callData_.expired();
|
||||
return !callInfo_.expired();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user