Introduce support for client-side asynchronous method invocations

This commit is contained in:
sangelovic
2019-01-27 14:55:20 +01:00
parent 97c47cb6df
commit 0cf27f7262
14 changed files with 284 additions and 11 deletions

View File

@ -37,6 +37,7 @@ namespace sdbus {
class IObject;
class IObjectProxy;
class Variant;
class Error;
}
namespace sdbus {
@ -165,6 +166,7 @@ namespace sdbus {
MethodInvoker& onInterface(const std::string& interfaceName);
template <typename... _Args> MethodInvoker& withArguments(_Args&&... args);
template <typename... _Args> void storeResultsTo(_Args&... args);
void dontExpectReply();
private:
@ -175,6 +177,21 @@ namespace sdbus {
bool methodCalled_{};
};
class AsyncMethodInvoker
{
public:
AsyncMethodInvoker(IObjectProxy& objectProxy, const std::string& methodName);
AsyncMethodInvoker& onInterface(const std::string& interfaceName);
template <typename... _Args> AsyncMethodInvoker& withArguments(_Args&&... args);
//template <typename... _OutputArgs> void uponReplyInvoke(std::function<void(const Error*, _OutputArgs...)> callback);
template <typename _Function> void uponReplyInvoke(_Function&& callback);
private:
IObjectProxy& objectProxy_;
const std::string& methodName_;
AsyncMethodCall method_;
};
class SignalSubscriber
{
public:

View File

@ -119,8 +119,7 @@ namespace sdbus {
// as a storage for the argument values deserialized from the message.
tuple_of_function_input_arg_types_t<_Function> inputArgs;
// Deserialize input arguments from the message into the tuple,
// plus store the result object as a last item of the tuple.
// Deserialize input arguments from the message into the tuple.
msg >> inputArgs;
// Invoke callback with input arguments from the tuple.
@ -487,6 +486,49 @@ namespace sdbus {
}
inline AsyncMethodInvoker::AsyncMethodInvoker(IObjectProxy& objectProxy, const std::string& methodName)
: objectProxy_(objectProxy)
, methodName_(methodName)
{
}
inline AsyncMethodInvoker& AsyncMethodInvoker::onInterface(const std::string& interfaceName)
{
method_ = objectProxy_.createAsyncMethodCall(interfaceName, methodName_);
return *this;
}
template <typename... _Args>
inline AsyncMethodInvoker& AsyncMethodInvoker::withArguments(_Args&&... args)
{
SDBUS_THROW_ERROR_IF(!method_.isValid(), "DBus interface not specified when calling a DBus method", EINVAL);
detail::serialize_pack(method_, std::forward<_Args>(args)...);
return *this;
}
template <typename _Function>
void AsyncMethodInvoker::uponReplyInvoke(_Function&& callback)
{
SDBUS_THROW_ERROR_IF(!method_.isValid(), "DBus interface not specified when calling a DBus method", EINVAL);
objectProxy_.callMethod(method_, [callback = std::forward<_Function>(callback)](MethodReply& reply, const Error* error)
{
// Create a tuple of callback input arguments' types, which will be used
// as a storage for the argument values deserialized from the message.
tuple_of_function_input_arg_types_t<_Function> args;
// Deserialize input arguments from the message into the tuple.
reply >> args;
// Invoke callback with input arguments from the tuple.
sdbus::apply(callback, error, args); // TODO: Use std::apply when switching to full C++17 support
});
}
inline SignalSubscriber::SignalSubscriber(IObjectProxy& objectProxy, const std::string& signalName)
: objectProxy_(objectProxy)
, signalName_(signalName)

View File

@ -57,6 +57,11 @@ namespace sdbus {
return message_;
}
bool isValid() const
{
return !getName().empty();
}
private:
std::string name_;
std::string message_;

View File

@ -34,6 +34,7 @@
// Forward declarations
namespace sdbus {
class MethodCall;
class AsyncMethodCall;
class MethodReply;
class IConnection;
}
@ -58,7 +59,7 @@ namespace sdbus {
*
* @param[in] interfaceName Name of an interface that the method is defined under
* @param[in] methodName Name of the method
* @return A method call message message
* @return A method call message
*
* Serialize method arguments into the returned message and invoke the method by passing
* the message with serialized arguments to the @c callMethod function.
@ -68,6 +69,21 @@ namespace sdbus {
*/
virtual MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) = 0;
/*!
* @brief Creates an asynchronous method call message
*
* @param[in] interfaceName Name of an interface that the method is defined under
* @param[in] methodName Name of the method
* @return A method call message
*
* Serialize method arguments into the returned message and invoke the method by passing
* the message with serialized arguments to the @c callMethod function.
* Alternatively, use higher-level API @c callMethodAsync(const std::string& methodName) defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) = 0;
/*!
* @brief Calls method on the proxied D-Bus object
*
@ -85,7 +101,23 @@ namespace sdbus {
*
* @throws sdbus::Error in case of failure
*/
virtual MethodReply callMethod(const sdbus::MethodCall& message) = 0;
virtual MethodReply callMethod(const MethodCall& message) = 0;
/*!
* @brief Calls method on the proxied D-Bus object asynchronously
*
* @param[in] message Message representing an async method call
* @param[in] asyncReplyHandler Handler for the async reply
*
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided async reply handler will get invoked from the context of the connection
* event loop processing thread.
*
* Note: To avoid messing with messages, use higher-level API defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback) = 0;
/*!
* @brief Registers a handler for the desired signal emitted by the proxied D-Bus object
@ -131,6 +163,30 @@ namespace sdbus {
*/
MethodInvoker callMethod(const std::string& methodName);
/*!
* @brief Calls method on the proxied D-Bus object asynchronously
*
* @param[in] methodName Name of the method
* @return A helper object for convenient asynchronous invocation of the method
*
* This is a high-level, convenience way of calling D-Bus methods that abstracts
* from the D-Bus message concept. Method arguments/return value are automatically (de)serialized
* in a message and D-Bus signatures automatically deduced from the provided native arguments
* and return values.
*
* Example of use:
* @code
* int a = ..., b = ...;
* object_.callMethodAsync("multiply").onInterface(INTERFACE_NAME).withArguments(a, b).uponReplyInvoke([](int result)
* {
* std::cout << "Got result of multiplying " << a << " and " << b << ": " << result << std::endl;
* });
* @endcode
*
* @throws sdbus::Error in case of failure
*/
AsyncMethodInvoker callMethodAsync(const std::string& methodName);
/*!
* @brief Registers signal handler for a given signal of the proxied D-Bus object
*
@ -197,6 +253,11 @@ namespace sdbus {
return MethodInvoker(*this, methodName);
}
inline AsyncMethodInvoker IObjectProxy::callMethodAsync(const std::string& methodName)
{
return AsyncMethodInvoker(*this, methodName);
}
inline SignalSubscriber IObjectProxy::uponSignal(const std::string& signalName)
{
return SignalSubscriber(*this, signalName);

View File

@ -163,6 +163,14 @@ namespace sdbus {
MethodReply sendWithNoReply() const;
};
class AsyncMethodCall : public Message
{
public:
using Message::Message;
AsyncMethodCall(MethodCall&& call) noexcept;
void send(void* callback, void* userData) const;
};
class MethodReply : public Message
{
public:

View File

@ -46,12 +46,14 @@ namespace sdbus {
class Signal;
class MethodResult;
template <typename... _Results> class Result;
class Error;
}
namespace sdbus {
using method_callback = std::function<void(MethodCall& msg, MethodReply& reply)>;
using async_method_callback = std::function<void(MethodCall& msg, MethodResult result)>;
using async_reply_handler = std::function<void(MethodReply& reply, const Error* error)>;
using signal_handler = std::function<void(Signal& signal)>;
using property_set_callback = std::function<void(Message& msg)>;
using property_get_callback = std::function<void(Message& reply)>;
@ -368,6 +370,12 @@ namespace sdbus {
static constexpr bool is_async = false;
};
template <typename... _Args>
struct function_traits<void(const Error*, _Args...)>
: public function_traits_base<void, _Args...>
{
};
template <typename... _Args, typename... _Results>
struct function_traits<void(Result<_Results...>, _Args...)>
: public function_traits_base<std::tuple<_Results...>, _Args...>
@ -498,6 +506,15 @@ namespace sdbus {
return std::forward<_Function>(f)(std::move(r), std::get<_I>(std::forward<_Tuple>(t))...);
}
template <class _Function, class _Tuple, std::size_t... _I>
constexpr decltype(auto) apply_impl( _Function&& f
, const Error* e
, _Tuple&& t
, std::index_sequence<_I...> )
{
return std::forward<_Function>(f)(e, std::get<_I>(std::forward<_Tuple>(t))...);
}
// Version of apply_impl for functions returning non-void values.
// In this case just forward function return value.
template <class _Function, class _Tuple, std::size_t... _I>
@ -542,6 +559,17 @@ namespace sdbus {
, std::forward<_Tuple>(t)
, std::make_index_sequence<std::tuple_size<std::decay_t<_Tuple>>::value>{} );
}
// Convert tuple `t' of values into a list of arguments
// and invoke function `f' with those arguments.
template <class _Function, class _Tuple>
constexpr decltype(auto) apply(_Function&& f, const Error* e, _Tuple&& t)
{
return detail::apply_impl( std::forward<_Function>(f)
, e
, std::forward<_Tuple>(t)
, std::make_index_sequence<std::tuple_size<std::decay_t<_Tuple>>::value>{} );
}
}
#endif /* SDBUS_CXX_TYPETRAITS_H_ */

View File

@ -648,6 +648,17 @@ MethodReply MethodCall::createErrorReply(const Error& error) const
return MethodReply(sdbusErrorReply);
}
AsyncMethodCall::AsyncMethodCall(MethodCall&& call) noexcept
: Message(call)
{
}
void AsyncMethodCall::send(void* callback, void* userData) const
{
auto r = sd_bus_call_async(nullptr, nullptr, (sd_bus_message*)getMsg(), (sd_bus_message_handler_t)callback, userData, 0);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method asynchronously", -r);
}
void MethodReply::send() const
{
auto r = sd_bus_send(nullptr, (sd_bus_message*)getMsg(), nullptr);

View File

@ -66,11 +66,22 @@ MethodCall ObjectProxy::createMethodCall(const std::string& interfaceName, const
return connection_->createMethodCall(destination_, objectPath_, interfaceName, methodName);
}
AsyncMethodCall ObjectProxy::createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName)
{
return AsyncMethodCall{createMethodCall(interfaceName, methodName)};
}
MethodReply ObjectProxy::callMethod(const MethodCall& message)
{
return message.send();
}
void ObjectProxy::callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback)
{
// The new-ed handler gets deleted in the sdbus_async_reply_handler
message.send((void*)&ObjectProxy::sdbus_async_reply_handler, new async_reply_handler(std::move(asyncReplyCallback)));
}
void ObjectProxy::registerSignalHandler( const std::string& interfaceName
, const std::string& signalName
, signal_handler signalHandler )
@ -136,11 +147,30 @@ void ObjectProxy::registerSignalHandlers(sdbus::internal::IConnection& connectio
}
}
int ObjectProxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError)
{
MethodReply message(sdbusMessage);
std::unique_ptr<async_reply_handler> asyncReplyCallback{static_cast<async_reply_handler*>(userData)};
assert(asyncReplyCallback != nullptr);
if (!sd_bus_error_is_set(retError))
{
(*asyncReplyCallback)(message, nullptr);
}
else
{
sdbus::Error error(retError->name, retError->message);
(*asyncReplyCallback)(message, &error);
}
}
int ObjectProxy::sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/)
{
Signal message(sdbusMessage);
auto* object = static_cast<ObjectProxy*>(userData);
assert(object != nullptr);
// Note: The lookup can be optimized by using sorted vectors instead of associative containers
auto& callback = object->interfaces_[message.getInterfaceName()].signals_[message.getMemberName()].callback_;
assert(callback);

View File

@ -53,7 +53,9 @@ namespace internal {
~ObjectProxy();
MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override;
AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) override;
MethodReply callMethod(const MethodCall& message) override;
void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback) override;
void registerSignalHandler( const std::string& interfaceName
, const std::string& signalName
@ -63,6 +65,7 @@ namespace internal {
private:
bool listensToSignals() const;
void registerSignalHandlers(sdbus::internal::IConnection& connection);
static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
static int sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
private:

View File

@ -42,6 +42,7 @@
#include <tuple>
#include <chrono>
#include <fstream>
#include <future>
using ::testing::Eq;
using ::testing::Gt;
@ -246,7 +247,7 @@ TEST_F(SdbusTestObject, CallsErrorThrowingMethodWithDontExpectReplySet)
ASSERT_TRUE(m_adaptor->wasThrowErrorCalled());
}
TEST_F(SdbusTestObject, DoesServerSideAsynchoronousMethodInParallel)
TEST_F(SdbusTestObject, RunsServerSideAsynchoronousMethodAsynchronously)
{
// Yeah, this is kinda timing-dependent test, but times should be safe...
std::mutex mtx;
@ -301,6 +302,40 @@ TEST_F(SdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncMethods)
ASSERT_THAT(resultCount, Eq(1500));
}
TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err)
{
if (err == nullptr)
promise.set_value(res);
else
promise.set_exception(std::make_exception_ptr(*err));
});
m_proxy->doOperationClientSideAsync(100);
ASSERT_THAT(future.get(), Eq(100));
}
TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err)
{
if (err == nullptr)
promise.set_value(res);
else
promise.set_exception(std::make_exception_ptr(*err));
});
m_proxy->doErroneousOperationClientSideAsync();
ASSERT_THROW(future.get(), sdbus::Error);
}
TEST_F(SdbusTestObject, FailsCallingNonexistentMethod)
{
ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error);

