Introduce support for cancellable async calls

This commit is contained in:
sangelovic
2020-03-29 21:43:20 +02:00
parent e91bedd4cb
commit 4847d61706
9 changed files with 174 additions and 34 deletions

View File

@ -42,6 +42,7 @@ namespace sdbus {
class IProxy; class IProxy;
class Variant; class Variant;
class Error; class Error;
class PendingAsyncCall;
} }
namespace sdbus { namespace sdbus {
@ -193,7 +194,7 @@ namespace sdbus {
template <typename _Rep, typename _Period> template <typename _Rep, typename _Period>
AsyncMethodInvoker& withTimeout(const std::chrono::duration<_Rep, _Period>& timeout); AsyncMethodInvoker& withTimeout(const std::chrono::duration<_Rep, _Period>& timeout);
template <typename... _Args> AsyncMethodInvoker& withArguments(_Args&&... args); template <typename... _Args> AsyncMethodInvoker& withArguments(_Args&&... args);
template <typename _Function> void uponReplyInvoke(_Function&& callback); template <typename _Function> PendingAsyncCall uponReplyInvoke(_Function&& callback);
private: private:
IProxy& proxy_; IProxy& proxy_;

View File

@ -576,7 +576,7 @@ namespace sdbus {
} }
template <typename _Function> template <typename _Function>
void AsyncMethodInvoker::uponReplyInvoke(_Function&& callback) PendingAsyncCall AsyncMethodInvoker::uponReplyInvoke(_Function&& callback)
{ {
assert(method_.isValid()); // onInterface() must be placed/called prior to this function assert(method_.isValid()); // onInterface() must be placed/called prior to this function
@ -594,7 +594,7 @@ namespace sdbus {
sdbus::apply(callback, error, args); sdbus::apply(callback, error, args);
}; };
proxy_.callMethod(method_, std::move(asyncReplyHandler), timeout_); return proxy_.callMethod(method_, std::move(asyncReplyHandler), timeout_);
} }
/*** ---------------- ***/ /*** ---------------- ***/

View File

@ -38,6 +38,10 @@ namespace sdbus {
class MethodCall; class MethodCall;
class MethodReply; class MethodReply;
class IConnection; class IConnection;
class PendingAsyncCall;
namespace internal {
class Proxy;
}
} }
namespace sdbus { namespace sdbus {
@ -108,6 +112,7 @@ namespace sdbus {
* @param[in] message Message representing an async method call * @param[in] message Message representing an async method call
* @param[in] asyncReplyCallback Handler for the async reply * @param[in] asyncReplyCallback Handler for the async reply
* @param[in] timeout Timeout for dbus call in microseconds * @param[in] timeout Timeout for dbus call in microseconds
* @return Cookie for the the pending asynchronous call
* *
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives, * The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided async reply handler will get invoked from the context of the connection * the provided async reply handler will get invoked from the context of the connection
@ -117,7 +122,7 @@ namespace sdbus {
* *
* @throws sdbus::Error in case of failure * @throws sdbus::Error in case of failure
*/ */
virtual void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0; virtual PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0;
/*! /*!
* @copydoc IProxy::callMethod(const MethodCall&,async_reply_handler,uint64_t) * @copydoc IProxy::callMethod(const MethodCall&,async_reply_handler,uint64_t)
@ -263,6 +268,46 @@ namespace sdbus {
[[nodiscard]] PropertySetter setProperty(const std::string& propertyName); [[nodiscard]] PropertySetter setProperty(const std::string& propertyName);
}; };
/********************************************//**
* @class PendingAsyncCall
*
* PendingAsyncCall represents a simple handle type to cancel the delivery
* of the asynchronous D-Bus call result to the application.
*
* The handle is lifetime-independent from the originating Proxy object.
* It's safe to call its methods even after the Proxy has gone.
*
***********************************************/
class PendingAsyncCall
{
public:
/*!
* @brief Cancels the delivery of the pending asynchronous call result
*
* This function effectively removes the callback handler registered to the
* async D-Bus method call result delivery. Does nothing if the call was
* completed already, or if the originating Proxy object has gone meanwhile.
*/
void cancel();
/*!
* @brief Answers whether the asynchronous call is still pending
*
* @return True if the call is pending, false if the call has been fully completed
*
* Pending call in this context means a call whose results have not arrived, or
* have arrived and are currently being processed by the callback handler.
*/
bool isPending() const;
private:
friend internal::Proxy;
PendingAsyncCall(std::weak_ptr<void> callData);
private:
std::weak_ptr<void> callData_;
};
// Out-of-line member definitions // Out-of-line member definitions
template <typename _Rep, typename _Period> template <typename _Rep, typename _Period>

View File

@ -93,16 +93,20 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
return sendMethodCallMessageAndWaitForReply(message, timeout); return sendMethodCallMessageAndWaitForReply(message, timeout);
} }
void Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) PendingAsyncCall Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout)
{ {
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL); SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL);
auto callback = (void*)&Proxy::sdbus_async_reply_handler; auto callback = (void*)&Proxy::sdbus_async_reply_handler;
auto callData = std::make_unique<AsyncCalls::CallData>(AsyncCalls::CallData{*this, std::move(asyncReplyCallback), {}}); auto callData = std::make_shared<AsyncCalls::CallData>(AsyncCalls::CallData{*this, std::move(asyncReplyCallback), {}});
auto weakData = std::weak_ptr{callData};
callData->slot = message.send(callback, callData.get(), timeout); callData->slot = message.send(callback, callData.get(), timeout);
pendingAsyncCalls_.addCall(callData->slot.get(), std::move(callData)); auto slotPtr = callData->slot.get();
pendingAsyncCalls_.addCall(slotPtr, std::move(callData));
return {weakData};
} }
MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout) MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout)
@ -124,7 +128,7 @@ MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& messag
void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error) void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error)
{ {
SCOPE_EXIT{ cond_.notify_one(); }; SCOPE_EXIT{ cond_.notify_one(); };
std::unique_lock<std::mutex> lock{mutex_}; std::unique_lock lock{mutex_};
SCOPE_EXIT{ arrived_ = true; }; SCOPE_EXIT{ arrived_ = true; };
//error_ = nullptr; // Necessary if SyncCallReplyData instance is thread_local //error_ = nullptr; // Necessary if SyncCallReplyData instance is thread_local
@ -137,7 +141,7 @@ void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply
MethodReply Proxy::SyncCallReplyData::waitForMethodReply() MethodReply Proxy::SyncCallReplyData::waitForMethodReply()
{ {
std::unique_lock<std::mutex> lock{mutex_}; std::unique_lock lock{mutex_};
cond_.wait(lock, [this](){ return arrived_; }); cond_.wait(lock, [this](){ return arrived_; });
//arrived_ = false; // Necessary if SyncCallReplyData instance is thread_local //arrived_ = false; // Necessary if SyncCallReplyData instance is thread_local
@ -202,11 +206,12 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
auto& proxy = asyncCallData->proxy; auto& proxy = asyncCallData->proxy;
auto slot = asyncCallData->slot.get(); auto slot = asyncCallData->slot.get();
// We are removing the CallData item at the complete scope exit, after the callback has been invoked.
// We can't do it earlier (before callback invocation for example), because CallBack data (slot release)
// is the synchronization point between callback invocation and Proxy::unregister.
SCOPE_EXIT SCOPE_EXIT
{ {
// Slot will be nullptr in case of synchronous call. In that case, the call data lives on the call stack, // Remove call meta-data if it's a real async call (a sync call done in terms of async has slot == nullptr)
// in another thread. But that thread may have already been woken up by now and cleared its call stack,
// so we can't access asyncCallData here. Hence we save the slot pointer at the beginning of this function.
if (slot) if (slot)
proxy.pendingAsyncCalls_.removeCall(slot); proxy.pendingAsyncCalls_.removeCall(slot);
}; };
@ -247,6 +252,34 @@ int Proxy::sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd
namespace sdbus { namespace sdbus {
PendingAsyncCall::PendingAsyncCall(std::weak_ptr<void> callData)
: callData_(std::move(callData))
{
}
void PendingAsyncCall::cancel()
{
if (auto ptr = callData_.lock(); ptr != nullptr)
{
auto* callData = static_cast<internal::Proxy::AsyncCalls::CallData*>(ptr.get());
callData->proxy.pendingAsyncCalls_.removeCall(callData->slot.get());
// At this point, the callData item is being deleted, leading to the release of the
// sd-bus slot pointer. This release locks the global sd-bus mutex. If the async
// callback is currently being processed, the sd-bus mutex is locked by the event
// loop thread, thus access to the callData item is synchronized and thread-safe.
}
}
bool PendingAsyncCall::isPending() const
{
return !callData_.expired();
}
}
namespace sdbus {
std::unique_ptr<sdbus::IProxy> createProxy( IConnection& connection std::unique_ptr<sdbus::IProxy> createProxy( IConnection& connection
, std::string destination , std::string destination
, std::string objectPath ) , std::string objectPath )

View File

@ -52,7 +52,7 @@ namespace sdbus::internal {
MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override; MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override;
MethodReply callMethod(const MethodCall& message, uint64_t timeout) override; MethodReply callMethod(const MethodCall& message, uint64_t timeout) override;
void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override; PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override;
void registerSignalHandler( const std::string& interfaceName void registerSignalHandler( const std::string& interfaceName
, const std::string& signalName , const std::string& signalName
@ -81,6 +81,8 @@ namespace sdbus::internal {
static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
private: private:
friend PendingAsyncCall;
std::unique_ptr< sdbus::internal::IConnection std::unique_ptr< sdbus::internal::IConnection
, std::function<void(sdbus::internal::IConnection*)> , std::function<void(sdbus::internal::IConnection*)>
> connection_; > connection_;
@ -119,29 +121,42 @@ namespace sdbus::internal {
clear(); clear();
} }
bool addCall(void* slot, std::unique_ptr<CallData>&& asyncCallData) bool addCall(void* slot, std::shared_ptr<CallData> asyncCallData)
{ {
std::lock_guard lock(mutex_); std::lock_guard lock(mutex_);
return calls_.emplace(slot, std::move(asyncCallData)).second; return calls_.emplace(slot, std::move(asyncCallData)).second;
} }
bool removeCall(void* slot) void removeCall(void* slot)
{ {
std::lock_guard lock(mutex_); std::unique_lock lock(mutex_);
return calls_.erase(slot) > 0; if (auto it = calls_.find(slot); it != calls_.end())
{
auto callData = std::move(it->second);
calls_.erase(it);
lock.unlock();
// Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release
// out of the `mutex_' critical section here, because if the `removeCall` is called by some
// thread and at the same time Proxy's async reply handler (which already holds global sd-bus
// mutex) is in progress in a different thread, we get double-mutex deadlock.
}
} }
void clear() void clear()
{ {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock lock(mutex_);
auto asyncCallSlots = std::move(calls_); auto asyncCallSlots = std::move(calls_);
// Perform releasing of sd-bus slots outside of the calls_ critical section which avoids
// double mutex dead lock when the async reply handler is invoked at the same time.
lock.unlock(); lock.unlock();
// Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release
// out of the `mutex_' critical section here, because if the `clear` is called by some thread
// and at the same time Proxy's async reply handler (which already holds global sd-bus
// mutex) is in progress in a different thread, we get double-mutex deadlock.
} }
private: private:
std::unordered_map<void*, std::unique_ptr<CallData>> calls_; std::unordered_map<void*, std::shared_ptr<CallData>> calls_;
std::mutex mutex_; std::mutex mutex_;
} pendingAsyncCalls_; } pendingAsyncCalls_;
}; };

View File

@ -385,6 +385,51 @@ TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide)
ASSERT_THAT(future.get(), Eq(100)); ASSERT_THAT(future.get(), Eq(100));
} }
TEST_F(SdbusTestObject, AnswersThatAsyncCallIsPendingIfItIsInProgress)
{
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){});
auto call = m_proxy->doOperationClientSideAsync(100);
ASSERT_TRUE(call.isPending());
}
TEST_F(SdbusTestObject, CancelsPendingAsyncCallOnClientSide)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); });
auto call = m_proxy->doOperationClientSideAsync(100);
call.cancel();
ASSERT_THAT(future.wait_for(300ms), Eq(std::future_status::timeout));
}
TEST_F(SdbusTestObject, AnswersThatAsyncCallIsNotPendingAfterItHasBeenCancelled)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); });
auto call = m_proxy->doOperationClientSideAsync(100);
call.cancel();
ASSERT_FALSE(call.isPending());
}
TEST_F(SdbusTestObject, AnswersThatAsyncCallIsNotPendingAfterItHasBeenCompleted)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); });
auto call = m_proxy->doOperationClientSideAsync(0);
(void) future.get(); // Wait for the call to finish
ASSERT_FALSE(call.isPending());
}
TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide) TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
{ {
std::promise<uint32_t> promise; std::promise<uint32_t> promise;

View File

@ -156,15 +156,15 @@ public:
return result; return result;
} }
void doOperationClientSideAsync(uint32_t param) sdbus::PendingAsyncCall doOperationClientSideAsync(uint32_t param)
{ {
object_.callMethodAsync("doOperation") return object_.callMethodAsync("doOperation")
.onInterface(INTERFACE_NAME) .onInterface(INTERFACE_NAME)
.withArguments(param) .withArguments(param)
.uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue) .uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue)
{ {
this->onDoOperationReply(returnValue, error); this->onDoOperationReply(returnValue, error);
}); });
} }
void doErroneousOperationClientSideAsync() void doErroneousOperationClientSideAsync()

