Fix issue of event loop thread and synchronous method call thread polling on the same D-Bus connection

Synchronous D-Bus method calls are now done in terms of blocking asynchronous calls.
This commit is contained in:
sangelovic
2020-01-12 13:10:34 +01:00
parent 5121d46eed
commit f41d9bc395
17 changed files with 281 additions and 82 deletions

View File

@@ -48,6 +48,7 @@
using ::testing::Eq;
using ::testing::DoubleEq;
using ::testing::Gt;
using ::testing::AnyOf;
using ::testing::ElementsAre;
using ::testing::SizeIs;
using namespace std::chrono_literals;
@@ -106,6 +107,7 @@ public:
};
std::unique_ptr<sdbus::IConnection> AdaptorAndProxyFixture::s_connection = sdbus::createSystemBusConnection();
}
/*-------------------------------------*/
@@ -247,8 +249,38 @@ TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenMethodTimesOut)
}
catch (const sdbus::Error& e)
{
ASSERT_THAT(e.getName(), Eq("org.freedesktop.DBus.Error.Timeout"));
ASSERT_THAT(e.getMessage(), Eq("Connection timed out"));
ASSERT_THAT(e.getName(), AnyOf("org.freedesktop.DBus.Error.Timeout", "org.freedesktop.DBus.Error.NoReply"));
ASSERT_THAT(e.getMessage(), AnyOf("Connection timed out", "Method call timed out"));
}
catch(...)
{
FAIL() << "Expected sdbus::Error exception";
}
}
TEST_F(SdbusTestObject, ThrowsTimeoutErrorWhenClientSideAsyncMethodTimesOut)
{
try
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err)
{
if (err == nullptr)
promise.set_value(res);
else
promise.set_exception(std::make_exception_ptr(*err));
});
m_proxy->doOperationClientSideAsyncWith500msTimeout(1000); // The operation will take 1s, but the timeout is 500ms, so we should time out
future.get(), Eq(100);
FAIL() << "Expected sdbus::Error exception";
}
catch (const sdbus::Error& e)
{
ASSERT_THAT(e.getName(), AnyOf("org.freedesktop.DBus.Error.Timeout", "org.freedesktop.DBus.Error.NoReply"));
ASSERT_THAT(e.getMessage(), AnyOf("Connection timed out", "Method call timed out"));
}
catch(...)
{
@@ -392,6 +424,14 @@ TEST_F(SdbusTestObject, FailsCallingMethodOnNonexistentObject)
ASSERT_THROW(proxy.getInt(), sdbus::Error);
}
TEST_F(SdbusTestObject, ReceivesTwoSignalsWhileMakingMethodCall)
{
m_proxy->emitTwoSimpleSignals();
ASSERT_TRUE(waitUntil(m_proxy->m_gotSimpleSignal));
ASSERT_TRUE(waitUntil(m_proxy->m_gotSignalWithMap));
}
#if LIBSYSTEMD_VERSION>=240
TEST_F(SdbusTestObject, CanSetGeneralMethodTimeoutWithLibsystemdVersionGreaterThan239)
{

View File

@@ -197,6 +197,13 @@ protected:
throw sdbus::createError(1, "A test error occurred");
}
void emitTwoSimpleSignals() override
{
emitSimpleSignal();
emitSignalWithMap({});
}
std::string state()
{
return m_state;

View File

@@ -108,6 +108,7 @@ protected:
//private:
public: // for tests
int m_SimpleSignals = 0;
std::atomic<bool> m_gotSimpleSignal{false};
std::atomic<bool> m_gotSignalWithMap{false};
std::map<int32_t, std::string> m_mapFromSignal;

View File

@@ -106,6 +106,8 @@ protected:
object_.registerMethod("doPrivilegedStuff").onInterface(INTERFACE_NAME).implementedAs([](){}).markAsPrivileged();
object_.registerMethod("emitTwoSimpleSignals").onInterface(INTERFACE_NAME).implementedAs([this](){ this->emitTwoSimpleSignals(); });
// registration of signals is optional, it is useful because of introspection
object_.registerSignal("simpleSignal").onInterface(INTERFACE_NAME).markAsDeprecated();
object_.registerSignal("signalWithMap").onInterface(INTERFACE_NAME).withParameters<std::map<int32_t, std::string>>();
@@ -168,6 +170,7 @@ protected:
virtual sdbus::UnixFd getUnixFd() const = 0;
virtual ComplexType getComplex() const = 0;
virtual void throwError() const = 0;
virtual void emitTwoSimpleSignals() = 0;
virtual std::string state() = 0;
virtual uint32_t action() = 0;
@@ -240,6 +243,8 @@ R"delimiter(<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspectio
<method name="doPrivilegedStuff">
<annotation name="org.freedesktop.systemd1.Privileged" value="true"/>
</method>
<method name="emitTwoSimpleSignals">
</method>
<method name="getComplex">
<arg type="a{t(a{ya(obva{is})}gs)}" direction="out"/>
<annotation name="org.freedesktop.DBus.Deprecated" value="true"/>

View File

@@ -56,6 +56,11 @@ protected:
virtual void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) = 0;
public:
void emitTwoSimpleSignals()
{
object_.callMethod("emitTwoSimpleSignals").onInterface(INTERFACE_NAME);
}
void noArgNoReturn()
{
object_.callMethod("noArgNoReturn").onInterface(INTERFACE_NAME);
@@ -172,6 +177,19 @@ public:
});
}
void doOperationClientSideAsyncWith500msTimeout(uint32_t param)
{
using namespace std::chrono_literals;
object_.callMethodAsync("doOperation")
.onInterface(INTERFACE_NAME)
.withTimeout(500000us)
.withArguments(param)
.uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue)
{
this->onDoOperationReply(returnValue, error);
});
}
sdbus::Signature getSignature()
{
sdbus::Signature result;

View File

@@ -37,6 +37,8 @@
using namespace std::chrono_literals;
uint64_t totalDuration = 0;
class PerftestProxy : public sdbus::ProxyInterfaces<org::sdbuscpp::perftests_proxy>
{
public:
@@ -66,7 +68,9 @@ protected:
else if (counter == m_msgCount)
{
auto stopTime = std::chrono::steady_clock::now();
std::cout << "Received " << m_msgCount << " signals in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
totalDuration += duration;
std::cout << "Received " << m_msgCount << " signals in: " << duration << " ms" << std::endl;
counter = 0;
}
}
@@ -114,6 +118,9 @@ int main(int /*argc*/, char */*argv*/[])
std::this_thread::sleep_for(1000ms);
}
std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
totalDuration = 0;
msgSize = 1000;
std::cout << std::endl << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
client.m_msgCount = msgCount; client.m_msgSize = msgSize;
@@ -124,6 +131,9 @@ int main(int /*argc*/, char */*argv*/[])
std::this_thread::sleep_for(1000ms);
}
std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
totalDuration = 0;
msgSize = 20;
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
for (unsigned int r = 0; r < repetitions; ++r)
@@ -140,11 +150,16 @@ int main(int /*argc*/, char */*argv*/[])
assert(result.size() == msgSize);
}
auto stopTime = std::chrono::steady_clock::now();
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
totalDuration += duration;
std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl;
std::this_thread::sleep_for(1000ms);
}
std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
totalDuration = 0;
msgSize = 1000;
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
for (unsigned int r = 0; r < repetitions; ++r)
@@ -161,10 +176,15 @@ int main(int /*argc*/, char */*argv*/[])
assert(result.size() == msgSize);
}
auto stopTime = std::chrono::steady_clock::now();
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
totalDuration += duration;
std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl;
std::this_thread::sleep_for(1000ms);
}
std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
totalDuration = 0;
return 0;
}