feat: add support for std::future-based async calls

This commit is contained in:
Stanislav Angelovič
2023-01-29 22:12:10 +01:00
parent 3a56113422
commit 737f04abc7
10 changed files with 218 additions and 9 deletions

View File

@ -1066,11 +1066,11 @@ For a real example of a server-side asynchronous D-Bus method, please look at sd
Asynchronous client-side methods
--------------------------------
sdbus-c++ also supports asynchronous approach at the client (the proxy) side. With this approach, we can issue a D-Bus method call without blocking current thread's execution while waiting for the reply. We go on doing other things, and when the reply comes, a given callback is invoked within the context of the D-Bus dispatcher thread.
sdbus-c++ also supports asynchronous approach at the client (the proxy) side. With this approach, we can issue a D-Bus method call without blocking current thread's execution while waiting for the reply. We go on doing other things, and when the reply comes, either a given callback handler will be invoked within the context of the event loop thread, or a future object returned by the async call will be set the returned value.6
### Lower-level API
Considering the Concatenator example based on lower-level API, if we wanted to call `concatenate` in an async way, we'd have to pass a callback to the proxy when issuing the call, and that callback gets invoked when the reply arrives:
Considering the Concatenator example based on lower-level API, if we wanted to call `concatenate` in an async way, we have two options: We either pass a callback to the proxy when issuing the call, and that callback gets invoked when the reply arrives:
```c++
int main(int argc, char *argv[])
@ -1115,9 +1115,34 @@ int main(int argc, char *argv[])
The callback is a void-returning function taking two arguments: a reference to the reply message, and a pointer to the prospective `sdbus::Error` instance. Zero `Error` pointer means that no D-Bus error occurred while making the call, and the reply message contains valid reply. Non-zero `Error` pointer, however, points to the valid `Error` instance, meaning that an error occurred. Error name and message can then be read out by the client from that instance.
There is also an overload of this `IProxy::callMethod()` function taking method call timeout argument.
Another option is to use `std::future`-based overload of the `IProxy::callMethod()` function. A future object will be returned which will later, when the reply arrives, be set to contain the returned reply message. Or if the call returns an error, `sdbus::Error` will be thrown by `std::future::get()`.
```c++
...
// Invoke concatenate on given interface of the object
{
auto method = concatenatorProxy->createMethodCall(interfaceName, "concatenate");
method << numbers << separator;
auto future = concatenatorProxy->callMethod(method, sdbus::with_future);
try
{
auto reply = future.get(); // This will throw if call ends with an error
std::string result;
reply >> result;
std::cout << "Got concatenate result: " << result << std::endl;
}
catch (const sdbus::Error& e)
{
std::cerr << "Got concatenate error " << e.getName() << " with message " << e.getMessage() << std::endl;
}
}
```
### Convenience API
On the convenience API level, the call statement starts with `callMethodAsync()`, and ends with `uponReplyInvoke()` that takes a callback handler. The callback is a void-returning function that takes at least one argument: pointer to the `sdbus::Error` instance. All subsequent arguments shall exactly reflect the D-Bus method output arguments. A concatenator example:
On the convenience API level, the call statement starts with `callMethodAsync()`, and one option is to finish the statement with `uponReplyInvoke()` that takes a callback handler. The callback is a void-returning function that takes at least one argument: pointer to the `sdbus::Error` instance. All subsequent arguments shall exactly reflect the D-Bus method output arguments. A concatenator example:
```c++
int main(int argc, char *argv[])
@ -1152,6 +1177,25 @@ int main(int argc, char *argv[])
When the `Error` pointer is zero, it means that no D-Bus error occurred while making the call, and subsequent arguments are valid D-Bus method return values. Non-zero `Error` pointer, however, points to the valid `Error` instance, meaning that an error occurred during the call (and subsequent arguments are simply default-constructed). Error name and message can then be read out by the client from `Error` instance.
Another option is to finish the async call statement with `getResultAsFuture()`, which is a template function which takes the list of types returned by the D-Bus method (empty list in case of `void`-returning method) which returns a `std::future` object, which will later, when the reply arrives, be set to contain the return value(s). Or if the call returns an error, `sdbus::Error` will be thrown by `std::future::get()`.
The future object will contain void for a void-returning D-Bus method, a single type for a single value returning D-Bus method, and a `std::tuple` to hold multiple return values of a D-Bus method.
```c++
...
auto future = concatenatorProxy->callMethodAsync("concatenate").onInterface(interfaceName).withArguments(numbers, separator).getResultAsFuture<std::string>();
try
{
auto concatenatedString = future.get(); // This waits for the reply
std::cout << "Got concatenate result: " << concatenatedString << std::endl;
}
catch (const sdbus::Error& e)
{
std::cerr << "Got concatenate error " << e.getName() << " with message " << e.getMessage() << std::endl;
}
...
```
### Marking client-side async methods in the IDL
sdbus-c++ stub generator can generate stub code for client-side async methods. We just need to annotate the method with `org.freedesktop.DBus.Method.Async`. The annotation element value must be either `client` (async on the client-side only) or `clientserver` (async method on both client- and server-side):

