diff --git a/include/sdbus-c++/IConnection.h b/include/sdbus-c++/IConnection.h index efa3341..1847dc4 100644 --- a/include/sdbus-c++/IConnection.h +++ b/include/sdbus-c++/IConnection.h @@ -31,6 +31,7 @@ #include #include #include +#include namespace sdbus { @@ -47,11 +48,60 @@ namespace sdbus { class IConnection { public: + /*! + * Poll Data for external event loop implementations. + * + * To integrate sdbus with your app's own custom event handling system + * you can use this method to query which file descriptors, poll events + * and timeouts you should add to your app's poll(2), or select(2) + * call in your main event loop. + * + * If you are unsure what this all means then use + * enterEventLoop() or enterEventLoopAsync() instead. + * + * See: getEventLoopPollData() + */ struct PollData { + /*! + * The read fd to be monitored by the event loop. + */ int fd; + /*! + * The events to use for poll(2) alongside fd. + */ short int events; + + /*! + * Absolute timeout value in micro seconds and based of CLOCK_MONOTONIC. + */ uint64_t timeout_usec; + + /*! + * Get the event poll timeout. + * + * The timeout is an absolute value based of CLOCK_MONOTONIC. + * + * @return a duration since the CLOCK_MONOTONIC epoch started. + */ + [[nodiscard]] std::chrono::microseconds getAbsoluteTimeout() const { + return std::chrono::microseconds(timeout_usec); + } + + /*! + * Get the timeout as relative value from now + * + * @return std::nullopt if the timeout is indefinite. A duration otherwise. + */ + [[nodiscard]] std::optional getRelativeTimeout() const; + + /*! + * Get a converted, relative timeout which can be passed as argument 'timeout' to poll(2) + * + * @return -1 if the timeout is indefinite. 0 if the poll(2) shouldn't block. An integer in milli + * seconds otherwise. + */ + [[nodiscard]] int getPollTimeout() const; }; virtual ~IConnection() = default; diff --git a/include/sdbus-c++/Message.h b/include/sdbus-c++/Message.h index 7a48bf9..0fabca5 100644 --- a/include/sdbus-c++/Message.h +++ b/include/sdbus-c++/Message.h @@ -49,6 +49,7 @@ namespace sdbus { class MethodReply; namespace internal { class ISdBus; + class IConnection; } } @@ -195,9 +196,13 @@ namespace sdbus { void dontExpectReply(); bool doesntExpectReply() const; + protected: + MethodCall(void *msg, internal::ISdBus* sdbus, const internal::IConnection* connection, adopt_message_t) noexcept; + private: MethodReply sendWithReply(uint64_t timeout = 0) const; MethodReply sendWithNoReply() const; + const internal::IConnection* connection_{}; }; class MethodReply : public Message diff --git a/src/Connection.cpp b/src/Connection.cpp index b4067d9..2dbd21c 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -122,7 +122,7 @@ void Connection::leaveEventLoop() Connection::PollData Connection::getEventLoopPollData() const { - ISdBus::PollData pollData; + ISdBus::PollData pollData{}; auto r = iface_->sd_bus_get_poll_data(bus_.get(), &pollData); SDBUS_THROW_ERROR_IF(r < 0, "Failed to get bus poll data", -r); @@ -221,7 +221,8 @@ MethodCall Connection::createMethodCall( const std::string& destination SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method call", -r); - return Message::Factory::create(sdbusMsg, iface_.get(), adopt_message); + return Message::Factory::create(sdbusMsg, iface_.get(), + static_cast(this), adopt_message); } Signal Connection::createSignal( const std::string& objectPath @@ -366,19 +367,38 @@ void Connection::finishHandshake(sd_bus* bus) SDBUS_THROW_ERROR_IF(r < 0, "Failed to flush bus on opening", -r); } -void Connection::notifyEventLoopToExit() +void Connection::notifyEventLoop(int fd) const { - assert(loopExitFd_.fd >= 0); + assert(fd >= 0); uint64_t value = 1; - auto r = write(loopExitFd_.fd, &value, sizeof(value)); + auto r = write(fd, &value, sizeof(value)); SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify event loop", -errno); } -void Connection::clearExitNotification() +void Connection::notifyEventLoopToExit() const +{ + notifyEventLoop(loopExitFd_.fd); +} + +void Connection::notifyEventLoopNewTimeout() const { + // The extra notifications for new timeouts are only needed if calls are made asynchronously to the event loop. + // Are we in the same thread as the event loop? Note that it's ok to fail this check because the event loop isn't yet started. + if (loopThreadId_.load(std::memory_order_relaxed) == std::this_thread::get_id()) { + return; + } + + // alternatively use ::sd_bus_get_timeout(..) + auto sdbusPollData = getEventLoopPollData(); + if (sdbusPollData.timeout_usec < activeTimeout_) { + notifyEventLoop(eventFd_.fd); + } +} + +void Connection::clearEventLoopNotification(int fd) const { uint64_t value{}; - auto r = read(loopExitFd_.fd, &value, sizeof(value)); + auto r = read(fd, &value, sizeof(value)); SDBUS_THROW_ERROR_IF(r < 0, "Failed to read from the event descriptor", -errno); } @@ -402,13 +422,18 @@ bool Connection::processPendingRequest() bool Connection::waitForNextRequest() { assert(bus_ != nullptr); - assert(loopExitFd_.fd != 0); + assert(eventFd_.fd != 0); auto sdbusPollData = getEventLoopPollData(); - struct pollfd fds[] = {{sdbusPollData.fd, sdbusPollData.events, 0}, {loopExitFd_.fd, POLLIN, 0}}; + struct pollfd fds[] = { + {sdbusPollData.fd, sdbusPollData.events, 0}, + {eventFd_.fd, POLLIN, 0}, + {loopExitFd_.fd, POLLIN, 0} + }; auto fdsCount = sizeof(fds)/sizeof(fds[0]); - auto timeout = sdbusPollData.timeout_usec == (uint64_t) -1 ? (uint64_t)-1 : (sdbusPollData.timeout_usec+999)/1000; + auto timeout = sdbusPollData.getPollTimeout(); + activeTimeout_ = sdbusPollData.timeout_usec; auto r = poll(fds, fdsCount, timeout); if (r < 0 && errno == EINTR) @@ -416,9 +441,15 @@ bool Connection::waitForNextRequest() SDBUS_THROW_ERROR_IF(r < 0, "Failed to wait on the bus", -errno); + // new timeout notification if (fds[1].revents & POLLIN) { - clearExitNotification(); + clearEventLoopNotification(fds[1].fd); + } + // loop exit notification + if (fds[2].revents & POLLIN) + { + clearEventLoopNotification(fds[2].fd); return false; } @@ -450,13 +481,13 @@ std::vector Connection::to_strv(const std::vector& return strv; } -Connection::LoopExitEventFd::LoopExitEventFd() +Connection::EventFd::EventFd() { - fd = eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC | EFD_NONBLOCK); + fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); SDBUS_THROW_ERROR_IF(fd < 0, "Failed to create event object", -errno); } -Connection::LoopExitEventFd::~LoopExitEventFd() +Connection::EventFd::~EventFd() { assert(fd >= 0); close(fd); @@ -537,4 +568,38 @@ std::unique_ptr createRemoteSystemBusConnection(const std::s return std::make_unique(std::move(interface), remote_system_bus, host); } +} // namespace sdbus::inernal + +namespace sdbus { + +std::optional IConnection::PollData::getRelativeTimeout() const +{ + constexpr auto zero =std::chrono::microseconds::zero(); + if (timeout_usec == 0) { + return zero; + } + else if (timeout_usec == UINT64_MAX) { + return std::nullopt; + } + + // We need CLOCK_MONOTONIC so that we use the same clock as the underlying sd-bus lib. + // We use POSIX's clock_gettime in favour of std::chrono::steady_clock to ensure this. + struct timespec ts{}; + auto r = clock_gettime(CLOCK_MONOTONIC, &ts); + SDBUS_THROW_ERROR_IF(r < 0, "clock_gettime failed: ", -errno); + auto now = std::chrono::nanoseconds(ts.tv_nsec) + std::chrono::seconds(ts.tv_sec); + auto absTimeout = std::chrono::microseconds(timeout_usec); + auto result = std::chrono::duration_cast(absTimeout - now); + return std::max(result, zero); } + +int IConnection::PollData::getPollTimeout() const +{ + auto timeout = getRelativeTimeout(); + if (!timeout) { + return -1; + } + return (int) std::chrono::ceil(timeout.value()).count(); +} + +} // namespace sdbus \ No newline at end of file diff --git a/src/Connection.h b/src/Connection.h index 4a54715..f2b1c04 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -121,15 +121,19 @@ namespace sdbus::internal { static std::string composeSignalMatchFilter(const std::string &sender, const std::string &objectPath, const std::string &interfaceName, const std::string &signalName); - void notifyEventLoopToExit(); - void clearExitNotification(); + void notifyEventLoop(int fd) const; + void notifyEventLoopToExit() const; + void clearEventLoopNotification(int fd) const; + void notifyEventLoopNewTimeout() const override; + + private: void joinWithEventLoop(); static std::vector to_strv(const std::vector& strings); - struct LoopExitEventFd + struct EventFd { - LoopExitEventFd(); - ~LoopExitEventFd(); + EventFd(); + ~EventFd(); int fd; }; @@ -139,7 +143,9 @@ namespace sdbus::internal { std::thread asyncLoopThread_; std::atomic loopThreadId_; std::mutex loopMutex_; - LoopExitEventFd loopExitFd_; + EventFd loopExitFd_; + EventFd eventFd_; + std::atomic activeTimeout_{}; }; } diff --git a/src/IConnection.h b/src/IConnection.h index 754486a..ed32b72 100644 --- a/src/IConnection.h +++ b/src/IConnection.h @@ -91,7 +91,7 @@ namespace sdbus::internal { virtual void enterEventLoopAsync() = 0; virtual void leaveEventLoop() = 0; - + virtual void notifyEventLoopNewTimeout() const = 0; virtual MethodReply tryCallMethodSynchronously(const MethodCall& message, uint64_t timeout) = 0; }; diff --git a/src/Message.cpp b/src/Message.cpp index d59d0af..a8c23ca 100644 --- a/src/Message.cpp +++ b/src/Message.cpp @@ -737,6 +737,15 @@ std::string Message::getSELinuxContext() const return cLabel; } + +MethodCall::MethodCall(void *msg, internal::ISdBus *sdbus, const internal::IConnection *connection, + adopt_message_t) noexcept + : Message(msg, sdbus, adopt_message) + , connection_(connection) +{ + assert(connection_ != nullptr); +} + void MethodCall::dontExpectReply() { auto r = sd_bus_message_set_expect_reply((sd_bus_message*)msg_, 0); @@ -771,6 +780,10 @@ MethodReply MethodCall::sendWithReply(uint64_t timeout) const SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method", -r); + if (connection_) { + connection_->notifyEventLoopNewTimeout(); + } + return Factory::create(sdbusReply, sdbus_, adopt_message); } @@ -786,6 +799,9 @@ void MethodCall::send(void* callback, void* userData, uint64_t timeout, dont_req { auto r = sdbus_->sd_bus_call_async(nullptr, nullptr, (sd_bus_message*)msg_, (sd_bus_message_handler_t)callback, userData, timeout); SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method", -r); + if (connection_) { + connection_->notifyEventLoopNewTimeout(); + } } MethodCall::Slot MethodCall::send(void* callback, void* userData, uint64_t timeout) const @@ -794,6 +810,9 @@ MethodCall::Slot MethodCall::send(void* callback, void* userData, uint64_t timeo auto r = sdbus_->sd_bus_call_async(nullptr, &slot, (sd_bus_message*)msg_, (sd_bus_message_handler_t)callback, userData, timeout); SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method asynchronously", -r); + if (connection_) { + connection_->notifyEventLoopNewTimeout(); + } return Slot{slot, [sdbus_ = sdbus_](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }}; } diff --git a/src/MessageUtils.h b/src/MessageUtils.h index f8cae77..3515c5d 100644 --- a/src/MessageUtils.h +++ b/src/MessageUtils.h @@ -57,6 +57,12 @@ namespace sdbus { return _Msg{msg, sdbus, adopt_message}; } + + template + static _Msg create(void *msg, internal::ISdBus* sdbus, const internal::IConnection* connection, adopt_message_t) + { + return _Msg{msg, sdbus, connection, adopt_message}; + } }; PlainMessage createPlainMessage(); diff --git a/tests/integrationtests/DBusAsyncMethodsTests.cpp b/tests/integrationtests/DBusAsyncMethodsTests.cpp index 5bb5811..44b2d4f 100644 --- a/tests/integrationtests/DBusAsyncMethodsTests.cpp +++ b/tests/integrationtests/DBusAsyncMethodsTests.cpp @@ -42,6 +42,7 @@ using ::testing::Eq; using ::testing::DoubleEq; using ::testing::Gt; +using ::testing::Le; using ::testing::AnyOf; using ::testing::ElementsAre; using ::testing::SizeIs; @@ -56,6 +57,7 @@ using SdbusTestObject = TestFixture; TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenClientSideAsyncMethodTimesOut) { + std::chrono::time_point start; try { std::promise promise; @@ -68,7 +70,8 @@ TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenClientSideAsyncMethodTimesOut) promise.set_exception(std::make_exception_ptr(*err)); }); - m_proxy->doOperationClientSideAsyncWith500msTimeout(1000); // The operation will take 1s, but the timeout is 500ms, so we should time out + start = std::chrono::steady_clock::now(); + m_proxy->doOperationClientSideAsyncWithTimeout(1us, 1000); // The operation will take 1s, but the timeout is 500ms, so we should time out future.get(); FAIL() << "Expected sdbus::Error exception"; @@ -77,6 +80,8 @@ TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenClientSideAsyncMethodTimesOut) { ASSERT_THAT(e.getName(), AnyOf("org.freedesktop.DBus.Error.Timeout", "org.freedesktop.DBus.Error.NoReply")); ASSERT_THAT(e.getMessage(), AnyOf("Connection timed out", "Method call timed out")); + auto measuredTimeout = std::chrono::steady_clock::now() - start; + ASSERT_THAT(measuredTimeout, Le(50ms)); } catch(...) { diff --git a/tests/integrationtests/DBusConnectionTests.cpp b/tests/integrationtests/DBusConnectionTests.cpp index 7258feb..90edc1c 100644 --- a/tests/integrationtests/DBusConnectionTests.cpp +++ b/tests/integrationtests/DBusConnectionTests.cpp @@ -37,9 +37,11 @@ // STL #include +#include using ::testing::Eq; using namespace sdbus::test; +using namespace std::chrono_literals; /*-------------------------------------*/ /* -- TEST CASES -- */ @@ -88,3 +90,44 @@ TEST(Connection, CanEnterAndLeaveEventLoop) t.join(); } + +TEST(Connection, PollDataGetZeroTimeout) +{ + sdbus::IConnection::PollData pd{}; + pd.timeout_usec = 0; + ASSERT_TRUE(pd.getRelativeTimeout().has_value()); + EXPECT_THAT(pd.getRelativeTimeout().value(), Eq(std::chrono::microseconds::zero())); + EXPECT_THAT(pd.getPollTimeout(), Eq(0)); +} + +TEST(Connection, PollDataGetInfiniteTimeout) +{ + sdbus::IConnection::PollData pd{}; + pd.timeout_usec = UINT64_MAX; + ASSERT_FALSE(pd.getRelativeTimeout().has_value()); + EXPECT_THAT(pd.getPollTimeout(), Eq(-1)); +} + +TEST(Connection, PollDataGetZeroRelativeTimeoutForPast) +{ + sdbus::IConnection::PollData pd{}; + auto past = std::chrono::steady_clock::now() - 10s; + pd.timeout_usec = std::chrono::duration_cast(past.time_since_epoch()).count(); + ASSERT_TRUE(pd.getRelativeTimeout().has_value()); + EXPECT_THAT(pd.getRelativeTimeout().value(), Eq(0us)); + EXPECT_THAT(pd.getPollTimeout(), Eq(0)); +} + +TEST(Connection, PollDataGetRelativeTimeoutInTolerance) +{ + sdbus::IConnection::PollData pd{}; + constexpr auto TIMEOUT = 1s; + constexpr auto TOLERANCE = 100ms; + auto future = std::chrono::steady_clock::now() + TIMEOUT; + pd.timeout_usec = std::chrono::duration_cast(future.time_since_epoch()).count(); + ASSERT_TRUE(pd.getRelativeTimeout().has_value()); + EXPECT_GE(pd.getRelativeTimeout().value(), TIMEOUT - TOLERANCE); + EXPECT_LE(pd.getRelativeTimeout().value(), TIMEOUT + TOLERANCE); + EXPECT_GE(pd.getPollTimeout(), 900); + EXPECT_LE(pd.getPollTimeout(), 1100); +} \ No newline at end of file diff --git a/tests/integrationtests/DBusMethodsTests.cpp b/tests/integrationtests/DBusMethodsTests.cpp index 8afaf88..1fcd365 100644 --- a/tests/integrationtests/DBusMethodsTests.cpp +++ b/tests/integrationtests/DBusMethodsTests.cpp @@ -42,6 +42,7 @@ using ::testing::Eq; using ::testing::DoubleEq; using ::testing::Gt; +using ::testing::Le; using ::testing::AnyOf; using ::testing::ElementsAre; using ::testing::SizeIs; @@ -162,21 +163,24 @@ TEST_F(SdbusTestObject, CallsMultiplyMethodWithNoReplyFlag) TEST_F(SdbusTestObject, CallsMethodWithCustomTimeoutSuccessfully) { - auto res = m_proxy->doOperationWith500msTimeout(20); // The operation will take 20ms, but the timeout is 500ms, so we are fine + auto res = m_proxy->doOperationWithTimeout(500ms, 20); // The operation will take 20ms, but the timeout is 500ms, so we are fine ASSERT_THAT(res, Eq(20)); } TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenMethodTimesOut) { + auto start = std::chrono::steady_clock::now(); try { - m_proxy->doOperationWith500msTimeout(1000); // The operation will take 1s, but the timeout is 500ms, so we should time out + m_proxy->doOperationWithTimeout(1us, 1000); // The operation will take 1s, but the timeout is 1us, so we should time out FAIL() << "Expected sdbus::Error exception"; } catch (const sdbus::Error& e) { ASSERT_THAT(e.getName(), AnyOf("org.freedesktop.DBus.Error.Timeout", "org.freedesktop.DBus.Error.NoReply")); ASSERT_THAT(e.getMessage(), AnyOf("Connection timed out", "Method call timed out")); + auto measuredTimeout = std::chrono::steady_clock::now() - start; + ASSERT_THAT(measuredTimeout, Le(50ms)); } catch(...) { diff --git a/tests/integrationtests/Defs.h b/tests/integrationtests/Defs.h index 5f3f675..2f129a4 100644 --- a/tests/integrationtests/Defs.h +++ b/tests/integrationtests/Defs.h @@ -28,6 +28,8 @@ #define SDBUS_CPP_INTEGRATIONTESTS_DEFS_H_ #include "sdbus-c++/Types.h" +#include +#include namespace sdbus { namespace test { @@ -54,6 +56,20 @@ const bool DEFAULT_BLOCKING_VALUE{true}; constexpr const double DOUBLE_VALUE{3.24L}; +/** Duration stream operator for human readable gtest value output. + * + * Note that the conversion to double is lossy if the input type has 64 or more bits. + * This is ok for our integration tests because they don't have very + * accurate timing requirements. + * + * @return human readable duration in seconds + */ +template< class Rep, class Period > +static std::ostream& operator<<(std::ostream& os, const std::chrono::duration& d) +{ + auto seconds = std::chrono::duration_cast>(d); + return os << seconds.count() << " s"; +} }} #endif /* SDBUS_CPP_INTEGRATIONTESTS_DEFS_H_ */ diff --git a/tests/integrationtests/TestProxy.cpp b/tests/integrationtests/TestProxy.cpp index c5f9571..37e1130 100644 --- a/tests/integrationtests/TestProxy.cpp +++ b/tests/integrationtests/TestProxy.cpp @@ -97,11 +97,11 @@ void TestProxy::installDoOperationClientSideAsyncReplyHandler(std::function handler); - uint32_t doOperationWith500msTimeout(uint32_t param); + uint32_t doOperationWithTimeout(const std::chrono::microseconds &timeout, uint32_t param); sdbus::PendingAsyncCall doOperationClientSideAsync(uint32_t param); void doErroneousOperationClientSideAsync(); - void doOperationClientSideAsyncWith500msTimeout(uint32_t param); + void doOperationClientSideAsyncWithTimeout(const std::chrono::microseconds &timeout, uint32_t param); int32_t callNonexistentMethod(); int32_t callMethodOnNonexistentInterface(); void setStateProperty(const std::string& value);