Fix race condition between worker threads and adaptor destructor in stress tests

This commit is contained in:
sangelovic
2019-04-08 21:16:32 +02:00
parent c9ef1849cd
commit f5da0dabcb

View File

@ -82,6 +82,59 @@ public:
, org::sdbuscpp::stresstests::fahrenheit::thermometer::factory_adaptor >(connection, std::move(objectPath))
, celsiusProxy_(connection, SERVICE_2_BUS_NAME, CELSIUS_THERMOMETER_OBJECT_PATH)
{
unsigned int workers = std::thread::hardware_concurrency();
if (workers < 4)
workers = 4;
for (unsigned int i = 0; i < workers; ++i)
workers_.emplace_back([this]()
{
//std::cout << "Created FTA worker thread 0x" << std::hex << std::this_thread::get_id() << std::dec << std::endl;
while(!exit_)
{
// Pop a work item from the queue
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]{return !requests_.empty() || exit_;});
if (exit_)
break;
auto request = std::move(requests_.front());
requests_.pop();
lock.unlock();
// Either create or destroy a delegate object
if (request.delegateObjectPath.empty())
{
// Create new delegate object
auto& connection = getObject().getConnection();
sdbus::ObjectPath newObjectPath = FAHRENHEIT_THERMOMETER_OBJECT_PATH + "/" + std::to_string(request.objectNr);
// Here we are testing dynamic creation of a D-Bus object in an async way
auto adaptor = std::make_unique<FahrenheitThermometerAdaptor>(connection, newObjectPath);
std::unique_lock<std::mutex> lock{childrenMutex_};
children_.emplace(newObjectPath, std::move(adaptor));
lock.unlock();
request.result.returnResults(newObjectPath);
}
else
{
// Destroy existing delegate object
// Here we are testing dynamic removal of a D-Bus object in an async way
std::lock_guard<std::mutex> lock{childrenMutex_};
children_.erase(request.delegateObjectPath);
}
}
});
}
~FahrenheitThermometerAdaptor()
{
exit_ = true;
cond_.notify_all();
for (auto& worker : workers_)
worker.join();
}
protected:
@ -96,35 +149,36 @@ protected:
static size_t objectCounter{};
objectCounter++;
std::thread([this, result = std::move(result), objectCounter = objectCounter]()
{
auto& connection = getObject().getConnection();
sdbus::ObjectPath newObjectPath = FAHRENHEIT_THERMOMETER_OBJECT_PATH + "/" + std::to_string(objectCounter);
// Here we are testing dynamic creation of a D-Bus object in an async way
auto adaptor = std::make_unique<FahrenheitThermometerAdaptor>(connection, newObjectPath);
std::lock_guard<std::mutex> lock{mutex_};
children_.emplace(newObjectPath, std::move(adaptor));
result.returnResults(newObjectPath);
}).detach();
std::unique_lock<std::mutex> lock(mutex_);
requests_.push(WorkItem{objectCounter, std::string{}, std::move(result)});
lock.unlock();
cond_.notify_one();
}
virtual void destroyDelegateObject(sdbus::Result<>&& /*result*/, sdbus::ObjectPath delegate) override
{
std::thread([this, delegate = std::move(delegate)]()
{
// Here we are testing dynamic removal of a D-Bus object in an async way
std::lock_guard<std::mutex> lock{mutex_};
children_.erase(delegate);
}).detach();
std::unique_lock<std::mutex> lock(mutex_);
requests_.push(WorkItem{0, std::move(delegate), {}});
lock.unlock();
cond_.notify_one();
}
private:
CelsiusThermometerProxy celsiusProxy_;
std::map<std::string, std::unique_ptr<FahrenheitThermometerAdaptor>> children_;
std::mutex childrenMutex_;
struct WorkItem
{
size_t objectNr;
sdbus::ObjectPath delegateObjectPath;
sdbus::Result<sdbus::ObjectPath> result;
};
std::mutex mutex_;
std::condition_variable cond_;
std::queue<WorkItem> requests_;
std::vector<std::thread> workers_;
std::atomic<bool> exit_{};
};
class FahrenheitThermometerProxy : public sdbus::ProxyInterfaces< org::sdbuscpp::stresstests::fahrenheit::thermometer_proxy
@ -148,7 +202,7 @@ public:
for (unsigned int i = 0; i < workers; ++i)
workers_.emplace_back([this]()
{
//std::cout << "Created worker thread 0x" << std::hex << std::this_thread::get_id() << std::dec << std::endl;
//std::cout << "Created CA worker thread 0x" << std::hex << std::this_thread::get_id() << std::dec << std::endl;
while(!exit_)
{