View File

@ -110,7 +110,7 @@ protected:
return res;
}
uint32_t doOperationSync(uint32_t param)
uint32_t doOperation(uint32_t param)
{
std::this_thread::sleep_for(std::chrono::milliseconds(param));
return param;

View File

@ -38,6 +38,11 @@ public:
double getVariantValue() const { return m_variantValue; }
std::map<std::string, std::string> getSignatureFromSignal() const { return m_signature; }
void installDoOperationClientSideAsyncReplyHandler(std::function<void(uint32_t res, const sdbus::Error* err)> handler)
{
m_DoOperationClientSideAsyncReplyHandler = handler;
}
protected:
void onSimpleSignal() override { ++m_simpleCallCounter; }
@ -53,12 +58,19 @@ protected:
m_signature[std::get<0>(s)] = static_cast<std::string>(std::get<0>(std::get<1>(s)));
}
void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) override
{
if (m_DoOperationClientSideAsyncReplyHandler)
m_DoOperationClientSideAsyncReplyHandler(returnValue, error);
}
private:
int m_simpleCallCounter{};
std::map<int32_t, std::string> m_map;
double m_variantValue;
std::map<std::string, std::string> m_signature;
std::function<void(uint32_t res, const sdbus::Error* err)> m_DoOperationClientSideAsyncReplyHandler;
};