View File

@ -33,9 +33,9 @@ protected:
virtual void onConcatenateReply(const std::string& result, const sdbus::Error* error) = 0; virtual void onConcatenateReply(const std::string& result, const sdbus::Error* error) = 0;
public: public:
void concatenate(const std::map<std::string, sdbus::Variant>& params) sdbus::PendingAsyncCall concatenate(const std::map<std::string, sdbus::Variant>& params)
{ {
proxy_.callMethodAsync("concatenate").onInterface(INTERFACE_NAME).withArguments(params).uponReplyInvoke([this](const sdbus::Error* error, const std::string& result){ this->onConcatenateReply(result, error); }); return proxy_.callMethodAsync("concatenate").onInterface(INTERFACE_NAME).withArguments(params).uponReplyInvoke([this](const sdbus::Error* error, const std::string& result){ this->onConcatenateReply(result, error); });
} }
private: private:

View File

@ -173,7 +173,8 @@ std::tuple<std::string, std::string> ProxyGenerator::processMethods(const Nodes&
std::string outArgStr, outArgTypeStr; std::string outArgStr, outArgTypeStr;
std::tie(outArgStr, outArgTypeStr, std::ignore, std::ignore) = argsToNamesAndTypes(outArgs); std::tie(outArgStr, outArgTypeStr, std::ignore, std::ignore) = argsToNamesAndTypes(outArgs);
definitionSS << tab << (async ? "void" : retType) << " " << name << "(" << inArgTypeStr << ")" << endl const std::string realRetType = (async && !dontExpectReply ? "sdbus::PendingAsyncCall" : async ? "void" : retType);
definitionSS << tab << realRetType << " " << name << "(" << inArgTypeStr << ")" << endl
<< tab << "{" << endl; << tab << "{" << endl;
if (!timeoutValue.empty()) if (!timeoutValue.empty())
@ -186,8 +187,8 @@ std::tuple<std::string, std::string> ProxyGenerator::processMethods(const Nodes&
definitionSS << tab << tab << retType << " result;" << endl; definitionSS << tab << tab << retType << " result;" << endl;
} }
definitionSS << tab << tab << "proxy_.callMethod" << (async ? "Async" : "") << "(\"" << name << "\")" definitionSS << tab << tab << (async && !dontExpectReply ? "return " : "")
".onInterface(INTERFACE_NAME)"; << "proxy_.callMethod" << (async ? "Async" : "") << "(\"" << name << "\").onInterface(INTERFACE_NAME)";
if (!timeoutValue.empty()) if (!timeoutValue.empty())
{ {