View File

@ -34,6 +34,7 @@
#include <vector>
#include <type_traits>
#include <chrono>
#include <future>
#include <cstdint>
// Forward declarations
@ -195,6 +196,10 @@ namespace sdbus {
AsyncMethodInvoker& withTimeout(const std::chrono::duration<_Rep, _Period>& timeout);
template <typename... _Args> AsyncMethodInvoker& withArguments(_Args&&... args);
template <typename _Function> PendingAsyncCall uponReplyInvoke(_Function&& callback);
// Returned future will be std::future<void> for no (void) D-Bus method return value
// or std::future<T> for single D-Bus method return value
// or std::future<std::tuple<...>> for multiple method return values
template <typename... _Args> std::future<future_return_t<_Args...>> getResultAsFuture();
private:
IProxy& proxy_;

View File

@ -610,6 +610,29 @@ namespace sdbus {
return proxy_.callMethod(method_, std::move(asyncReplyHandler), timeout_);
}
template <typename... _Args>
std::future<future_return_t<_Args...>> AsyncMethodInvoker::getResultAsFuture()
{
auto promise = std::make_shared<std::promise<future_return_t<_Args...>>>();
auto future = promise->get_future();
uponReplyInvoke([promise = std::move(promise)](const Error* error, _Args... args)
{
if (error == nullptr)
if constexpr (!std::is_void_v<future_return_t<_Args...>>)
promise->set_value({std::move(args)...});
else
promise->set_value();
else
promise->set_exception(std::make_exception_ptr(*error));
});
// Will be std::future<void> for no D-Bus method return value
// or std::future<T> for single D-Bus method return value
// or std::future<std::tuple<...>> for multiple method return values
return future;
}
/*** ---------------- ***/
/*** SignalSubscriber ***/
/*** ---------------- ***/

View File

@ -28,10 +28,12 @@
#define SDBUS_CXX_IPROXY_H_
#include <sdbus-c++/ConvenienceApiClasses.h>
#include <sdbus-c++/TypeTraits.h>
#include <string>
#include <memory>
#include <functional>
#include <chrono>
#include <future>
// Forward declarations
namespace sdbus {
@ -322,6 +324,33 @@ namespace sdbus {
* @return A pointer to the currently processed D-Bus message
*/
virtual const Message* getCurrentlyProcessedMessage() const = 0;
/*!
* @brief Calls method on the proxied D-Bus object asynchronously
*
* @param[in] message Message representing an async method call
* @param[in] asyncReplyCallback Handler for the async reply
* @param[in] timeout Timeout for dbus call in microseconds
* @return Cookie for the the pending asynchronous call
*
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided async reply handler will get invoked from the context of the connection
* I/O event loop thread.
*
* Note: To avoid messing with messages, use higher-level API defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual std::future<MethodReply> callMethod(const MethodCall& message, with_future_t) = 0;
virtual std::future<MethodReply> callMethod(const MethodCall& message, uint64_t timeout, with_future_t) = 0;
/*!
* @copydoc IProxy::callMethod(const MethodCall&,uint64_t,with_future_t)
*/
template <typename _Rep, typename _Period>
std::future<MethodReply> callMethod( const MethodCall& message
, const std::chrono::duration<_Rep, _Period>& timeout
, with_future_t );
};
/********************************************//**
@ -382,6 +411,15 @@ namespace sdbus {
return callMethod(message, std::move(asyncReplyCallback), microsecs.count());
}
template <typename _Rep, typename _Period>
inline std::future<MethodReply> IProxy::callMethod( const MethodCall& message
, const std::chrono::duration<_Rep, _Period>& timeout
, with_future_t )
{
auto microsecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout);
return callMethod(message, microsecs.count(), with_future);
}
inline MethodInvoker IProxy::callMethod(const std::string& methodName)
{
return MethodInvoker(*this, methodName);
@ -412,11 +450,6 @@ namespace sdbus {
return PropertySetter(*this, propertyName);
}
// Tag specifying that the proxy shall not run an event loop thread on its D-Bus connection.
// Such proxies are typically created to carry out a simple synchronous D-Bus call(s) and then are destroyed.
struct dont_run_event_loop_thread_t { explicit dont_run_event_loop_thread_t() = default; };
inline constexpr dont_run_event_loop_thread_t dont_run_event_loop_thread{};
/*!
* @brief Creates a proxy object for a specific remote D-Bus object
*

View File

@ -81,6 +81,13 @@ namespace sdbus {
// Tag denoting the assumption that the caller has already obtained fd ownership
struct adopt_fd_t { explicit adopt_fd_t() = default; };
inline constexpr adopt_fd_t adopt_fd{};
// Tag specifying that the proxy shall not run an event loop thread on its D-Bus connection.
// Such proxies are typically created to carry out a simple synchronous D-Bus call(s) and then are destroyed.
struct dont_run_event_loop_thread_t { explicit dont_run_event_loop_thread_t() = default; };
inline constexpr dont_run_event_loop_thread_t dont_run_event_loop_thread{};
// Tag denoting an asynchronous call that returns std::future as a handle
struct with_future_t { explicit with_future_t() = default; };
inline constexpr with_future_t with_future{};
// Template specializations for getting D-Bus signatures from C++ types
template <typename _T>
@ -540,6 +547,26 @@ namespace sdbus {
}
};
template <typename... _Args> struct future_return
{
typedef std::tuple<_Args...> type;
};
template <> struct future_return<>
{
typedef void type;
};
template <typename _Type> struct future_return<_Type>
{
typedef _Type type;
};
template <typename... _Args>
using future_return_t = typename future_return<_Args...>::type;
namespace detail
{
template <class _Function, class _Tuple, typename... _Args, std::size_t... _I>

View File

@ -131,6 +131,29 @@ PendingAsyncCall Proxy::callMethod(const MethodCall& message, async_reply_handle
return {weakData};
}
std::future<MethodReply> Proxy::callMethod(const MethodCall& message, with_future_t)
{
return Proxy::callMethod(message, {}, with_future);
}
std::future<MethodReply> Proxy::callMethod(const MethodCall& message, uint64_t timeout, with_future_t)
{
auto promise = std::make_shared<std::promise<MethodReply>>();
auto future = promise->get_future();
async_reply_handler asyncReplyCallback = [promise = std::move(promise)](MethodReply& reply, const Error* error) noexcept
{
if (error == nullptr)
promise->set_value(reply); // TODO: std::move? Can't move now because currently processed message. TODO: Refactor
else
promise->set_exception(std::make_exception_ptr(*error));
};
(void)Proxy::callMethod(message, std::move(asyncReplyCallback), timeout);
return future;
}
MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout)
{
/*thread_local*/ SyncCallReplyData syncCallReplyData;

View File

@ -58,6 +58,8 @@ namespace sdbus::internal {
MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override;
MethodReply callMethod(const MethodCall& message, uint64_t timeout) override;
PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override;
std::future<MethodReply> callMethod(const MethodCall& message, with_future_t) override;
std::future<MethodReply> callMethod(const MethodCall& message, uint64_t timeout, with_future_t) override;
void registerSignalHandler( const std::string& interfaceName
, const std::string& signalName

View File

@ -161,6 +161,24 @@ TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide)
ASSERT_THAT(future.get(), Eq(100));
}
TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSideWithFuture)
{
auto future = m_proxy->doOperationClientSideAsync(100, sdbus::with_future);
ASSERT_THAT(future.get(), Eq(100));
}
TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSideWithFutureOnBasicAPILevel)
{
auto future = m_proxy->doOperationClientSideAsyncOnBasicAPILevel(100);
auto methodReply = future.get();
uint32_t returnValue{};
methodReply >> returnValue;
ASSERT_THAT(returnValue, Eq(100));
}
TEST_F(SdbusTestObject, AnswersThatAsyncCallIsPendingIfItIsInProgress)
{
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){});
@ -222,7 +240,7 @@ TEST_F(SdbusTestObject, SupportsAsyncCallCopyAssignment)
ASSERT_TRUE(call.isPending());
}
TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
TEST_F(SdbusTestObject, ReturnsNonnullErrorWhenAsynchronousMethodCallFails)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
@ -238,3 +256,10 @@ TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
ASSERT_THROW(future.get(), sdbus::Error);
}
TEST_F(SdbusTestObject, ThrowsErrorWhenClientSideAsynchronousMethodCallWithFutureFails)
{
auto future = m_proxy->doErroneousOperationClientSideAsync(sdbus::with_future);
ASSERT_THROW(future.get(), sdbus::Error);
}