View File

@ -85,9 +85,9 @@ protected:
return this->sumVectorItems(a, b);
});
object_.registerMethod("doOperationSync").onInterface(INTERFACE_NAME).implementedAs([this](uint32_t param)
object_.registerMethod("doOperation").onInterface(INTERFACE_NAME).implementedAs([this](uint32_t param)
{
return this->doOperationSync(param);
return this->doOperation(param);
});
object_.registerMethod("doOperationAsync").onInterface(INTERFACE_NAME).implementedAs([this](sdbus::Result<uint32_t> result, uint32_t param)
@ -158,7 +158,7 @@ protected:
virtual sdbus::Struct<std::string, sdbus::Struct<std::map<int32_t, int32_t>>> getStructInStruct() const = 0;
virtual int32_t sumStructItems(const sdbus::Struct<uint8_t, uint16_t>& a, const sdbus::Struct<int32_t, int64_t>& b) = 0;
virtual uint32_t sumVectorItems(const std::vector<uint16_t>& a, const std::vector<uint64_t>& b) = 0;
virtual uint32_t doOperationSync(uint32_t param) = 0;
virtual uint32_t doOperation(uint32_t param) = 0;
virtual void doOperationAsync(uint32_t param, sdbus::Result<uint32_t> result) = 0;
virtual sdbus::Signature getSignature() const = 0;
virtual sdbus::ObjectPath getObjectPath() const = 0;

View File

@ -50,6 +50,8 @@ protected:
virtual void onSignalWithVariant(const sdbus::Variant& v) = 0;
virtual void onSignalWithoutRegistration(const sdbus::Struct<std::string, sdbus::Struct<sdbus::Signature>>& s) = 0;
virtual void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) = 0;
public:
void noArgNoReturn()
{
@ -124,10 +126,10 @@ public:
return result;
}
uint32_t doOperationSync(uint32_t param)
uint32_t doOperation(uint32_t param)
{
uint32_t result;
object_.callMethod("doOperationSync").onInterface(INTERFACE_NAME).withArguments(param).storeResultsTo(result);
object_.callMethod("doOperation").onInterface(INTERFACE_NAME).withArguments(param).storeResultsTo(result);
return result;
}
@ -138,6 +140,25 @@ public:
return result;
}
uint32_t doOperationClientSideAsync(uint32_t param)
{
object_.callMethodAsync("doOperation")
.onInterface(INTERFACE_NAME)
.withArguments(param)
.uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue)
{
this->onDoOperationReply(returnValue, error);
});
}
uint32_t doErroneousOperationClientSideAsync()
{
object_.callMethodAsync("throwError").onInterface(INTERFACE_NAME).uponReplyInvoke([this](const sdbus::Error* error)
{
this->onDoOperationReply(0, error);
});
}
sdbus::Signature getSignature()
{
sdbus::Signature result;