Add loops in stress tests to test adaptor/proxy initialization/deinitialization

This commit is contained in:
sangelovic
2019-04-13 21:28:43 +02:00
parent e3a74a3ff2
commit 1d44d8b37f
2 changed files with 144 additions and 105 deletions

View File

@ -54,8 +54,7 @@ protected:
virtual void sendDataSignals(const uint32_t& numberOfSignals, const uint32_t& signalMsgSize) override
{
auto data = createRandomString(signalMsgSize);
char digits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'};
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < numberOfSignals; ++i)
{
@ -65,7 +64,7 @@ protected:
auto stop_time = std::chrono::steady_clock::now();
std::cout << "Server sent " << numberOfSignals << " signals in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stop_time - start_time).count() << " ms" << std::endl;
}
virtual std::string concatenateTwoStrings(const std::string& string1, const std::string& string2) override
{
return string1 + string2;

View File

@ -37,6 +37,7 @@
#include <thread>
#include <chrono>
#include <cassert>
#include <cstdlib>
#include <atomic>
#include <sstream>
#include <mutex>
@ -319,7 +320,7 @@ private:
uint32_t aNumber;
str >> aNumber;
assert(aNumber >= 0);
assert(aNumber > 0);
++repliesReceived_;
}
@ -333,7 +334,7 @@ private:
uint32_t aNumber;
str >> aNumber;
assert(aNumber >= 0);
assert(aNumber > 0);
++signalsReceived_;
}
@ -344,109 +345,31 @@ public:
};
//-----------------------------------------
int main(int /*argc*/, char */*argv*/[])
int main(int argc, char *argv[])
{
auto service2Connection = sdbus::createSystemBusConnection(SERVICE_2_BUS_NAME);
std::atomic<bool> service2ThreadReady{};
std::thread service2Thread([&con = *service2Connection, &service2ThreadReady]()
{
CelsiusThermometerAdaptor thermometer(con, CELSIUS_THERMOMETER_OBJECT_PATH);
service2ThreadReady = true;
con.enterProcessingLoop();
});
long loops;
long loopDuration;
auto service1Connection = sdbus::createSystemBusConnection(SERVICE_1_BUS_NAME);
std::atomic<bool> service1ThreadReady{};
std::thread service1Thread([&con = *service1Connection, &service1ThreadReady]()
if (argc == 1)
{
ConcatenatorAdaptor concatenator(con, CONCATENATOR_OBJECT_PATH);
FahrenheitThermometerAdaptor thermometer(con, FAHRENHEIT_THERMOMETER_OBJECT_PATH, false);
service1ThreadReady = true;
con.enterProcessingLoop();
});
loops = 1;
loopDuration = 30000;
}
else if (argc == 3)
{
loops = std::atol(argv[1]);
loopDuration = std::atol(argv[2]);
}
else
throw std::runtime_error("Wrong program options");
// Wait for both services to export their D-Bus objects
while (!service2ThreadReady || !service1ThreadReady)
std::this_thread::sleep_for(1ms);
std::cout << "Going on with " << loops << " loops and " << loopDuration << "ms loop duration" << std::endl;
std::atomic<uint32_t> concatenationCallsMade{0};
std::atomic<uint32_t> concatenationRepliesReceived{0};
std::atomic<uint32_t> concatenationSignalsReceived{0};
std::atomic<uint32_t> thermometerCallsMade{0};
auto clientConnection = sdbus::createSystemBusConnection();
std::thread clientThread([&, &con = *clientConnection]()
{
std::atomic<bool> stopClients{false};
std::thread concatenatorThread([&]()
{
ConcatenatorProxy concatenator(con, SERVICE_1_BUS_NAME, CONCATENATOR_OBJECT_PATH);
uint32_t localCounter{};
// Issue async concatenate calls densely one after another
while (!stopClients)
{
std::map<std::string, sdbus::Variant> param;
param["key1"] = "sdbus-c++-stress-tests";
param["key2"] = localCounter++;
concatenator.concatenate(param);
if ((localCounter % 10) == 0)
{
// Make sure the system is catching up with our async requests,
// otherwise sleep a bit to slow down flooding the server.
assert(localCounter >= concatenator.repliesReceived_);
while ((localCounter - concatenator.repliesReceived_) > 40 && !stopClients)
std::this_thread::sleep_for(1ms);
// Update statistics
concatenationCallsMade = localCounter;
concatenationRepliesReceived = (uint32_t)concatenator.repliesReceived_;
concatenationSignalsReceived = (uint32_t)concatenator.signalsReceived_;
}
}
});
std::thread thermometerThread([&]()
{
// Here we continuously remotely call getCurrentTemperature(). We have one proxy object,
// first we use it's factory interface to create another proxy object, call getCurrentTemperature()
// on that one, and then destroy that proxy object. All that continously in a loop.
// This tests dynamic creation and destruction of remote D-Bus objects and local object proxies.
FahrenheitThermometerProxy thermometer(con, SERVICE_1_BUS_NAME, FAHRENHEIT_THERMOMETER_OBJECT_PATH);
uint32_t localCounter{};
uint32_t previousTemperature{};
while (!stopClients)
{
localCounter++;
auto newObjectPath = thermometer.createDelegateObject();
FahrenheitThermometerProxy proxy{con, SERVICE_1_BUS_NAME, newObjectPath};
auto temperature = proxy.getCurrentTemperature();
assert(temperature >= previousTemperature); // The temperature shall rise continually
previousTemperature = temperature;
//std::this_thread::sleep_for(1ms);
if ((localCounter % 10) == 0)
thermometerCallsMade = localCounter;
thermometer.destroyDelegateObject(newObjectPath);
}
});
con.enterProcessingLoop();
stopClients = true;
concatenatorThread.join();
thermometerThread.join();
});
std::atomic<bool> exitLogger{};
std::thread loggerThread([&]()
{
@ -459,16 +382,133 @@ int main(int /*argc*/, char */*argv*/[])
}
});
getchar();
for (long loop = 0; loop < loops; ++loop)
{
std::cout << "Entering loop " << loop+1 << std::endl;
auto service2Connection = sdbus::createSystemBusConnection(SERVICE_2_BUS_NAME);
std::atomic<bool> service2ThreadReady{};
std::thread service2Thread([&con = *service2Connection, &service2ThreadReady]()
{
CelsiusThermometerAdaptor thermometer(con, CELSIUS_THERMOMETER_OBJECT_PATH);
service2ThreadReady = true;
con.enterProcessingLoop();
});
auto service1Connection = sdbus::createSystemBusConnection(SERVICE_1_BUS_NAME);
std::atomic<bool> service1ThreadReady{};
std::thread service1Thread([&con = *service1Connection, &service1ThreadReady]()
{
ConcatenatorAdaptor concatenator(con, CONCATENATOR_OBJECT_PATH);
FahrenheitThermometerAdaptor thermometer(con, FAHRENHEIT_THERMOMETER_OBJECT_PATH, false);
service1ThreadReady = true;
con.enterProcessingLoop();
});
// Wait for both services to export their D-Bus objects
while (!service2ThreadReady || !service1ThreadReady)
std::this_thread::sleep_for(1ms);
auto clientConnection = sdbus::createSystemBusConnection();
std::mutex clientThreadExitMutex;
std::condition_variable clientThreadExitCond;
bool clientThreadExit{};
std::thread clientThread([&, &con = *clientConnection]()
{
std::atomic<bool> stopClients{false};
std::thread concatenatorThread([&]()
{
ConcatenatorProxy concatenator(con, SERVICE_1_BUS_NAME, CONCATENATOR_OBJECT_PATH);
uint32_t localCounter{};
// Issue async concatenate calls densely one after another
while (!stopClients)
{
std::map<std::string, sdbus::Variant> param;
param["key1"] = "sdbus-c++-stress-tests";
param["key2"] = ++localCounter;
concatenator.concatenate(param);
if ((localCounter % 10) == 0)
{
// Make sure the system is catching up with our async requests,
// otherwise sleep a bit to slow down flooding the server.
assert(localCounter >= concatenator.repliesReceived_);
while ((localCounter - concatenator.repliesReceived_) > 40 && !stopClients)
std::this_thread::sleep_for(1ms);
// Update statistics
concatenationCallsMade = localCounter;
concatenationRepliesReceived = (uint32_t)concatenator.repliesReceived_;
concatenationSignalsReceived = (uint32_t)concatenator.signalsReceived_;
}
}
});
std::thread thermometerThread([&]()
{
// Here we continuously remotely call getCurrentTemperature(). We have one proxy object,
// first we use it's factory interface to create another proxy object, call getCurrentTemperature()
// on that one, and then destroy that proxy object. All that continously in a loop.
// This tests dynamic creation and destruction of remote D-Bus objects and local object proxies.
FahrenheitThermometerProxy thermometer(con, SERVICE_1_BUS_NAME, FAHRENHEIT_THERMOMETER_OBJECT_PATH);
uint32_t localCounter{};
uint32_t previousTemperature{};
while (!stopClients)
{
localCounter++;
auto newObjectPath = thermometer.createDelegateObject();
FahrenheitThermometerProxy proxy{con, SERVICE_1_BUS_NAME, newObjectPath};
auto temperature = proxy.getCurrentTemperature();
assert(temperature >= previousTemperature); // The temperature shall rise continually
previousTemperature = temperature;
//std::this_thread::sleep_for(1ms);
if ((localCounter % 10) == 0)
thermometerCallsMade = localCounter;
thermometer.destroyDelegateObject(newObjectPath);
}
});
// We could run the loop in a sync way, but we want it to run also when proxies are destroyed for better
// coverage of multi-threaded scenarios, so we run it async and use condition variable for exit notification
//con.enterProcessingLoop();
con.enterProcessingLoopAsync();
std::unique_lock<std::mutex> lock(clientThreadExitMutex);
clientThreadExitCond.wait(lock, [&]{return clientThreadExit;});
stopClients = true;
thermometerThread.join();
concatenatorThread.join();
});
std::this_thread::sleep_for(std::chrono::milliseconds(loopDuration));
//clientConnection->leaveProcessingLoop();
std::unique_lock<std::mutex> lock(clientThreadExitMutex);
clientThreadExit = true;
lock.unlock();
clientThreadExitCond.notify_one();
clientThread.join();
service1Connection->leaveProcessingLoop();
service1Thread.join();
service2Connection->leaveProcessingLoop();
service2Thread.join();
}
exitLogger = true;
loggerThread.join();
clientConnection->leaveProcessingLoop();
clientThread.join();
service1Connection->leaveProcessingLoop();
service1Thread.join();
service2Connection->leaveProcessingLoop();
service2Thread.join();
return 0;
}