From 0cf27f7262373d08ba8ec842c2ddb7385ae60068 Mon Sep 17 00:00:00 2001 From: sangelovic Date: Sun, 27 Jan 2019 14:55:20 +0100 Subject: [PATCH] Introduce support for client-side asynchronous method invocations --- include/sdbus-c++/ConvenienceClasses.h | 17 +++++ include/sdbus-c++/ConvenienceClasses.inl | 46 ++++++++++++- include/sdbus-c++/Error.h | 5 ++ include/sdbus-c++/IObjectProxy.h | 65 ++++++++++++++++++- include/sdbus-c++/Message.h | 8 +++ include/sdbus-c++/TypeTraits.h | 28 ++++++++ src/Message.cpp | 11 ++++ src/ObjectProxy.cpp | 30 +++++++++ src/ObjectProxy.h | 3 + .../integrationtests/AdaptorAndProxy_test.cpp | 37 ++++++++++- test/integrationtests/TestingAdaptor.h | 2 +- test/integrationtests/TestingProxy.h | 12 ++++ test/integrationtests/adaptor-glue.h | 6 +- test/integrationtests/proxy-glue.h | 25 ++++++- 14 files changed, 284 insertions(+), 11 deletions(-) diff --git a/include/sdbus-c++/ConvenienceClasses.h b/include/sdbus-c++/ConvenienceClasses.h index 5c86e28..e319772 100644 --- a/include/sdbus-c++/ConvenienceClasses.h +++ b/include/sdbus-c++/ConvenienceClasses.h @@ -37,6 +37,7 @@ namespace sdbus { class IObject; class IObjectProxy; class Variant; + class Error; } namespace sdbus { @@ -165,6 +166,7 @@ namespace sdbus { MethodInvoker& onInterface(const std::string& interfaceName); template MethodInvoker& withArguments(_Args&&... args); template void storeResultsTo(_Args&... args); + void dontExpectReply(); private: @@ -175,6 +177,21 @@ namespace sdbus { bool methodCalled_{}; }; + class AsyncMethodInvoker + { + public: + AsyncMethodInvoker(IObjectProxy& objectProxy, const std::string& methodName); + AsyncMethodInvoker& onInterface(const std::string& interfaceName); + template AsyncMethodInvoker& withArguments(_Args&&... args); + //template void uponReplyInvoke(std::function callback); + template void uponReplyInvoke(_Function&& callback); + + private: + IObjectProxy& objectProxy_; + const std::string& methodName_; + AsyncMethodCall method_; + }; + class SignalSubscriber { public: diff --git a/include/sdbus-c++/ConvenienceClasses.inl b/include/sdbus-c++/ConvenienceClasses.inl index 9cfbca9..db9535f 100755 --- a/include/sdbus-c++/ConvenienceClasses.inl +++ b/include/sdbus-c++/ConvenienceClasses.inl @@ -119,8 +119,7 @@ namespace sdbus { // as a storage for the argument values deserialized from the message. tuple_of_function_input_arg_types_t<_Function> inputArgs; - // Deserialize input arguments from the message into the tuple, - // plus store the result object as a last item of the tuple. + // Deserialize input arguments from the message into the tuple. msg >> inputArgs; // Invoke callback with input arguments from the tuple. @@ -487,6 +486,49 @@ namespace sdbus { } + inline AsyncMethodInvoker::AsyncMethodInvoker(IObjectProxy& objectProxy, const std::string& methodName) + : objectProxy_(objectProxy) + , methodName_(methodName) + { + } + + inline AsyncMethodInvoker& AsyncMethodInvoker::onInterface(const std::string& interfaceName) + { + method_ = objectProxy_.createAsyncMethodCall(interfaceName, methodName_); + + return *this; + } + + template + inline AsyncMethodInvoker& AsyncMethodInvoker::withArguments(_Args&&... args) + { + SDBUS_THROW_ERROR_IF(!method_.isValid(), "DBus interface not specified when calling a DBus method", EINVAL); + + detail::serialize_pack(method_, std::forward<_Args>(args)...); + + return *this; + } + + template + void AsyncMethodInvoker::uponReplyInvoke(_Function&& callback) + { + SDBUS_THROW_ERROR_IF(!method_.isValid(), "DBus interface not specified when calling a DBus method", EINVAL); + + objectProxy_.callMethod(method_, [callback = std::forward<_Function>(callback)](MethodReply& reply, const Error* error) + { + // Create a tuple of callback input arguments' types, which will be used + // as a storage for the argument values deserialized from the message. + tuple_of_function_input_arg_types_t<_Function> args; + + // Deserialize input arguments from the message into the tuple. + reply >> args; + + // Invoke callback with input arguments from the tuple. + sdbus::apply(callback, error, args); // TODO: Use std::apply when switching to full C++17 support + }); + } + + inline SignalSubscriber::SignalSubscriber(IObjectProxy& objectProxy, const std::string& signalName) : objectProxy_(objectProxy) , signalName_(signalName) diff --git a/include/sdbus-c++/Error.h b/include/sdbus-c++/Error.h index efdcd41..66edbf6 100755 --- a/include/sdbus-c++/Error.h +++ b/include/sdbus-c++/Error.h @@ -57,6 +57,11 @@ namespace sdbus { return message_; } + bool isValid() const + { + return !getName().empty(); + } + private: std::string name_; std::string message_; diff --git a/include/sdbus-c++/IObjectProxy.h b/include/sdbus-c++/IObjectProxy.h index b09d8dd..9a1f733 100755 --- a/include/sdbus-c++/IObjectProxy.h +++ b/include/sdbus-c++/IObjectProxy.h @@ -34,6 +34,7 @@ // Forward declarations namespace sdbus { class MethodCall; + class AsyncMethodCall; class MethodReply; class IConnection; } @@ -58,7 +59,7 @@ namespace sdbus { * * @param[in] interfaceName Name of an interface that the method is defined under * @param[in] methodName Name of the method - * @return A method call message message + * @return A method call message * * Serialize method arguments into the returned message and invoke the method by passing * the message with serialized arguments to the @c callMethod function. @@ -68,6 +69,21 @@ namespace sdbus { */ virtual MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) = 0; + /*! + * @brief Creates an asynchronous method call message + * + * @param[in] interfaceName Name of an interface that the method is defined under + * @param[in] methodName Name of the method + * @return A method call message + * + * Serialize method arguments into the returned message and invoke the method by passing + * the message with serialized arguments to the @c callMethod function. + * Alternatively, use higher-level API @c callMethodAsync(const std::string& methodName) defined below. + * + * @throws sdbus::Error in case of failure + */ + virtual AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) = 0; + /*! * @brief Calls method on the proxied D-Bus object * @@ -85,7 +101,23 @@ namespace sdbus { * * @throws sdbus::Error in case of failure */ - virtual MethodReply callMethod(const sdbus::MethodCall& message) = 0; + virtual MethodReply callMethod(const MethodCall& message) = 0; + + /*! + * @brief Calls method on the proxied D-Bus object asynchronously + * + * @param[in] message Message representing an async method call + * @param[in] asyncReplyHandler Handler for the async reply + * + * The call is non-blocking. It doesn't wait for the reply. Once the reply arrives, + * the provided async reply handler will get invoked from the context of the connection + * event loop processing thread. + * + * Note: To avoid messing with messages, use higher-level API defined below. + * + * @throws sdbus::Error in case of failure + */ + virtual void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback) = 0; /*! * @brief Registers a handler for the desired signal emitted by the proxied D-Bus object @@ -131,6 +163,30 @@ namespace sdbus { */ MethodInvoker callMethod(const std::string& methodName); + /*! + * @brief Calls method on the proxied D-Bus object asynchronously + * + * @param[in] methodName Name of the method + * @return A helper object for convenient asynchronous invocation of the method + * + * This is a high-level, convenience way of calling D-Bus methods that abstracts + * from the D-Bus message concept. Method arguments/return value are automatically (de)serialized + * in a message and D-Bus signatures automatically deduced from the provided native arguments + * and return values. + * + * Example of use: + * @code + * int a = ..., b = ...; + * object_.callMethodAsync("multiply").onInterface(INTERFACE_NAME).withArguments(a, b).uponReplyInvoke([](int result) + * { + * std::cout << "Got result of multiplying " << a << " and " << b << ": " << result << std::endl; + * }); + * @endcode + * + * @throws sdbus::Error in case of failure + */ + AsyncMethodInvoker callMethodAsync(const std::string& methodName); + /*! * @brief Registers signal handler for a given signal of the proxied D-Bus object * @@ -197,6 +253,11 @@ namespace sdbus { return MethodInvoker(*this, methodName); } + inline AsyncMethodInvoker IObjectProxy::callMethodAsync(const std::string& methodName) + { + return AsyncMethodInvoker(*this, methodName); + } + inline SignalSubscriber IObjectProxy::uponSignal(const std::string& signalName) { return SignalSubscriber(*this, signalName); diff --git a/include/sdbus-c++/Message.h b/include/sdbus-c++/Message.h index e253d4b..6d5197e 100644 --- a/include/sdbus-c++/Message.h +++ b/include/sdbus-c++/Message.h @@ -163,6 +163,14 @@ namespace sdbus { MethodReply sendWithNoReply() const; }; + class AsyncMethodCall : public Message + { + public: + using Message::Message; + AsyncMethodCall(MethodCall&& call) noexcept; + void send(void* callback, void* userData) const; + }; + class MethodReply : public Message { public: diff --git a/include/sdbus-c++/TypeTraits.h b/include/sdbus-c++/TypeTraits.h index ab898ac..2b37899 100644 --- a/include/sdbus-c++/TypeTraits.h +++ b/include/sdbus-c++/TypeTraits.h @@ -46,12 +46,14 @@ namespace sdbus { class Signal; class MethodResult; template class Result; + class Error; } namespace sdbus { using method_callback = std::function; using async_method_callback = std::function; + using async_reply_handler = std::function; using signal_handler = std::function; using property_set_callback = std::function; using property_get_callback = std::function; @@ -368,6 +370,12 @@ namespace sdbus { static constexpr bool is_async = false; }; + template + struct function_traits + : public function_traits_base + { + }; + template struct function_traits, _Args...)> : public function_traits_base, _Args...> @@ -498,6 +506,15 @@ namespace sdbus { return std::forward<_Function>(f)(std::move(r), std::get<_I>(std::forward<_Tuple>(t))...); } + template + constexpr decltype(auto) apply_impl( _Function&& f + , const Error* e + , _Tuple&& t + , std::index_sequence<_I...> ) + { + return std::forward<_Function>(f)(e, std::get<_I>(std::forward<_Tuple>(t))...); + } + // Version of apply_impl for functions returning non-void values. // In this case just forward function return value. template @@ -542,6 +559,17 @@ namespace sdbus { , std::forward<_Tuple>(t) , std::make_index_sequence>::value>{} ); } + + // Convert tuple `t' of values into a list of arguments + // and invoke function `f' with those arguments. + template + constexpr decltype(auto) apply(_Function&& f, const Error* e, _Tuple&& t) + { + return detail::apply_impl( std::forward<_Function>(f) + , e + , std::forward<_Tuple>(t) + , std::make_index_sequence>::value>{} ); + } } #endif /* SDBUS_CXX_TYPETRAITS_H_ */ diff --git a/src/Message.cpp b/src/Message.cpp index f624f96..635756a 100755 --- a/src/Message.cpp +++ b/src/Message.cpp @@ -648,6 +648,17 @@ MethodReply MethodCall::createErrorReply(const Error& error) const return MethodReply(sdbusErrorReply); } +AsyncMethodCall::AsyncMethodCall(MethodCall&& call) noexcept + : Message(call) +{ +} + +void AsyncMethodCall::send(void* callback, void* userData) const +{ + auto r = sd_bus_call_async(nullptr, nullptr, (sd_bus_message*)getMsg(), (sd_bus_message_handler_t)callback, userData, 0); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method asynchronously", -r); +} + void MethodReply::send() const { auto r = sd_bus_send(nullptr, (sd_bus_message*)getMsg(), nullptr); diff --git a/src/ObjectProxy.cpp b/src/ObjectProxy.cpp index c3983b9..3a51f0f 100755 --- a/src/ObjectProxy.cpp +++ b/src/ObjectProxy.cpp @@ -66,11 +66,22 @@ MethodCall ObjectProxy::createMethodCall(const std::string& interfaceName, const return connection_->createMethodCall(destination_, objectPath_, interfaceName, methodName); } +AsyncMethodCall ObjectProxy::createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) +{ + return AsyncMethodCall{createMethodCall(interfaceName, methodName)}; +} + MethodReply ObjectProxy::callMethod(const MethodCall& message) { return message.send(); } +void ObjectProxy::callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback) +{ + // The new-ed handler gets deleted in the sdbus_async_reply_handler + message.send((void*)&ObjectProxy::sdbus_async_reply_handler, new async_reply_handler(std::move(asyncReplyCallback))); +} + void ObjectProxy::registerSignalHandler( const std::string& interfaceName , const std::string& signalName , signal_handler signalHandler ) @@ -136,11 +147,30 @@ void ObjectProxy::registerSignalHandlers(sdbus::internal::IConnection& connectio } } +int ObjectProxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError) +{ + MethodReply message(sdbusMessage); + + std::unique_ptr asyncReplyCallback{static_cast(userData)}; + assert(asyncReplyCallback != nullptr); + + if (!sd_bus_error_is_set(retError)) + { + (*asyncReplyCallback)(message, nullptr); + } + else + { + sdbus::Error error(retError->name, retError->message); + (*asyncReplyCallback)(message, &error); + } +} + int ObjectProxy::sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/) { Signal message(sdbusMessage); auto* object = static_cast(userData); + assert(object != nullptr); // Note: The lookup can be optimized by using sorted vectors instead of associative containers auto& callback = object->interfaces_[message.getInterfaceName()].signals_[message.getMemberName()].callback_; assert(callback); diff --git a/src/ObjectProxy.h b/src/ObjectProxy.h index df59a76..c16717f 100755 --- a/src/ObjectProxy.h +++ b/src/ObjectProxy.h @@ -53,7 +53,9 @@ namespace internal { ~ObjectProxy(); MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override; + AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) override; MethodReply callMethod(const MethodCall& message) override; + void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback) override; void registerSignalHandler( const std::string& interfaceName , const std::string& signalName @@ -63,6 +65,7 @@ namespace internal { private: bool listensToSignals() const; void registerSignalHandlers(sdbus::internal::IConnection& connection); + static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); static int sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); private: diff --git a/test/integrationtests/AdaptorAndProxy_test.cpp b/test/integrationtests/AdaptorAndProxy_test.cpp index b904a5b..a46fbb4 100644 --- a/test/integrationtests/AdaptorAndProxy_test.cpp +++ b/test/integrationtests/AdaptorAndProxy_test.cpp @@ -42,6 +42,7 @@ #include #include #include +#include using ::testing::Eq; using ::testing::Gt; @@ -246,7 +247,7 @@ TEST_F(SdbusTestObject, CallsErrorThrowingMethodWithDontExpectReplySet) ASSERT_TRUE(m_adaptor->wasThrowErrorCalled()); } -TEST_F(SdbusTestObject, DoesServerSideAsynchoronousMethodInParallel) +TEST_F(SdbusTestObject, RunsServerSideAsynchoronousMethodAsynchronously) { // Yeah, this is kinda timing-dependent test, but times should be safe... std::mutex mtx; @@ -301,6 +302,40 @@ TEST_F(SdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncMethods) ASSERT_THAT(resultCount, Eq(1500)); } +TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide) +{ + std::promise promise; + auto future = promise.get_future(); + m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err) + { + if (err == nullptr) + promise.set_value(res); + else + promise.set_exception(std::make_exception_ptr(*err)); + }); + + m_proxy->doOperationClientSideAsync(100); + + ASSERT_THAT(future.get(), Eq(100)); +} + +TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide) +{ + std::promise promise; + auto future = promise.get_future(); + m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err) + { + if (err == nullptr) + promise.set_value(res); + else + promise.set_exception(std::make_exception_ptr(*err)); + }); + + m_proxy->doErroneousOperationClientSideAsync(); + + ASSERT_THROW(future.get(), sdbus::Error); +} + TEST_F(SdbusTestObject, FailsCallingNonexistentMethod) { ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error); diff --git a/test/integrationtests/TestingAdaptor.h b/test/integrationtests/TestingAdaptor.h index e8b8e59..5f09feb 100644 --- a/test/integrationtests/TestingAdaptor.h +++ b/test/integrationtests/TestingAdaptor.h @@ -110,7 +110,7 @@ protected: return res; } - uint32_t doOperationSync(uint32_t param) + uint32_t doOperation(uint32_t param) { std::this_thread::sleep_for(std::chrono::milliseconds(param)); return param; diff --git a/test/integrationtests/TestingProxy.h b/test/integrationtests/TestingProxy.h index 8e619b2..03b5210 100644 --- a/test/integrationtests/TestingProxy.h +++ b/test/integrationtests/TestingProxy.h @@ -38,6 +38,11 @@ public: double getVariantValue() const { return m_variantValue; } std::map getSignatureFromSignal() const { return m_signature; } + void installDoOperationClientSideAsyncReplyHandler(std::function handler) + { + m_DoOperationClientSideAsyncReplyHandler = handler; + } + protected: void onSimpleSignal() override { ++m_simpleCallCounter; } @@ -53,12 +58,19 @@ protected: m_signature[std::get<0>(s)] = static_cast(std::get<0>(std::get<1>(s))); } + void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) override + { + if (m_DoOperationClientSideAsyncReplyHandler) + m_DoOperationClientSideAsyncReplyHandler(returnValue, error); + } + private: int m_simpleCallCounter{}; std::map m_map; double m_variantValue; std::map m_signature; + std::function m_DoOperationClientSideAsyncReplyHandler; }; diff --git a/test/integrationtests/adaptor-glue.h b/test/integrationtests/adaptor-glue.h index ce66a48..df9a819 100644 --- a/test/integrationtests/adaptor-glue.h +++ b/test/integrationtests/adaptor-glue.h @@ -85,9 +85,9 @@ protected: return this->sumVectorItems(a, b); }); - object_.registerMethod("doOperationSync").onInterface(INTERFACE_NAME).implementedAs([this](uint32_t param) + object_.registerMethod("doOperation").onInterface(INTERFACE_NAME).implementedAs([this](uint32_t param) { - return this->doOperationSync(param); + return this->doOperation(param); }); object_.registerMethod("doOperationAsync").onInterface(INTERFACE_NAME).implementedAs([this](sdbus::Result result, uint32_t param) @@ -158,7 +158,7 @@ protected: virtual sdbus::Struct>> getStructInStruct() const = 0; virtual int32_t sumStructItems(const sdbus::Struct& a, const sdbus::Struct& b) = 0; virtual uint32_t sumVectorItems(const std::vector& a, const std::vector& b) = 0; - virtual uint32_t doOperationSync(uint32_t param) = 0; + virtual uint32_t doOperation(uint32_t param) = 0; virtual void doOperationAsync(uint32_t param, sdbus::Result result) = 0; virtual sdbus::Signature getSignature() const = 0; virtual sdbus::ObjectPath getObjectPath() const = 0; diff --git a/test/integrationtests/proxy-glue.h b/test/integrationtests/proxy-glue.h index 20cf68e..1efc172 100644 --- a/test/integrationtests/proxy-glue.h +++ b/test/integrationtests/proxy-glue.h @@ -50,6 +50,8 @@ protected: virtual void onSignalWithVariant(const sdbus::Variant& v) = 0; virtual void onSignalWithoutRegistration(const sdbus::Struct>& s) = 0; + virtual void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) = 0; + public: void noArgNoReturn() { @@ -124,10 +126,10 @@ public: return result; } - uint32_t doOperationSync(uint32_t param) + uint32_t doOperation(uint32_t param) { uint32_t result; - object_.callMethod("doOperationSync").onInterface(INTERFACE_NAME).withArguments(param).storeResultsTo(result); + object_.callMethod("doOperation").onInterface(INTERFACE_NAME).withArguments(param).storeResultsTo(result); return result; } @@ -138,6 +140,25 @@ public: return result; } + uint32_t doOperationClientSideAsync(uint32_t param) + { + object_.callMethodAsync("doOperation") + .onInterface(INTERFACE_NAME) + .withArguments(param) + .uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue) + { + this->onDoOperationReply(returnValue, error); + }); + } + + uint32_t doErroneousOperationClientSideAsync() + { + object_.callMethodAsync("throwError").onInterface(INTERFACE_NAME).uponReplyInvoke([this](const sdbus::Error* error) + { + this->onDoOperationReply(0, error); + }); + } + sdbus::Signature getSignature() { sdbus::Signature result;