forked from Kistler-Group/sdbus-cpp
Introduce support for cancellable async calls
This commit is contained in:
@ -42,6 +42,7 @@ namespace sdbus {
|
|||||||
class IProxy;
|
class IProxy;
|
||||||
class Variant;
|
class Variant;
|
||||||
class Error;
|
class Error;
|
||||||
|
class PendingAsyncCall;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace sdbus {
|
namespace sdbus {
|
||||||
@ -193,7 +194,7 @@ namespace sdbus {
|
|||||||
template <typename _Rep, typename _Period>
|
template <typename _Rep, typename _Period>
|
||||||
AsyncMethodInvoker& withTimeout(const std::chrono::duration<_Rep, _Period>& timeout);
|
AsyncMethodInvoker& withTimeout(const std::chrono::duration<_Rep, _Period>& timeout);
|
||||||
template <typename... _Args> AsyncMethodInvoker& withArguments(_Args&&... args);
|
template <typename... _Args> AsyncMethodInvoker& withArguments(_Args&&... args);
|
||||||
template <typename _Function> void uponReplyInvoke(_Function&& callback);
|
template <typename _Function> PendingAsyncCall uponReplyInvoke(_Function&& callback);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
IProxy& proxy_;
|
IProxy& proxy_;
|
||||||
|
@ -576,7 +576,7 @@ namespace sdbus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename _Function>
|
template <typename _Function>
|
||||||
void AsyncMethodInvoker::uponReplyInvoke(_Function&& callback)
|
PendingAsyncCall AsyncMethodInvoker::uponReplyInvoke(_Function&& callback)
|
||||||
{
|
{
|
||||||
assert(method_.isValid()); // onInterface() must be placed/called prior to this function
|
assert(method_.isValid()); // onInterface() must be placed/called prior to this function
|
||||||
|
|
||||||
@ -594,7 +594,7 @@ namespace sdbus {
|
|||||||
sdbus::apply(callback, error, args);
|
sdbus::apply(callback, error, args);
|
||||||
};
|
};
|
||||||
|
|
||||||
proxy_.callMethod(method_, std::move(asyncReplyHandler), timeout_);
|
return proxy_.callMethod(method_, std::move(asyncReplyHandler), timeout_);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*** ---------------- ***/
|
/*** ---------------- ***/
|
||||||
|
@ -38,6 +38,10 @@ namespace sdbus {
|
|||||||
class MethodCall;
|
class MethodCall;
|
||||||
class MethodReply;
|
class MethodReply;
|
||||||
class IConnection;
|
class IConnection;
|
||||||
|
class PendingAsyncCall;
|
||||||
|
namespace internal {
|
||||||
|
class Proxy;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace sdbus {
|
namespace sdbus {
|
||||||
@ -108,6 +112,7 @@ namespace sdbus {
|
|||||||
* @param[in] message Message representing an async method call
|
* @param[in] message Message representing an async method call
|
||||||
* @param[in] asyncReplyCallback Handler for the async reply
|
* @param[in] asyncReplyCallback Handler for the async reply
|
||||||
* @param[in] timeout Timeout for dbus call in microseconds
|
* @param[in] timeout Timeout for dbus call in microseconds
|
||||||
|
* @return Cookie for the the pending asynchronous call
|
||||||
*
|
*
|
||||||
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
|
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
|
||||||
* the provided async reply handler will get invoked from the context of the connection
|
* the provided async reply handler will get invoked from the context of the connection
|
||||||
@ -117,7 +122,7 @@ namespace sdbus {
|
|||||||
*
|
*
|
||||||
* @throws sdbus::Error in case of failure
|
* @throws sdbus::Error in case of failure
|
||||||
*/
|
*/
|
||||||
virtual void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0;
|
virtual PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0;
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* @copydoc IProxy::callMethod(const MethodCall&,async_reply_handler,uint64_t)
|
* @copydoc IProxy::callMethod(const MethodCall&,async_reply_handler,uint64_t)
|
||||||
@ -263,6 +268,40 @@ namespace sdbus {
|
|||||||
[[nodiscard]] PropertySetter setProperty(const std::string& propertyName);
|
[[nodiscard]] PropertySetter setProperty(const std::string& propertyName);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/********************************************//**
|
||||||
|
* @class PendingAsyncCall
|
||||||
|
*
|
||||||
|
* PendingAsyncCall represents a simple handle type to cancel the delivery
|
||||||
|
* of the asynchronous D-Bus call result to the application.
|
||||||
|
*
|
||||||
|
***********************************************/
|
||||||
|
class PendingAsyncCall
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/*!
|
||||||
|
* @brief Cancels the pending asynchronous call
|
||||||
|
*
|
||||||
|
* Removes the callback handler registered to the call result delivery.
|
||||||
|
* Does nothing if the callback handler is in progress already.
|
||||||
|
*/
|
||||||
|
void cancel();
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* @brief Answers whether the asynchronous call is still pending
|
||||||
|
*
|
||||||
|
* @return True if the call is pending, false if the call has been fully completed
|
||||||
|
*/
|
||||||
|
bool isPending();
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend internal::Proxy;
|
||||||
|
PendingAsyncCall(IProxy& proxy, void* slot);
|
||||||
|
|
||||||
|
private:
|
||||||
|
IProxy& proxy_;
|
||||||
|
void* slot_;
|
||||||
|
};
|
||||||
|
|
||||||
// Out-of-line member definitions
|
// Out-of-line member definitions
|
||||||
|
|
||||||
template <typename _Rep, typename _Period>
|
template <typename _Rep, typename _Period>
|
||||||
@ -304,6 +343,11 @@ namespace sdbus {
|
|||||||
return PropertySetter(*this, propertyName);
|
return PropertySetter(*this, propertyName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline PendingAsyncCall::PendingAsyncCall(IProxy& proxy, void* slot)
|
||||||
|
: proxy_(proxy), slot_(slot)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* @brief Creates a proxy object for a specific remote D-Bus object
|
* @brief Creates a proxy object for a specific remote D-Bus object
|
||||||
*
|
*
|
||||||
|
@ -93,7 +93,7 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
|
|||||||
return sendMethodCallMessageAndWaitForReply(message, timeout);
|
return sendMethodCallMessageAndWaitForReply(message, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout)
|
PendingAsyncCall Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout)
|
||||||
{
|
{
|
||||||
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL);
|
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL);
|
||||||
|
|
||||||
@ -102,7 +102,10 @@ void Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReply
|
|||||||
|
|
||||||
callData->slot = message.send(callback, callData.get(), timeout);
|
callData->slot = message.send(callback, callData.get(), timeout);
|
||||||
|
|
||||||
pendingAsyncCalls_.addCall(callData->slot.get(), std::move(callData));
|
auto slotPtr = callData->slot.get();
|
||||||
|
pendingAsyncCalls_.addCall(slotPtr, std::move(callData));
|
||||||
|
|
||||||
|
return {*this, slotPtr};
|
||||||
}
|
}
|
||||||
|
|
||||||
MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout)
|
MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout)
|
||||||
@ -124,7 +127,7 @@ MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& messag
|
|||||||
void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error)
|
void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error)
|
||||||
{
|
{
|
||||||
SCOPE_EXIT{ cond_.notify_one(); };
|
SCOPE_EXIT{ cond_.notify_one(); };
|
||||||
std::unique_lock<std::mutex> lock{mutex_};
|
std::unique_lock lock{mutex_};
|
||||||
SCOPE_EXIT{ arrived_ = true; };
|
SCOPE_EXIT{ arrived_ = true; };
|
||||||
|
|
||||||
//error_ = nullptr; // Necessary if SyncCallReplyData instance is thread_local
|
//error_ = nullptr; // Necessary if SyncCallReplyData instance is thread_local
|
||||||
@ -137,7 +140,7 @@ void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply
|
|||||||
|
|
||||||
MethodReply Proxy::SyncCallReplyData::waitForMethodReply()
|
MethodReply Proxy::SyncCallReplyData::waitForMethodReply()
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock{mutex_};
|
std::unique_lock lock{mutex_};
|
||||||
cond_.wait(lock, [this](){ return arrived_; });
|
cond_.wait(lock, [this](){ return arrived_; });
|
||||||
|
|
||||||
//arrived_ = false; // Necessary if SyncCallReplyData instance is thread_local
|
//arrived_ = false; // Necessary if SyncCallReplyData instance is thread_local
|
||||||
@ -245,6 +248,26 @@ int Proxy::sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd
|
|||||||
|
|
||||||
namespace sdbus {
|
namespace sdbus {
|
||||||
|
|
||||||
|
void PendingAsyncCall::cancel()
|
||||||
|
{
|
||||||
|
assert(dynamic_cast<internal::Proxy*>(&proxy_) != nullptr);
|
||||||
|
assert(slot_ != nullptr);
|
||||||
|
|
||||||
|
static_cast<internal::Proxy&>(proxy_).pendingAsyncCalls_.removeCall(slot_);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PendingAsyncCall::isPending()
|
||||||
|
{
|
||||||
|
assert(dynamic_cast<internal::Proxy*>(&proxy_) != nullptr);
|
||||||
|
assert(slot_ != nullptr);
|
||||||
|
|
||||||
|
return static_cast<internal::Proxy&>(proxy_).pendingAsyncCalls_.existsCall(slot_);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace sdbus {
|
||||||
|
|
||||||
std::unique_ptr<sdbus::IProxy> createProxy( IConnection& connection
|
std::unique_ptr<sdbus::IProxy> createProxy( IConnection& connection
|
||||||
, std::string destination
|
, std::string destination
|
||||||
, std::string objectPath )
|
, std::string objectPath )
|
||||||
|
33
src/Proxy.h
33
src/Proxy.h
@ -52,7 +52,7 @@ namespace sdbus::internal {
|
|||||||
|
|
||||||
MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override;
|
MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override;
|
||||||
MethodReply callMethod(const MethodCall& message, uint64_t timeout) override;
|
MethodReply callMethod(const MethodCall& message, uint64_t timeout) override;
|
||||||
void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override;
|
PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override;
|
||||||
|
|
||||||
void registerSignalHandler( const std::string& interfaceName
|
void registerSignalHandler( const std::string& interfaceName
|
||||||
, const std::string& signalName
|
, const std::string& signalName
|
||||||
@ -81,6 +81,8 @@ namespace sdbus::internal {
|
|||||||
static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
|
static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
friend PendingAsyncCall;
|
||||||
|
|
||||||
std::unique_ptr< sdbus::internal::IConnection
|
std::unique_ptr< sdbus::internal::IConnection
|
||||||
, std::function<void(sdbus::internal::IConnection*)>
|
, std::function<void(sdbus::internal::IConnection*)>
|
||||||
> connection_;
|
> connection_;
|
||||||
@ -125,19 +127,38 @@ namespace sdbus::internal {
|
|||||||
return calls_.emplace(slot, std::move(asyncCallData)).second;
|
return calls_.emplace(slot, std::move(asyncCallData)).second;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool removeCall(void* slot)
|
bool existsCall(void* slot)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex_);
|
std::lock_guard lock(mutex_);
|
||||||
return calls_.erase(slot) > 0;
|
return calls_.count(slot) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void removeCall(void* slot)
|
||||||
|
{
|
||||||
|
std::unique_lock lock(mutex_);
|
||||||
|
if (auto it = calls_.find(slot); it != calls_.end())
|
||||||
|
{
|
||||||
|
auto callData = std::move(it->second);
|
||||||
|
calls_.erase(it);
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
// Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release
|
||||||
|
// out of the `mutex_' critical section here, because if the `removeCall` is called by some
|
||||||
|
// thread and at the same time Proxy's async reply handler (which already holds global sd-bus
|
||||||
|
// mutex) is in progress in a different thread, we get double-mutex deadlock.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void clear()
|
void clear()
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(mutex_);
|
std::unique_lock lock(mutex_);
|
||||||
auto asyncCallSlots = std::move(calls_);
|
auto asyncCallSlots = std::move(calls_);
|
||||||
// Perform releasing of sd-bus slots outside of the calls_ critical section which avoids
|
|
||||||
// double mutex dead lock when the async reply handler is invoked at the same time.
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
|
// Releasing call slot pointer acquires global sd-bus mutex. We have to perform the release
|
||||||
|
// out of the `mutex_' critical section here, because if the `clear` is called by some thread
|
||||||
|
// and at the same time Proxy's async reply handler (which already holds global sd-bus
|
||||||
|
// mutex) is in progress in a different thread, we get double-mutex deadlock.
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -385,6 +385,53 @@ TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide)
|
|||||||
ASSERT_THAT(future.get(), Eq(100));
|
ASSERT_THAT(future.get(), Eq(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(SdbusTestObject, AnswersThatAsyncCallIsPendingIfItIsInProgress)
|
||||||
|
{
|
||||||
|
std::promise<uint32_t> promise;
|
||||||
|
auto future = promise.get_future();
|
||||||
|
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); });
|
||||||
|
|
||||||
|
auto call = m_proxy->doOperationClientSideAsync(100);
|
||||||
|
|
||||||
|
ASSERT_TRUE(call.isPending());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(SdbusTestObject, CancelsPendingAsyncCallOnClientSide)
|
||||||
|
{
|
||||||
|
std::promise<uint32_t> promise;
|
||||||
|
auto future = promise.get_future();
|
||||||
|
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); });
|
||||||
|
auto call = m_proxy->doOperationClientSideAsync(100);
|
||||||
|
|
||||||
|
call.cancel();
|
||||||
|
|
||||||
|
ASSERT_THAT(future.wait_for(300ms), Eq(std::future_status::timeout));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(SdbusTestObject, AnswersThatAsyncCallIsNotPendingAfterItHasBeenCancelled)
|
||||||
|
{
|
||||||
|
std::promise<uint32_t> promise;
|
||||||
|
auto future = promise.get_future();
|
||||||
|
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); });
|
||||||
|
auto call = m_proxy->doOperationClientSideAsync(100);
|
||||||
|
|
||||||
|
call.cancel();
|
||||||
|
|
||||||
|
ASSERT_FALSE(call.isPending());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(SdbusTestObject, AnswersThatAsyncCallIsNotPendingAfterItHasBeenCompleted)
|
||||||
|
{
|
||||||
|
std::promise<uint32_t> promise;
|
||||||
|
auto future = promise.get_future();
|
||||||
|
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, const sdbus::Error* /*err*/){ promise.set_value(1); });
|
||||||
|
|
||||||
|
auto call = m_proxy->doOperationClientSideAsync(0);
|
||||||
|
(void) future.get(); // Wait for the call to finish
|
||||||
|
|
||||||
|
ASSERT_FALSE(call.isPending());
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
|
TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
|
||||||
{
|
{
|
||||||
std::promise<uint32_t> promise;
|
std::promise<uint32_t> promise;
|
||||||
|
@ -156,15 +156,15 @@ public:
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doOperationClientSideAsync(uint32_t param)
|
sdbus::PendingAsyncCall doOperationClientSideAsync(uint32_t param)
|
||||||
{
|
{
|
||||||
object_.callMethodAsync("doOperation")
|
return object_.callMethodAsync("doOperation")
|
||||||
.onInterface(INTERFACE_NAME)
|
.onInterface(INTERFACE_NAME)
|
||||||
.withArguments(param)
|
.withArguments(param)
|
||||||
.uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue)
|
.uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue)
|
||||||
{
|
{
|
||||||
this->onDoOperationReply(returnValue, error);
|
this->onDoOperationReply(returnValue, error);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void doErroneousOperationClientSideAsync()
|
void doErroneousOperationClientSideAsync()
|
||||||
|
@ -33,9 +33,9 @@ protected:
|
|||||||
virtual void onConcatenateReply(const std::string& result, const sdbus::Error* error) = 0;
|
virtual void onConcatenateReply(const std::string& result, const sdbus::Error* error) = 0;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void concatenate(const std::map<std::string, sdbus::Variant>& params)
|
sdbus::PendingAsyncCall concatenate(const std::map<std::string, sdbus::Variant>& params)
|
||||||
{
|
{
|
||||||
proxy_.callMethodAsync("concatenate").onInterface(INTERFACE_NAME).withArguments(params).uponReplyInvoke([this](const sdbus::Error* error, const std::string& result){ this->onConcatenateReply(result, error); });
|
return proxy_.callMethodAsync("concatenate").onInterface(INTERFACE_NAME).withArguments(params).uponReplyInvoke([this](const sdbus::Error* error, const std::string& result){ this->onConcatenateReply(result, error); });
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -173,7 +173,8 @@ std::tuple<std::string, std::string> ProxyGenerator::processMethods(const Nodes&
|
|||||||
std::string outArgStr, outArgTypeStr;
|
std::string outArgStr, outArgTypeStr;
|
||||||
std::tie(outArgStr, outArgTypeStr, std::ignore, std::ignore) = argsToNamesAndTypes(outArgs);
|
std::tie(outArgStr, outArgTypeStr, std::ignore, std::ignore) = argsToNamesAndTypes(outArgs);
|
||||||
|
|
||||||
definitionSS << tab << (async ? "void" : retType) << " " << name << "(" << inArgTypeStr << ")" << endl
|
const std::string realRetType = (async && !dontExpectReply ? "sdbus::PendingAsyncCall" : async ? "void" : retType);
|
||||||
|
definitionSS << tab << realRetType << " " << name << "(" << inArgTypeStr << ")" << endl
|
||||||
<< tab << "{" << endl;
|
<< tab << "{" << endl;
|
||||||
|
|
||||||
if (!timeoutValue.empty())
|
if (!timeoutValue.empty())
|
||||||
@ -186,8 +187,8 @@ std::tuple<std::string, std::string> ProxyGenerator::processMethods(const Nodes&
|
|||||||
definitionSS << tab << tab << retType << " result;" << endl;
|
definitionSS << tab << tab << retType << " result;" << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
definitionSS << tab << tab << "proxy_.callMethod" << (async ? "Async" : "") << "(\"" << name << "\")"
|
definitionSS << tab << tab << (async && !dontExpectReply ? "return " : "")
|
||||||
".onInterface(INTERFACE_NAME)";
|
<< "proxy_.callMethod" << (async ? "Async" : "") << "(\"" << name << "\").onInterface(INTERFACE_NAME)";
|
||||||
|
|
||||||
if (!timeoutValue.empty())
|
if (!timeoutValue.empty())
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user