From d3d698f02adb2acc5699f301a0d110b52655b8b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanislav=20Angelovi=C4=8D?= Date: Tue, 24 Jul 2018 12:54:31 +0200 Subject: [PATCH] Fix CPU hog on async methods: Clear the event descriptor by reading from it (#16) --- configure.ac | 4 +-- src/Connection.cpp | 12 +++++-- test/Makefile.am | 7 ++--- .../integrationtests/AdaptorAndProxy_test.cpp | 31 +++++++++++++++++++ test/integrationtests/TestingAdaptor.h | 16 +++++++--- 5 files changed, 58 insertions(+), 12 deletions(-) diff --git a/configure.ac b/configure.ac index 3641921..a43fd60 100644 --- a/configure.ac +++ b/configure.ac @@ -25,8 +25,8 @@ AC_PROG_INSTALL # enable pkg-config PKG_PROG_PKG_CONFIG -PKG_CHECK_MODULES(SYSTEMD, [libsystemd >= 0.10.1],, - AC_MSG_ERROR([You need the libsystemd library (version 0.10.1 or better)] +PKG_CHECK_MODULES(SYSTEMD, [libsystemd >= 236],, + AC_MSG_ERROR([You need the libsystemd library (version 236 or newer)] [https://www.freedesktop.org/wiki/Software/systemd/]) ) diff --git a/src/Connection.cpp b/src/Connection.cpp index 1f81d79..e5815d7 100755 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -222,7 +222,7 @@ void Connection::finishHandshake(sd_bus* bus) int Connection::createLoopNotificationDescriptor() { - auto r = eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC); + auto r = eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC | EFD_NONBLOCK); SDBUS_THROW_ERROR_IF(r < 0, "Failed to create event object", -errno); @@ -237,8 +237,11 @@ void Connection::closeLoopNotificationDescriptor(int fd) void Connection::notifyProcessingLoop() { assert(notificationFd_ >= 0); + uint64_t value = 1; - write(notificationFd_, &value, sizeof(value)); + auto r = write(notificationFd_, &value, sizeof(value)); + + SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify processing loop", -errno); } void Connection::notifyProcessingLoopToExit() @@ -312,6 +315,11 @@ Connection::WaitResult Connection::waitForNextRequest() return {false, false}; // Got exit notification // Otherwise we have some async messages to process + + uint64_t value{}; + auto r = read(notificationFd_, &value, sizeof(value)); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to read from the event descriptor", -errno); + return {false, true}; } diff --git a/test/Makefile.am b/test/Makefile.am index a1fbe03..455dfe5 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -29,7 +29,7 @@ unittests/TypeTraits_test.cpp \ unittests/Types_test.cpp \ unittests/Message_test.cpp -libsdbus_c___unittests_LDFLAGS = -L$(top_builddir)/src +libsdbus_c___unittests_LDFLAGS = -L$(top_builddir)/src -pthread libsdbus_c___unittests_LDADD = \ -lsdbus-c++ \ @@ -45,14 +45,13 @@ integrationtests/libsdbus-c++_integrationtests.cpp \ integrationtests/Connection_test.cpp \ integrationtests/AdaptorAndProxy_test.cpp -libsdbus_c___integrationtests_LDFLAGS = -L$(top_builddir)/src +libsdbus_c___integrationtests_LDFLAGS = -L$(top_builddir)/src -pthread libsdbus_c___integrationtests_LDADD = \ -lsdbus-c++ \ @libsdbus_cpp_LIBS@ \ @SYSTEMD_LIBS@ \ - -lgmock \ - -lpthread + -lgmock TESTS += libsdbus-c++_integrationtests diff --git a/test/integrationtests/AdaptorAndProxy_test.cpp b/test/integrationtests/AdaptorAndProxy_test.cpp index ee4e021..0ccd5ce 100644 --- a/test/integrationtests/AdaptorAndProxy_test.cpp +++ b/test/integrationtests/AdaptorAndProxy_test.cpp @@ -225,6 +225,37 @@ TEST_F(SdbusTestObject, DoesServerSideAsynchoronousMethodInParallel) ASSERT_THAT(results, ElementsAre(500, 1000, 1500)); } +TEST_F(SdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncMethods) +{ + std::mutex mtx; + std::atomic resultCount{}; + std::atomic invoke{}; + std::atomic startedCount{}; + auto call = [&]() + { + TestingProxy proxy{INTERFACE_NAME, OBJECT_PATH}; + ++startedCount; + while (!invoke) ; + + size_t localResultCount{}; + for (size_t i = 0; i < 500; ++i) + { + auto result = proxy.doOperationAsync(i % 2); + if (result == (i % 2)) // Correct return value? + localResultCount++; + } + + resultCount += localResultCount; + }; + + std::thread invocations[]{std::thread{call}, std::thread{call}, std::thread{call}}; + while (startedCount != 3) ; + invoke = true; + std::for_each(std::begin(invocations), std::end(invocations), [](auto& t){ t.join(); }); + + ASSERT_THAT(resultCount, Eq(1500)); +} + TEST_F(SdbusTestObject, FailsCallingNonexistentMethod) { ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error); diff --git a/test/integrationtests/TestingAdaptor.h b/test/integrationtests/TestingAdaptor.h index f5136fe..163ca14 100644 --- a/test/integrationtests/TestingAdaptor.h +++ b/test/integrationtests/TestingAdaptor.h @@ -107,12 +107,20 @@ protected: void doOperationAsync(uint32_t param, sdbus::Result result) { - // The same as doOperationSync, just written as an asynchronous method callback - std::thread([param, result = std::move(result)]() + if (param == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(param)); + // Don't sleep and return the result from this thread result.returnResults(param); - }).detach(); + } + else + { + // Process asynchronously in another thread and return the result from there + std::thread([param, result = std::move(result)]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(param)); + result.returnResults(param); + }).detach(); + } } sdbus::Signature getSignature() const { return SIGNATURE_VALUE; }