From 4847d61706869fe2089efb780281118c575a29a6 Mon Sep 17 00:00:00 2001 From: sangelovic Date: Sun, 29 Mar 2020 21:43:20 +0200 Subject: [PATCH] Introduce support for cancellable async calls --- include/sdbus-c++/ConvenienceApiClasses.h | 3 +- include/sdbus-c++/ConvenienceApiClasses.inl | 4 +- include/sdbus-c++/IProxy.h | 47 +++++++++++++++++- src/Proxy.cpp | 49 ++++++++++++++++--- src/Proxy.h | 33 +++++++++---- .../integrationtests/AdaptorAndProxy_test.cpp | 45 +++++++++++++++++ tests/integrationtests/proxy-glue.h | 16 +++--- tests/stresstests/concatenator-proxy.h | 4 +- tools/xml2cpp-codegen/ProxyGenerator.cpp | 7 +-- 9 files changed, 174 insertions(+), 34 deletions(-) diff --git a/include/sdbus-c++/ConvenienceApiClasses.h b/include/sdbus-c++/ConvenienceApiClasses.h index 280e465..37e8f8b 100644 --- a/include/sdbus-c++/ConvenienceApiClasses.h +++ b/include/sdbus-c++/ConvenienceApiClasses.h @@ -42,6 +42,7 @@ namespace sdbus { class IProxy; class Variant; class Error; + class PendingAsyncCall; } namespace sdbus { @@ -193,7 +194,7 @@ namespace sdbus { template AsyncMethodInvoker& withTimeout(const std::chrono::duration<_Rep, _Period>& timeout); template AsyncMethodInvoker& withArguments(_Args&&... args); - template void uponReplyInvoke(_Function&& callback); + template PendingAsyncCall uponReplyInvoke(_Function&& callback); private: IProxy& proxy_; diff --git a/include/sdbus-c++/ConvenienceApiClasses.inl b/include/sdbus-c++/ConvenienceApiClasses.inl index 977dcdf..13c29ab 100644 --- a/include/sdbus-c++/ConvenienceApiClasses.inl +++ b/include/sdbus-c++/ConvenienceApiClasses.inl @@ -576,7 +576,7 @@ namespace sdbus { } template - void AsyncMethodInvoker::uponReplyInvoke(_Function&& callback) + PendingAsyncCall AsyncMethodInvoker::uponReplyInvoke(_Function&& callback) { assert(method_.isValid()); // onInterface() must be placed/called prior to this function @@ -594,7 +594,7 @@ namespace sdbus { sdbus::apply(callback, error, args); }; - proxy_.callMethod(method_, std::move(asyncReplyHandler), timeout_); + return proxy_.callMethod(method_, std::move(asyncReplyHandler), timeout_); } /*** ---------------- ***/ diff --git a/include/sdbus-c++/IProxy.h b/include/sdbus-c++/IProxy.h index 1ce4b3b..c15f9fb 100644 --- a/include/sdbus-c++/IProxy.h +++ b/include/sdbus-c++/IProxy.h @@ -38,6 +38,10 @@ namespace sdbus { class MethodCall; class MethodReply; class IConnection; + class PendingAsyncCall; + namespace internal { + class Proxy; + } } namespace sdbus { @@ -108,6 +112,7 @@ namespace sdbus { * @param[in] message Message representing an async method call * @param[in] asyncReplyCallback Handler for the async reply * @param[in] timeout Timeout for dbus call in microseconds + * @return Cookie for the the pending asynchronous call * * The call is non-blocking. It doesn't wait for the reply. Once the reply arrives, * the provided async reply handler will get invoked from the context of the connection @@ -117,7 +122,7 @@ namespace sdbus { * * @throws sdbus::Error in case of failure */ - virtual void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0; + virtual PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0; /*! * @copydoc IProxy::callMethod(const MethodCall&,async_reply_handler,uint64_t) @@ -263,6 +268,46 @@ namespace sdbus { [[nodiscard]] PropertySetter setProperty(const std::string& propertyName); }; + /********************************************//** + * @class PendingAsyncCall + * + * PendingAsyncCall represents a simple handle type to cancel the delivery + * of the asynchronous D-Bus call result to the application. + * + * The handle is lifetime-independent from the originating Proxy object. + * It's safe to call its methods even after the Proxy has gone. + * + ***********************************************/ + class PendingAsyncCall + { + public: + /*! + * @brief Cancels the delivery of the pending asynchronous call result + * + * This function effectively removes the callback handler registered to the + * async D-Bus method call result delivery. Does nothing if the call was + * completed already, or if the originating Proxy object has gone meanwhile. + */ + void cancel(); + + /*! + * @brief Answers whether the asynchronous call is still pending + * + * @return True if the call is pending, false if the call has been fully completed + * + * Pending call in this context means a call whose results have not arrived, or + * have arrived and are currently being processed by the callback handler. + */ + bool isPending() const; + + private: + friend internal::Proxy; + PendingAsyncCall(std::weak_ptr callData); + + private: + std::weak_ptr callData_; + }; + // Out-of-line member definitions template diff --git a/src/Proxy.cpp b/src/Proxy.cpp index 82fa933..568ef8f 100644 --- a/src/Proxy.cpp +++ b/src/Proxy.cpp @@ -93,16 +93,20 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout) return sendMethodCallMessageAndWaitForReply(message, timeout); } -void Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) +PendingAsyncCall Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) { SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL); auto callback = (void*)&Proxy::sdbus_async_reply_handler; - auto callData = std::make_unique(AsyncCalls::CallData{*this, std::move(asyncReplyCallback), {}}); + auto callData = std::make_shared(AsyncCalls::CallData{*this, std::move(asyncReplyCallback), {}}); + auto weakData = std::weak_ptr{callData}; callData->slot = message.send(callback, callData.get(), timeout); - pendingAsyncCalls_.addCall(callData->slot.get(), std::move(callData)); + auto slotPtr = callData->slot.get(); + pendingAsyncCalls_.addCall(slotPtr, std::move(callData)); + + return {weakData}; } MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout) @@ -124,7 +128,7 @@ MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& messag void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error) { SCOPE_EXIT{ cond_.notify_one(); }; - std::unique_lock lock{mutex_}; + std::unique_lock lock{mutex_}; SCOPE_EXIT{ arrived_ = true; }; //error_ = nullptr; // Necessary if SyncCallReplyData instance is thread_local @@ -137,7 +141,7 @@ void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply MethodReply Proxy::SyncCallReplyData::waitForMethodReply() { - std::unique_lock lock{mutex_}; + std::unique_lock lock{mutex_}; cond_.wait(lock, [this](){ return arrived_; }); //arrived_ = false; // Necessary if SyncCallReplyData instance is thread_local @@ -202,11 +206,12 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat auto& proxy = asyncCallData->proxy; auto slot = asyncCallData->slot.get(); + // We are removing the CallData item at the complete scope exit, after the callback has been invoked. + // We can't do it earlier (before callback invocation for example), because CallBack data (slot release) + // is the synchronization point between callback invocation and Proxy::unregister. SCOPE_EXIT { - // Slot will be nullptr in case of synchronous call. In that case, the call data lives on the call stack, - // in another thread. But that thread may have already been woken up by now and cleared its call stack, - // so we can't access asyncCallData here. Hence we save the slot pointer at the beginning of this function. + // Remove call meta-data if it's a real async call (a sync call done in terms of async has slot == nullptr) if (slot) proxy.pendingAsyncCalls_.removeCall(slot); }; @@ -247,6 +252,34 @@ int Proxy::sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd namespace sdbus { +PendingAsyncCall::PendingAsyncCall(std::weak_ptr callData) + : callData_(std::move(callData)) +{ +} + +void PendingAsyncCall::cancel() +{ + if (auto ptr = callData_.lock(); ptr != nullptr) + { + auto* callData = static_cast(ptr.get()); + callData->proxy.pendingAsyncCalls_.removeCall(callData->slot.get()); + + // At this point, the callData item is being deleted, leading to the release of the + // sd-bus slot pointer. This release locks the global sd-bus mutex. If the async + // callback is currently being processed, the sd-bus mutex is locked by the event + // loop thread, thus access to the callData item is synchronized and thread-safe. + } +} + +bool PendingAsyncCall::isPending() const +{ + return !callData_.expired(); +} + +} + +namespace sdbus { + std::unique_ptr createProxy( IConnection& connection , std::string destination , std::string objectPath ) diff --git a/src/Proxy.h b/src/Proxy.h index 4c35997..dc373a9 100644 --- a/src/Proxy.h +++ b/src/Proxy.h @@ -52,7 +52,7 @@ namespace sdbus::internal { MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override; MethodReply callMethod(const MethodCall& message, uint64_t timeout) override; - void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override; + PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override; void registerSignalHandler( const std::string& interfaceName , const std::string& signalName @@ -81,6 +81,8 @@ namespace sdbus::internal { static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); private: + friend PendingAsyncCall; + std::unique_ptr< sdbus::internal::IConnection , std::function > connection_; @@ -119,29 +121,42 @@ namespace sdbus::internal { clear(); } - bool addCall(void* slot, std::unique_ptr&& asyncCallData) + bool addCall(void* slot, std::shared_ptr asyncCallData) { std::lock_guard lock(mutex_); return calls_.emplace(slot, std::move(asyncCallData)).second; } - bool removeCall(void* slot) + void removeCall(void* slot) { - std::lock_guard lock(mutex_); - return calls_.erase(slot) > 0; + std::unique_lock lock(mutex_); + if (auto it = calls_.find(slot); it != calls_.end()) + { + auto callData = std::move(it->second); + calls_.erase(it); + lock.unlock(); + + // Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release + // out of the `mutex_' critical section here, because if the `removeCall` is called by some + // thread and at the same time Proxy's async reply handler (which already holds global sd-bus + // mutex) is in progress in a different thread, we get double-mutex deadlock. + } } void clear() { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); auto asyncCallSlots = std::move(calls_); - // Perform releasing of sd-bus slots outside of the calls_ critical section which avoids - // double mutex dead lock when the async reply handler is invoked at the same time. lock.unlock(); + + // Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release + // out of the `mutex_' critical section here, because if the `clear` is called by some thread + // and at the same time Proxy's async reply handler (which already holds global sd-bus + // mutex) is in progress in a different thread, we get double-mutex deadlock. } private: - std::unordered_map> calls_; + std::unordered_map> calls_; std::mutex mutex_; } pendingAsyncCalls_; }; diff --git a/tests/integrationtests/AdaptorAndProxy_test.cpp b/tests/integrationtests/AdaptorAndProxy_test.cpp index 74b8ce9..ddd814f 100644 --- a/tests/integrationtests/AdaptorAndProxy_test.cpp +++ b/tests/integrationtests/AdaptorAndProxy_test.cpp @@ -385,6 +385,51 @@ TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide) ASSERT_THAT(future.get(), Eq(100)); } +TEST_F(SdbusTestObject, AnswersThatAsyncCallIsPendingIfItIsInProgress) +{ + m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){}); + + auto call = m_proxy->doOperationClientSideAsync(100); + + ASSERT_TRUE(call.isPending()); +} + +TEST_F(SdbusTestObject, CancelsPendingAsyncCallOnClientSide) +{ + std::promise promise; + auto future = promise.get_future(); + m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); }); + auto call = m_proxy->doOperationClientSideAsync(100); + + call.cancel(); + + ASSERT_THAT(future.wait_for(300ms), Eq(std::future_status::timeout)); +} + +TEST_F(SdbusTestObject, AnswersThatAsyncCallIsNotPendingAfterItHasBeenCancelled) +{ + std::promise promise; + auto future = promise.get_future(); + m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); }); + auto call = m_proxy->doOperationClientSideAsync(100); + + call.cancel(); + + ASSERT_FALSE(call.isPending()); +} + +TEST_F(SdbusTestObject, AnswersThatAsyncCallIsNotPendingAfterItHasBeenCompleted) +{ + std::promise promise; + auto future = promise.get_future(); + m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); }); + + auto call = m_proxy->doOperationClientSideAsync(0); + (void) future.get(); // Wait for the call to finish + + ASSERT_FALSE(call.isPending()); +} + TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide) { std::promise promise; diff --git a/tests/integrationtests/proxy-glue.h b/tests/integrationtests/proxy-glue.h index f2b8442..965e0bd 100644 --- a/tests/integrationtests/proxy-glue.h +++ b/tests/integrationtests/proxy-glue.h @@ -156,15 +156,15 @@ public: return result; } - void doOperationClientSideAsync(uint32_t param) + sdbus::PendingAsyncCall doOperationClientSideAsync(uint32_t param) { - object_.callMethodAsync("doOperation") - .onInterface(INTERFACE_NAME) - .withArguments(param) - .uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue) - { - this->onDoOperationReply(returnValue, error); - }); + return object_.callMethodAsync("doOperation") + .onInterface(INTERFACE_NAME) + .withArguments(param) + .uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue) + { + this->onDoOperationReply(returnValue, error); + }); } void doErroneousOperationClientSideAsync() diff --git a/tests/stresstests/concatenator-proxy.h b/tests/stresstests/concatenator-proxy.h index 08e79ac..ef7964d 100644 --- a/tests/stresstests/concatenator-proxy.h +++ b/tests/stresstests/concatenator-proxy.h @@ -33,9 +33,9 @@ protected: virtual void onConcatenateReply(const std::string& result, const sdbus::Error* error) = 0; public: - void concatenate(const std::map& params) + sdbus::PendingAsyncCall concatenate(const std::map& params) { - proxy_.callMethodAsync("concatenate").onInterface(INTERFACE_NAME).withArguments(params).uponReplyInvoke([this](const sdbus::Error* error, const std::string& result){ this->onConcatenateReply(result, error); }); + return proxy_.callMethodAsync("concatenate").onInterface(INTERFACE_NAME).withArguments(params).uponReplyInvoke([this](const sdbus::Error* error, const std::string& result){ this->onConcatenateReply(result, error); }); } private: diff --git a/tools/xml2cpp-codegen/ProxyGenerator.cpp b/tools/xml2cpp-codegen/ProxyGenerator.cpp index 9e45944..8cb2e2c 100644 --- a/tools/xml2cpp-codegen/ProxyGenerator.cpp +++ b/tools/xml2cpp-codegen/ProxyGenerator.cpp @@ -173,7 +173,8 @@ std::tuple ProxyGenerator::processMethods(const Nodes& std::string outArgStr, outArgTypeStr; std::tie(outArgStr, outArgTypeStr, std::ignore, std::ignore) = argsToNamesAndTypes(outArgs); - definitionSS << tab << (async ? "void" : retType) << " " << name << "(" << inArgTypeStr << ")" << endl + const std::string realRetType = (async && !dontExpectReply ? "sdbus::PendingAsyncCall" : async ? "void" : retType); + definitionSS << tab << realRetType << " " << name << "(" << inArgTypeStr << ")" << endl << tab << "{" << endl; if (!timeoutValue.empty()) @@ -186,8 +187,8 @@ std::tuple ProxyGenerator::processMethods(const Nodes& definitionSS << tab << tab << retType << " result;" << endl; } - definitionSS << tab << tab << "proxy_.callMethod" << (async ? "Async" : "") << "(\"" << name << "\")" - ".onInterface(INTERFACE_NAME)"; + definitionSS << tab << tab << (async && !dontExpectReply ? "return " : "") + << "proxy_.callMethod" << (async ? "Async" : "") << "(\"" << name << "\").onInterface(INTERFACE_NAME)"; if (!timeoutValue.empty()) {