mirror of
https://github.com/Kistler-Group/sdbus-cpp.git
synced 2025-07-31 02:27:14 +02:00
Fix CPU hog on async methods: Clear the event descriptor by reading from it (#16)
This commit is contained in:
committed by
Lukáš Ďurfina
parent
d8fd053714
commit
d3d698f02a
@ -25,8 +25,8 @@ AC_PROG_INSTALL
|
|||||||
# enable pkg-config
|
# enable pkg-config
|
||||||
PKG_PROG_PKG_CONFIG
|
PKG_PROG_PKG_CONFIG
|
||||||
|
|
||||||
PKG_CHECK_MODULES(SYSTEMD, [libsystemd >= 0.10.1],,
|
PKG_CHECK_MODULES(SYSTEMD, [libsystemd >= 236],,
|
||||||
AC_MSG_ERROR([You need the libsystemd library (version 0.10.1 or better)]
|
AC_MSG_ERROR([You need the libsystemd library (version 236 or newer)]
|
||||||
[https://www.freedesktop.org/wiki/Software/systemd/])
|
[https://www.freedesktop.org/wiki/Software/systemd/])
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -222,7 +222,7 @@ void Connection::finishHandshake(sd_bus* bus)
|
|||||||
|
|
||||||
int Connection::createLoopNotificationDescriptor()
|
int Connection::createLoopNotificationDescriptor()
|
||||||
{
|
{
|
||||||
auto r = eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC);
|
auto r = eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC | EFD_NONBLOCK);
|
||||||
|
|
||||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create event object", -errno);
|
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create event object", -errno);
|
||||||
|
|
||||||
@ -237,8 +237,11 @@ void Connection::closeLoopNotificationDescriptor(int fd)
|
|||||||
void Connection::notifyProcessingLoop()
|
void Connection::notifyProcessingLoop()
|
||||||
{
|
{
|
||||||
assert(notificationFd_ >= 0);
|
assert(notificationFd_ >= 0);
|
||||||
|
|
||||||
uint64_t value = 1;
|
uint64_t value = 1;
|
||||||
write(notificationFd_, &value, sizeof(value));
|
auto r = write(notificationFd_, &value, sizeof(value));
|
||||||
|
|
||||||
|
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify processing loop", -errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::notifyProcessingLoopToExit()
|
void Connection::notifyProcessingLoopToExit()
|
||||||
@ -312,6 +315,11 @@ Connection::WaitResult Connection::waitForNextRequest()
|
|||||||
return {false, false}; // Got exit notification
|
return {false, false}; // Got exit notification
|
||||||
|
|
||||||
// Otherwise we have some async messages to process
|
// Otherwise we have some async messages to process
|
||||||
|
|
||||||
|
uint64_t value{};
|
||||||
|
auto r = read(notificationFd_, &value, sizeof(value));
|
||||||
|
SDBUS_THROW_ERROR_IF(r < 0, "Failed to read from the event descriptor", -errno);
|
||||||
|
|
||||||
return {false, true};
|
return {false, true};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ unittests/TypeTraits_test.cpp \
|
|||||||
unittests/Types_test.cpp \
|
unittests/Types_test.cpp \
|
||||||
unittests/Message_test.cpp
|
unittests/Message_test.cpp
|
||||||
|
|
||||||
libsdbus_c___unittests_LDFLAGS = -L$(top_builddir)/src
|
libsdbus_c___unittests_LDFLAGS = -L$(top_builddir)/src -pthread
|
||||||
|
|
||||||
libsdbus_c___unittests_LDADD = \
|
libsdbus_c___unittests_LDADD = \
|
||||||
-lsdbus-c++ \
|
-lsdbus-c++ \
|
||||||
@ -45,14 +45,13 @@ integrationtests/libsdbus-c++_integrationtests.cpp \
|
|||||||
integrationtests/Connection_test.cpp \
|
integrationtests/Connection_test.cpp \
|
||||||
integrationtests/AdaptorAndProxy_test.cpp
|
integrationtests/AdaptorAndProxy_test.cpp
|
||||||
|
|
||||||
libsdbus_c___integrationtests_LDFLAGS = -L$(top_builddir)/src
|
libsdbus_c___integrationtests_LDFLAGS = -L$(top_builddir)/src -pthread
|
||||||
|
|
||||||
libsdbus_c___integrationtests_LDADD = \
|
libsdbus_c___integrationtests_LDADD = \
|
||||||
-lsdbus-c++ \
|
-lsdbus-c++ \
|
||||||
@libsdbus_cpp_LIBS@ \
|
@libsdbus_cpp_LIBS@ \
|
||||||
@SYSTEMD_LIBS@ \
|
@SYSTEMD_LIBS@ \
|
||||||
-lgmock \
|
-lgmock
|
||||||
-lpthread
|
|
||||||
|
|
||||||
TESTS += libsdbus-c++_integrationtests
|
TESTS += libsdbus-c++_integrationtests
|
||||||
|
|
||||||
|
@ -225,6 +225,37 @@ TEST_F(SdbusTestObject, DoesServerSideAsynchoronousMethodInParallel)
|
|||||||
ASSERT_THAT(results, ElementsAre(500, 1000, 1500));
|
ASSERT_THAT(results, ElementsAre(500, 1000, 1500));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(SdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncMethods)
|
||||||
|
{
|
||||||
|
std::mutex mtx;
|
||||||
|
std::atomic<size_t> resultCount{};
|
||||||
|
std::atomic<bool> invoke{};
|
||||||
|
std::atomic<int> startedCount{};
|
||||||
|
auto call = [&]()
|
||||||
|
{
|
||||||
|
TestingProxy proxy{INTERFACE_NAME, OBJECT_PATH};
|
||||||
|
++startedCount;
|
||||||
|
while (!invoke) ;
|
||||||
|
|
||||||
|
size_t localResultCount{};
|
||||||
|
for (size_t i = 0; i < 500; ++i)
|
||||||
|
{
|
||||||
|
auto result = proxy.doOperationAsync(i % 2);
|
||||||
|
if (result == (i % 2)) // Correct return value?
|
||||||
|
localResultCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
resultCount += localResultCount;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::thread invocations[]{std::thread{call}, std::thread{call}, std::thread{call}};
|
||||||
|
while (startedCount != 3) ;
|
||||||
|
invoke = true;
|
||||||
|
std::for_each(std::begin(invocations), std::end(invocations), [](auto& t){ t.join(); });
|
||||||
|
|
||||||
|
ASSERT_THAT(resultCount, Eq(1500));
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(SdbusTestObject, FailsCallingNonexistentMethod)
|
TEST_F(SdbusTestObject, FailsCallingNonexistentMethod)
|
||||||
{
|
{
|
||||||
ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error);
|
ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error);
|
||||||
|
@ -107,12 +107,20 @@ protected:
|
|||||||
|
|
||||||
void doOperationAsync(uint32_t param, sdbus::Result<uint32_t> result)
|
void doOperationAsync(uint32_t param, sdbus::Result<uint32_t> result)
|
||||||
{
|
{
|
||||||
// The same as doOperationSync, just written as an asynchronous method callback
|
if (param == 0)
|
||||||
std::thread([param, result = std::move(result)]()
|
|
||||||
{
|
{
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(param));
|
// Don't sleep and return the result from this thread
|
||||||
result.returnResults(param);
|
result.returnResults(param);
|
||||||
}).detach();
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Process asynchronously in another thread and return the result from there
|
||||||
|
std::thread([param, result = std::move(result)]()
|
||||||
|
{
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(param));
|
||||||
|
result.returnResults(param);
|
||||||
|
}).detach();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbus::Signature getSignature() const { return SIGNATURE_VALUE; }
|
sdbus::Signature getSignature() const { return SIGNATURE_VALUE; }
|
||||||
|
Reference in New Issue
Block a user