diff --git a/test/stresstests/sdbus-c++-stress-tests.cpp b/test/stresstests/sdbus-c++-stress-tests.cpp index 933e288..0e5c2f0 100644 --- a/test/stresstests/sdbus-c++-stress-tests.cpp +++ b/test/stresstests/sdbus-c++-stress-tests.cpp @@ -82,6 +82,59 @@ public: , org::sdbuscpp::stresstests::fahrenheit::thermometer::factory_adaptor >(connection, std::move(objectPath)) , celsiusProxy_(connection, SERVICE_2_BUS_NAME, CELSIUS_THERMOMETER_OBJECT_PATH) { + unsigned int workers = std::thread::hardware_concurrency(); + if (workers < 4) + workers = 4; + + for (unsigned int i = 0; i < workers; ++i) + workers_.emplace_back([this]() + { + //std::cout << "Created FTA worker thread 0x" << std::hex << std::this_thread::get_id() << std::dec << std::endl; + + while(!exit_) + { + // Pop a work item from the queue + std::unique_lock lock(mutex_); + cond_.wait(lock, [this]{return !requests_.empty() || exit_;}); + if (exit_) + break; + auto request = std::move(requests_.front()); + requests_.pop(); + lock.unlock(); + + // Either create or destroy a delegate object + if (request.delegateObjectPath.empty()) + { + // Create new delegate object + auto& connection = getObject().getConnection(); + sdbus::ObjectPath newObjectPath = FAHRENHEIT_THERMOMETER_OBJECT_PATH + "/" + std::to_string(request.objectNr); + + // Here we are testing dynamic creation of a D-Bus object in an async way + auto adaptor = std::make_unique(connection, newObjectPath); + + std::unique_lock lock{childrenMutex_}; + children_.emplace(newObjectPath, std::move(adaptor)); + lock.unlock(); + + request.result.returnResults(newObjectPath); + } + else + { + // Destroy existing delegate object + // Here we are testing dynamic removal of a D-Bus object in an async way + std::lock_guard lock{childrenMutex_}; + children_.erase(request.delegateObjectPath); + } + } + }); + } + + ~FahrenheitThermometerAdaptor() + { + exit_ = true; + cond_.notify_all(); + for (auto& worker : workers_) + worker.join(); } protected: @@ -96,35 +149,36 @@ protected: static size_t objectCounter{}; objectCounter++; - std::thread([this, result = std::move(result), objectCounter = objectCounter]() - { - auto& connection = getObject().getConnection(); - sdbus::ObjectPath newObjectPath = FAHRENHEIT_THERMOMETER_OBJECT_PATH + "/" + std::to_string(objectCounter); - - // Here we are testing dynamic creation of a D-Bus object in an async way - auto adaptor = std::make_unique(connection, newObjectPath); - - std::lock_guard lock{mutex_}; - children_.emplace(newObjectPath, std::move(adaptor)); - - result.returnResults(newObjectPath); - }).detach(); + std::unique_lock lock(mutex_); + requests_.push(WorkItem{objectCounter, std::string{}, std::move(result)}); + lock.unlock(); + cond_.notify_one(); } virtual void destroyDelegateObject(sdbus::Result<>&& /*result*/, sdbus::ObjectPath delegate) override { - std::thread([this, delegate = std::move(delegate)]() - { - // Here we are testing dynamic removal of a D-Bus object in an async way - std::lock_guard lock{mutex_}; - children_.erase(delegate); - }).detach(); + std::unique_lock lock(mutex_); + requests_.push(WorkItem{0, std::move(delegate), {}}); + lock.unlock(); + cond_.notify_one(); } private: CelsiusThermometerProxy celsiusProxy_; std::map> children_; + std::mutex childrenMutex_; + + struct WorkItem + { + size_t objectNr; + sdbus::ObjectPath delegateObjectPath; + sdbus::Result result; + }; std::mutex mutex_; + std::condition_variable cond_; + std::queue requests_; + std::vector workers_; + std::atomic exit_{}; }; class FahrenheitThermometerProxy : public sdbus::ProxyInterfaces< org::sdbuscpp::stresstests::fahrenheit::thermometer_proxy @@ -148,7 +202,7 @@ public: for (unsigned int i = 0; i < workers; ++i) workers_.emplace_back([this]() { - //std::cout << "Created worker thread 0x" << std::hex << std::this_thread::get_id() << std::dec << std::endl; + //std::cout << "Created CA worker thread 0x" << std::hex << std::this_thread::get_id() << std::dec << std::endl; while(!exit_) {