From f41d9bc39596c1e79afa880e8de78a3532396d26 Mon Sep 17 00:00:00 2001 From: sangelovic Date: Sun, 12 Jan 2020 13:10:34 +0100 Subject: [PATCH] Fix issue of event loop thread and synchronous method call thread polling on the same D-Bus connection Synchronous D-Bus method calls are now done in terms of blocking asynchronous calls. --- docs/using-sdbus-c++.md | 11 ++- include/sdbus-c++/ConvenienceApiClasses.h | 2 +- include/sdbus-c++/ConvenienceApiClasses.inl | 2 +- include/sdbus-c++/IProxy.h | 24 +----- include/sdbus-c++/Message.h | 39 ++++----- src/Connection.cpp | 35 ++++++++ src/Connection.h | 6 ++ src/IConnection.h | 3 +- src/Message.cpp | 31 +++---- src/Proxy.cpp | 86 ++++++++++++++++--- src/Proxy.h | 23 ++++- .../integrationtests/AdaptorAndProxy_test.cpp | 44 +++++++++- tests/integrationtests/TestingAdaptor.h | 7 ++ tests/integrationtests/TestingProxy.h | 1 + tests/integrationtests/adaptor-glue.h | 5 ++ tests/integrationtests/proxy-glue.h | 18 ++++ tests/perftests/client.cpp | 26 +++++- 17 files changed, 281 insertions(+), 82 deletions(-) diff --git a/docs/using-sdbus-c++.md b/docs/using-sdbus-c++.md index 0368e2f..2bb9230 100644 --- a/docs/using-sdbus-c++.md +++ b/docs/using-sdbus-c++.md @@ -157,8 +157,7 @@ The following diagram illustrates the major entities in sdbus-c++. `Message` class represents a message, which is the fundamental DBus concept. There are three distinctive types of message that are derived from the `Message` class: - * `MethodCall` (with serialized parameters), - * `AsyncMethodCall` (with serialized parameters), + * `MethodCall` (be it synchronous or asynchronous method call, with serialized parameters), * `MethodReply` (with serialized return values), * `Signal` (with serialized parameters), * `PropertySetCall` (with serialized parameter value to be set) @@ -167,13 +166,17 @@ The following diagram illustrates the major entities in sdbus-c++. ### Thread safety in sdbus-c++ -sdbus-c++ is thread-aware by design. But, in general, it's not thread-safe. At least not in all places. There are situations where sdbus-c++ provides and guarantees API-level thread safety by design. It is safe to do these operations from multiple threads at the same time: +sdbus-c++ is completely thread-aware by design. Though sdbus-c++ is not thread-safe in general, there are situations where sdbus-c++ provides and guarantees API-level thread safety by design. It is safe to do these operations (operations within the bullet points, not across them) from multiple threads at the same time: - * Making and destroying `Object`s and `Proxy`s, even on a shared connection that is already running an event loop. Under *making* here is meant a complete atomic sequence of construction, registration of method/signal/property callbacks and export of the `Object`/`Proxy` so it is ready to issue/receive messages. This sequence must be done in one thread. + * Making or destroying distinct `Object`/`Proxy` instances simultaneously (even on a shared connection that is running an event loop already, see below). Under *making* here is meant a complete sequence of construction, registration of method/signal/property callbacks and export of the `Object`/`Proxy` so it is ready to issue/receive messages. This sequence must be completely done within the context of one thread. * Creating and sending asynchronous method replies on an `Object` instance. * Creating and emitting signals on an `Object` instance. * Creating and sending method calls (both synchronously and asynchronously) on an `Proxy` instance. (But it's generally better that our threads use their own exclusive instances of proxy, to minimize shared state and contention.) +sdbus-c++ is designed such that all the above operations are thread-safe also on a connection that is running an event loop (usually in a separate thread) at that time. It's an internal thread safety. For example, a signal arrives and is processed by sdbus-c++ even loop at an appropriate `Proxy` instance, while the user is going to destroy that instance in their application thread. The user cannot explicitly control these situations (or they could, but that would be very limiting and cubersome on the API level). + +However, other combinations, that the user invokes explicitly from within more threads are NOT thread-safe in sdbus-c++ by design, and the user should make sure by their design that these cases never occur. For example, destroying an `Object` instance in one thread while emitting a signal on it in another thread is not thread-safe. In this specific case, the user should make sure in their application that all threads stop working with a specific instance before a thread proceeds with deleting that instance. + Multiple layers of sdbus-c++ API ------------------------------- diff --git a/include/sdbus-c++/ConvenienceApiClasses.h b/include/sdbus-c++/ConvenienceApiClasses.h index 52a4f88..225e00e 100644 --- a/include/sdbus-c++/ConvenienceApiClasses.h +++ b/include/sdbus-c++/ConvenienceApiClasses.h @@ -192,7 +192,7 @@ namespace sdbus { IProxy& proxy_; const std::string& methodName_; uint64_t timeout_{}; - AsyncMethodCall method_; + MethodCall method_; }; class SignalSubscriber diff --git a/include/sdbus-c++/ConvenienceApiClasses.inl b/include/sdbus-c++/ConvenienceApiClasses.inl index a41cd22..a0c5688 100644 --- a/include/sdbus-c++/ConvenienceApiClasses.inl +++ b/include/sdbus-c++/ConvenienceApiClasses.inl @@ -503,7 +503,7 @@ namespace sdbus { inline AsyncMethodInvoker& AsyncMethodInvoker::onInterface(const std::string& interfaceName) { - method_ = proxy_.createAsyncMethodCall(interfaceName, methodName_); + method_ = proxy_.createMethodCall(interfaceName, methodName_); return *this; } diff --git a/include/sdbus-c++/IProxy.h b/include/sdbus-c++/IProxy.h index 0f3715c..054fd33 100644 --- a/include/sdbus-c++/IProxy.h +++ b/include/sdbus-c++/IProxy.h @@ -36,7 +36,6 @@ // Forward declarations namespace sdbus { class MethodCall; - class AsyncMethodCall; class MethodReply; class IConnection; } @@ -77,21 +76,6 @@ namespace sdbus { */ virtual MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) = 0; - /*! - * @brief Creates an asynchronous method call message - * - * @param[in] interfaceName Name of an interface that provides a given method - * @param[in] methodName Name of the method - * @return A method call message - * - * Serialize method arguments into the returned message and invoke the method by passing - * the message with serialized arguments to the @c callMethod function. - * Alternatively, use higher-level API @c callMethodAsync(const std::string& methodName) defined below. - * - * @throws sdbus::Error in case of failure - */ - virtual AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) = 0; - /*! * @brief Calls method on the proxied D-Bus object * @@ -133,13 +117,13 @@ namespace sdbus { * * @throws sdbus::Error in case of failure */ - virtual void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0; + virtual void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout = 0) = 0; /*! - * @copydoc IProxy::callMethod(const AsyncMethodCall&,async_reply_handler,uint64_t) + * @copydoc IProxy::callMethod(const MethodCall&,async_reply_handler,uint64_t) */ template - void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback, const std::chrono::duration<_Rep, _Period>& timeout); + void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, const std::chrono::duration<_Rep, _Period>& timeout); /*! * @brief Registers a handler for the desired signal emitted by the proxied D-Bus object @@ -289,7 +273,7 @@ namespace sdbus { } template - inline void IProxy::callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback, const std::chrono::duration<_Rep, _Period>& timeout) + inline void IProxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, const std::chrono::duration<_Rep, _Period>& timeout) { auto microsecs = std::chrono::duration_cast(timeout); callMethod(message, std::move(asyncReplyCallback), microsecs.count()); diff --git a/include/sdbus-c++/Message.h b/include/sdbus-c++/Message.h index 9889425..5726d63 100644 --- a/include/sdbus-c++/Message.h +++ b/include/sdbus-c++/Message.h @@ -165,35 +165,32 @@ namespace sdbus { mutable bool ok_{true}; }; + struct dont_request_slot_t { explicit dont_request_slot_t() = default; }; + inline constexpr dont_request_slot_t dont_request_slot{}; + class MethodCall : public Message { using Message::Message; friend Factory; - public: - MethodCall() = default; - MethodReply send(uint64_t timeout = 0) const; - MethodReply createReply() const; - MethodReply createErrorReply(const sdbus::Error& error) const; - void dontExpectReply(); - bool doesntExpectReply() const; - - private: - MethodReply sendWithReply(uint64_t timeout) const; - MethodReply sendWithNoReply() const; - }; - - class AsyncMethodCall : public Message - { - using Message::Message; - friend Factory; - public: using Slot = std::unique_ptr>; - AsyncMethodCall() = default; - explicit AsyncMethodCall(MethodCall&& call) noexcept; - Slot send(void* callback, void* userData, uint64_t timeout = 0) const; + MethodCall() = default; + + MethodReply send(uint64_t timeout) const; + void send(void* callback, void* userData, uint64_t timeout, dont_request_slot_t) const; + [[nodiscard]] Slot send(void* callback, void* userData, uint64_t timeout) const; + + MethodReply createReply() const; + MethodReply createErrorReply(const sdbus::Error& error) const; + + void dontExpectReply(); + bool doesntExpectReply() const; + + private: + MethodReply sendWithReply(uint64_t timeout = 0) const; + MethodReply sendWithNoReply() const; }; class MethodReply : public Message diff --git a/src/Connection.cpp b/src/Connection.cpp index db385f3..5d09392 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -86,6 +86,10 @@ std::string Connection::getUniqueName() const void Connection::enterProcessingLoop() { + loopThreadId_ = std::this_thread::get_id(); + + std::lock_guard guard(loopMutex_); + while (true) { auto processed = processPendingRequest(); @@ -96,6 +100,8 @@ void Connection::enterProcessingLoop() if (!success) break; // Exit processing loop } + + loopThreadId_ = std::thread::id{}; } void Connection::enterProcessingLoopAsync() @@ -288,6 +294,35 @@ SlotPtr Connection::registerSignalHandler( const std::string& objectPath return {slot, [this](void *slot){ iface_->sd_bus_slot_unref((sd_bus_slot*)slot); }}; } +MethodReply Connection::tryCallMethodSynchronously(const MethodCall& message, uint64_t timeout) +{ + auto loopThreadId = loopThreadId_.load(std::memory_order_relaxed); + + // Is the loop not yet on? => Go make synchronous call + while (loopThreadId == std::thread::id{}) + { + // Did the loop begin in the meantime? Or try_lock() failed spuriously? + if (!loopMutex_.try_lock()) + { + loopThreadId = loopThreadId_.load(std::memory_order_relaxed); + continue; + } + + // Synchronous D-Bus call + std::lock_guard guard(loopMutex_, std::adopt_lock); + return message.send(timeout); + } + + // Is the loop on and we are in the same thread? => Go for synchronous call + if (loopThreadId == std::this_thread::get_id()) + { + assert(!loopMutex_.try_lock()); + return message.send(timeout); + } + + return {}; +} + Connection::BusPtr Connection::openBus(const BusFactory& busFactory) { sd_bus* bus{}; diff --git a/src/Connection.h b/src/Connection.h index d5c6b57..2fa5f90 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -37,6 +37,8 @@ #include #include #include +#include +#include namespace sdbus { namespace internal { @@ -103,6 +105,8 @@ namespace sdbus { namespace internal { , sd_bus_message_handler_t callback , void* userData ) override; + MethodReply tryCallMethodSynchronously(const MethodCall& message, uint64_t timeout) override; + private: using BusFactory = std::function; using BusPtr = std::unique_ptr>; @@ -130,6 +134,8 @@ namespace sdbus { namespace internal { std::unique_ptr iface_; BusPtr bus_; std::thread asyncLoopThread_; + std::atomic loopThreadId_; + std::mutex loopMutex_; LoopExitEventFd loopExitFd_; }; diff --git a/src/IConnection.h b/src/IConnection.h index 07b74b3..549f735 100644 --- a/src/IConnection.h +++ b/src/IConnection.h @@ -36,7 +36,6 @@ // Forward declaration namespace sdbus { class MethodCall; - class AsyncMethodCall; class MethodReply; class Signal; namespace internal { @@ -91,6 +90,8 @@ namespace internal { virtual void enterProcessingLoopAsync() = 0; virtual void leaveProcessingLoop() = 0; + + virtual MethodReply tryCallMethodSynchronously(const MethodCall& message, uint64_t timeout) = 0; }; } diff --git a/src/Message.cpp b/src/Message.cpp index 8ebefa2..851303f 100644 --- a/src/Message.cpp +++ b/src/Message.cpp @@ -660,6 +660,22 @@ MethodReply MethodCall::sendWithNoReply() const return Factory::create(); // No reply } +void MethodCall::send(void* callback, void* userData, uint64_t timeout, dont_request_slot_t) const +{ + auto r = sdbus_->sd_bus_call_async(nullptr, nullptr, (sd_bus_message*)msg_, (sd_bus_message_handler_t)callback, userData, timeout); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method", -r); +} + +MethodCall::Slot MethodCall::send(void* callback, void* userData, uint64_t timeout) const +{ + sd_bus_slot* slot; + + auto r = sdbus_->sd_bus_call_async(nullptr, &slot, (sd_bus_message*)msg_, (sd_bus_message_handler_t)callback, userData, timeout); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method asynchronously", -r); + + return Slot{slot, [sdbus_ = sdbus_](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }}; +} + MethodReply MethodCall::createReply() const { sd_bus_message* sdbusReply{}; @@ -682,21 +698,6 @@ MethodReply MethodCall::createErrorReply(const Error& error) const return Factory::create(sdbusErrorReply, sdbus_, adopt_message); } -AsyncMethodCall::AsyncMethodCall(MethodCall&& call) noexcept - : Message(std::move(call)) -{ -} - -AsyncMethodCall::Slot AsyncMethodCall::send(void* callback, void* userData, uint64_t timeout) const -{ - sd_bus_slot* slot; - - auto r = sdbus_->sd_bus_call_async(nullptr, &slot, (sd_bus_message*)msg_, (sd_bus_message_handler_t)callback, userData, timeout); - SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method asynchronously", -r); - - return Slot{slot, [sdbus_ = sdbus_](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }}; -} - void MethodReply::send() const { auto r = sdbus_->sd_bus_send(nullptr, (sd_bus_message*)msg_, nullptr); diff --git a/src/Proxy.cpp b/src/Proxy.cpp index e396f2f..fbf80ad 100644 --- a/src/Proxy.cpp +++ b/src/Proxy.cpp @@ -64,19 +64,36 @@ MethodCall Proxy::createMethodCall(const std::string& interfaceName, const std:: return connection_->createMethodCall(destination_, objectPath_, interfaceName, methodName); } -AsyncMethodCall Proxy::createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) -{ - return AsyncMethodCall{Proxy::createMethodCall(interfaceName, methodName)}; -} - MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout) { + // Sending method call synchronously is the only operation that blocks, waiting for the method + // reply message among the incoming message on the sd-bus connection socket. But typically there + // already is somebody that generally handles incoming D-Bus messages -- the connection event loop + // running typically in its own thread. We have to avoid polling on socket from several threads. + // So we have to branch here: either we are within the context of the event loop thread, then we + // can send the message simply via sd_bus_call, which blocks. Or we are in another thread, then + // we can perform the send operation of the method call message from here (because that is thread- + // safe like other sd-bus API accesses), but the incoming reply we have to get through the event + // loop thread, because this is be the only rightful listener on the sd-bus connection socket. + // So, technically, we use async means to wait here for reply received by the event loop thread. + SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid method call message provided", EINVAL); - return message.send(timeout); + // If we don't need to wait for any reply, we can send the message now irrespective of the context + if (message.doesntExpectReply()) + return message.send(timeout); + + // If we are in the context of event loop thread, we can send the D-Bus call synchronously + // and wait blockingly for the reply, because we are the exclusive listeners on the socket + auto reply = connection_->tryCallMethodSynchronously(message, timeout); + if (reply.isValid()) + return reply; + + // Otherwise we send the call asynchronously and do blocking wait for the reply from the event loop thread + return sendMethodCallMessageAndWaitForReply(message, timeout); } -void Proxy::callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) +void Proxy::callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) { SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid async method call message provided", EINVAL); @@ -88,6 +105,49 @@ void Proxy::callMethod(const AsyncMethodCall& message, async_reply_handler async pendingAsyncCalls_.addCall(callData->slot.get(), std::move(callData)); } +MethodReply Proxy::sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout) +{ + /*thread_local*/ SyncCallReplyData syncCallReplyData; + + async_reply_handler asyncReplyCallback = [&syncCallReplyData](MethodReply& reply, const Error* error) + { + syncCallReplyData.sendMethodReplyToWaitingThread(reply, error); + }; + auto callback = (void*)&Proxy::sdbus_async_reply_handler; + AsyncCalls::CallData callData{*this, std::move(asyncReplyCallback), {}}; + + message.send(callback, &callData, timeout, dont_request_slot); + + return syncCallReplyData.waitForMethodReply(); +} + +void Proxy::SyncCallReplyData::sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error) +{ + SCOPE_EXIT{ cond_.notify_one(); }; + std::unique_lock lock{mutex_}; + SCOPE_EXIT{ arrived_ = true; }; + + //error_ = nullptr; // Necessary if SyncCallReplyData instance is thread_local + + if (error == nullptr) + reply_ = std::move(reply); + else + error_ = std::make_unique(*error); +} + +MethodReply Proxy::SyncCallReplyData::waitForMethodReply() +{ + std::unique_lock lock{mutex_}; + cond_.wait(lock, [this](){ return arrived_; }); + + //arrived_ = false; // Necessary if SyncCallReplyData instance is thread_local + + if (error_) + throw *error_; + + return std::move(reply_); +} + void Proxy::registerSignalHandler( const std::string& interfaceName , const std::string& signalName , signal_handler signalHandler ) @@ -122,7 +182,7 @@ void Proxy::registerSignalHandlers(sdbus::internal::IConnection& connection) slot = connection.registerSignalHandler( objectPath_ , interfaceName , signalName - , &Proxy::sdbus_signal_callback + , &Proxy::sdbus_signal_handler , this ); } } @@ -141,7 +201,13 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat assert(asyncCallData->callback); auto& proxy = asyncCallData->proxy; - SCOPE_EXIT{ proxy.pendingAsyncCalls_.removeCall(asyncCallData->slot.get()); }; + SCOPE_EXIT + { + // Slot may be null if we're doing blocking synchronous call implemented by means of asynchronous call, + // because in that case the call data is still alive on the stack, we don't need to manage it separately. + if (asyncCallData->slot) + proxy.pendingAsyncCalls_.removeCall(asyncCallData->slot.get()); + }; auto message = Message::Factory::create(sdbusMessage, &proxy.connection_->getSdBusInterface()); @@ -159,7 +225,7 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat return 1; } -int Proxy::sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/) +int Proxy::sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/) { auto* proxy = static_cast(userData); assert(proxy != nullptr); diff --git a/src/Proxy.h b/src/Proxy.h index d211db6..1b0dfbc 100644 --- a/src/Proxy.h +++ b/src/Proxy.h @@ -35,6 +35,7 @@ #include #include #include +#include namespace sdbus { namespace internal { @@ -51,9 +52,8 @@ namespace internal { , std::string objectPath ); MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override; - AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) override; MethodReply callMethod(const MethodCall& message, uint64_t timeout) override; - void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override; + void callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) override; void registerSignalHandler( const std::string& interfaceName , const std::string& signalName @@ -62,9 +62,24 @@ namespace internal { void unregister() override; private: + class SyncCallReplyData + { + public: + void sendMethodReplyToWaitingThread(MethodReply& reply, const Error* error); + MethodReply waitForMethodReply(); + + private: + std::mutex mutex_; + std::condition_variable cond_; + bool arrived_{}; + MethodReply reply_; + std::unique_ptr error_; + }; + + MethodReply sendMethodCallMessageAndWaitForReply(const MethodCall& message, uint64_t timeout); void registerSignalHandlers(sdbus::internal::IConnection& connection); static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); - static int sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); + static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); private: std::unique_ptr< sdbus::internal::IConnection @@ -97,7 +112,7 @@ namespace internal { { Proxy& proxy; async_reply_handler callback; - AsyncMethodCall::Slot slot; + MethodCall::Slot slot; }; ~AsyncCalls() diff --git a/tests/integrationtests/AdaptorAndProxy_test.cpp b/tests/integrationtests/AdaptorAndProxy_test.cpp index 68753a0..91b62b5 100644 --- a/tests/integrationtests/AdaptorAndProxy_test.cpp +++ b/tests/integrationtests/AdaptorAndProxy_test.cpp @@ -48,6 +48,7 @@ using ::testing::Eq; using ::testing::DoubleEq; using ::testing::Gt; +using ::testing::AnyOf; using ::testing::ElementsAre; using ::testing::SizeIs; using namespace std::chrono_literals; @@ -106,6 +107,7 @@ public: }; std::unique_ptr AdaptorAndProxyFixture::s_connection = sdbus::createSystemBusConnection(); + } /*-------------------------------------*/ @@ -247,8 +249,38 @@ TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenMethodTimesOut) } catch (const sdbus::Error& e) { - ASSERT_THAT(e.getName(), Eq("org.freedesktop.DBus.Error.Timeout")); - ASSERT_THAT(e.getMessage(), Eq("Connection timed out")); + ASSERT_THAT(e.getName(), AnyOf("org.freedesktop.DBus.Error.Timeout", "org.freedesktop.DBus.Error.NoReply")); + ASSERT_THAT(e.getMessage(), AnyOf("Connection timed out", "Method call timed out")); + } + catch(...) + { + FAIL() << "Expected sdbus::Error exception"; + } +} + +TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenClientSideAsyncMethodTimesOut) +{ + try + { + std::promise promise; + auto future = promise.get_future(); + m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err) + { + if (err == nullptr) + promise.set_value(res); + else + promise.set_exception(std::make_exception_ptr(*err)); + }); + + m_proxy->doOperationClientSideAsyncWith500msTimeout(1000); // The operation will take 1s, but the timeout is 500ms, so we should time out + future.get(), Eq(100); + + FAIL() << "Expected sdbus::Error exception"; + } + catch (const sdbus::Error& e) + { + ASSERT_THAT(e.getName(), AnyOf("org.freedesktop.DBus.Error.Timeout", "org.freedesktop.DBus.Error.NoReply")); + ASSERT_THAT(e.getMessage(), AnyOf("Connection timed out", "Method call timed out")); } catch(...) { @@ -392,6 +424,14 @@ TEST_F(SdbusTestObject, FailsCallingMethodOnNonexistentObject) ASSERT_THROW(proxy.getInt(), sdbus::Error); } +TEST_F(SdbusTestObject, ReceivesTwoSignalsWhileMakingMethodCall) +{ + m_proxy->emitTwoSimpleSignals(); + + ASSERT_TRUE(waitUntil(m_proxy->m_gotSimpleSignal)); + ASSERT_TRUE(waitUntil(m_proxy->m_gotSignalWithMap)); +} + #if LIBSYSTEMD_VERSION>=240 TEST_F(SdbusTestObject, CanSetGeneralMethodTimeoutWithLibsystemdVersionGreaterThan239) { diff --git a/tests/integrationtests/TestingAdaptor.h b/tests/integrationtests/TestingAdaptor.h index 19caa2b..4ebdd65 100644 --- a/tests/integrationtests/TestingAdaptor.h +++ b/tests/integrationtests/TestingAdaptor.h @@ -197,6 +197,13 @@ protected: throw sdbus::createError(1, "A test error occurred"); } + + void emitTwoSimpleSignals() override + { + emitSimpleSignal(); + emitSignalWithMap({}); + } + std::string state() { return m_state; diff --git a/tests/integrationtests/TestingProxy.h b/tests/integrationtests/TestingProxy.h index d45254e..027af00 100644 --- a/tests/integrationtests/TestingProxy.h +++ b/tests/integrationtests/TestingProxy.h @@ -108,6 +108,7 @@ protected: //private: public: // for tests + int m_SimpleSignals = 0; std::atomic m_gotSimpleSignal{false}; std::atomic m_gotSignalWithMap{false}; std::map m_mapFromSignal; diff --git a/tests/integrationtests/adaptor-glue.h b/tests/integrationtests/adaptor-glue.h index 88bdaf8..d5d88d5 100644 --- a/tests/integrationtests/adaptor-glue.h +++ b/tests/integrationtests/adaptor-glue.h @@ -106,6 +106,8 @@ protected: object_.registerMethod("doPrivilegedStuff").onInterface(INTERFACE_NAME).implementedAs([](){}).markAsPrivileged(); + object_.registerMethod("emitTwoSimpleSignals").onInterface(INTERFACE_NAME).implementedAs([this](){ this->emitTwoSimpleSignals(); }); + // registration of signals is optional, it is useful because of introspection object_.registerSignal("simpleSignal").onInterface(INTERFACE_NAME).markAsDeprecated(); object_.registerSignal("signalWithMap").onInterface(INTERFACE_NAME).withParameters>(); @@ -168,6 +170,7 @@ protected: virtual sdbus::UnixFd getUnixFd() const = 0; virtual ComplexType getComplex() const = 0; virtual void throwError() const = 0; + virtual void emitTwoSimpleSignals() = 0; virtual std::string state() = 0; virtual uint32_t action() = 0; @@ -240,6 +243,8 @@ R"delimiter( + + diff --git a/tests/integrationtests/proxy-glue.h b/tests/integrationtests/proxy-glue.h index fb2b768..f2b8442 100644 --- a/tests/integrationtests/proxy-glue.h +++ b/tests/integrationtests/proxy-glue.h @@ -56,6 +56,11 @@ protected: virtual void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) = 0; public: + void emitTwoSimpleSignals() + { + object_.callMethod("emitTwoSimpleSignals").onInterface(INTERFACE_NAME); + } + void noArgNoReturn() { object_.callMethod("noArgNoReturn").onInterface(INTERFACE_NAME); @@ -172,6 +177,19 @@ public: }); } + void doOperationClientSideAsyncWith500msTimeout(uint32_t param) + { + using namespace std::chrono_literals; + object_.callMethodAsync("doOperation") + .onInterface(INTERFACE_NAME) + .withTimeout(500000us) + .withArguments(param) + .uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue) + { + this->onDoOperationReply(returnValue, error); + }); + } + sdbus::Signature getSignature() { sdbus::Signature result; diff --git a/tests/perftests/client.cpp b/tests/perftests/client.cpp index dacb216..3fe16a5 100644 --- a/tests/perftests/client.cpp +++ b/tests/perftests/client.cpp @@ -37,6 +37,8 @@ using namespace std::chrono_literals; +uint64_t totalDuration = 0; + class PerftestProxy : public sdbus::ProxyInterfaces { public: @@ -66,7 +68,9 @@ protected: else if (counter == m_msgCount) { auto stopTime = std::chrono::steady_clock::now(); - std::cout << "Received " << m_msgCount << " signals in: " << std::chrono::duration_cast(stopTime - startTime).count() << " ms" << std::endl; + auto duration = std::chrono::duration_cast(stopTime - startTime).count(); + totalDuration += duration; + std::cout << "Received " << m_msgCount << " signals in: " << duration << " ms" << std::endl; counter = 0; } } @@ -114,6 +118,9 @@ int main(int /*argc*/, char */*argv*/[]) std::this_thread::sleep_for(1000ms); } + std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; + totalDuration = 0; + msgSize = 1000; std::cout << std::endl << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; client.m_msgCount = msgCount; client.m_msgSize = msgSize; @@ -124,6 +131,9 @@ int main(int /*argc*/, char */*argv*/[]) std::this_thread::sleep_for(1000ms); } + std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; + totalDuration = 0; + msgSize = 20; std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; for (unsigned int r = 0; r < repetitions; ++r) @@ -140,11 +150,16 @@ int main(int /*argc*/, char */*argv*/[]) assert(result.size() == msgSize); } auto stopTime = std::chrono::steady_clock::now(); - std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast(stopTime - startTime).count() << " ms" << std::endl; + auto duration = std::chrono::duration_cast(stopTime - startTime).count(); + totalDuration += duration; + std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl; std::this_thread::sleep_for(1000ms); } + std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; + totalDuration = 0; + msgSize = 1000; std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; for (unsigned int r = 0; r < repetitions; ++r) @@ -161,10 +176,15 @@ int main(int /*argc*/, char */*argv*/[]) assert(result.size() == msgSize); } auto stopTime = std::chrono::steady_clock::now(); - std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast(stopTime - startTime).count() << " ms" << std::endl; + auto duration = std::chrono::duration_cast(stopTime - startTime).count(); + totalDuration += duration; + std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl; std::this_thread::sleep_for(1000ms); } + std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; + totalDuration = 0; + return 0; }