View File

@ -123,6 +123,22 @@ sdbus::PendingAsyncCall TestProxy::doOperationClientSideAsync(uint32_t param)
});
}
std::future<uint32_t> TestProxy::doOperationClientSideAsync(uint32_t param, with_future_t)
{
return getProxy().callMethodAsync("doOperation")
.onInterface(sdbus::test::INTERFACE_NAME)
.withArguments(param)
.getResultAsFuture<uint32_t>();
}
std::future<MethodReply> TestProxy::doOperationClientSideAsyncOnBasicAPILevel(uint32_t param)
{
auto methodCall = getProxy().createMethodCall(sdbus::test::INTERFACE_NAME, "doOperation");
methodCall << param;
return getProxy().callMethod(methodCall, sdbus::with_future);
}
void TestProxy::doErroneousOperationClientSideAsync()
{
getProxy().callMethodAsync("throwError")
@ -133,6 +149,13 @@ void TestProxy::doErroneousOperationClientSideAsync()
});
}
std::future<void> TestProxy::doErroneousOperationClientSideAsync(with_future_t)
{
return getProxy().callMethodAsync("throwError")
.onInterface(sdbus::test::INTERFACE_NAME)
.getResultAsFuture<>();;
}
void TestProxy::doOperationClientSideAsyncWithTimeout(const std::chrono::microseconds &timeout, uint32_t param)
{
using namespace std::chrono_literals;

View File

@ -32,6 +32,7 @@
#include <thread>
#include <chrono>
#include <atomic>
#include <future>
namespace sdbus { namespace test {
@ -94,6 +95,9 @@ public:
void installDoOperationClientSideAsyncReplyHandler(std::function<void(uint32_t res, const sdbus::Error* err)> handler);
uint32_t doOperationWithTimeout(const std::chrono::microseconds &timeout, uint32_t param);
sdbus::PendingAsyncCall doOperationClientSideAsync(uint32_t param);
std::future<uint32_t> doOperationClientSideAsync(uint32_t param, with_future_t);
std::future<MethodReply> doOperationClientSideAsyncOnBasicAPILevel(uint32_t param);
std::future<void> doErroneousOperationClientSideAsync(with_future_t);
void doErroneousOperationClientSideAsync();
void doOperationClientSideAsyncWithTimeout(const std::chrono::microseconds &timeout, uint32_t param);
int32_t callNonexistentMethod();