LocatorMatcher: Get rid of ResultCollectorTask

Simplify the implementation by creating and running the
ResultsDeduplicator directly on task tree start.

Change-Id: I08a0d2b924f92382c31771d4bbd878cc5506501e
Reviewed-by: Eike Ziller <eike.ziller@qt.io>
This commit is contained in:
Jarek Kobus
2024-05-22 11:09:42 +02:00
parent 3e7f6237b0
commit 649d1cd6dc

View File

@@ -237,98 +237,6 @@ private:
QList<std::optional<LocatorFilterEntries>> m_outputData;
};
// This instance of this object is created by LocatorMatcher tree.
// It starts a separate thread which collects and deduplicates the results reported
// by LocatorStorage instances. The ResultsCollector is started as a first task in
// LocatorMatcher and runs in parallel to all the filters started by LocatorMatcher.
// When all the results are reported (the expected number of reports is set with setFilterCount()),
// the ResultsCollector finishes. The intermediate results are reported with
// serialOutputDataReady() signal.
// The object of ResultsCollector is registered in Tasking namespace under the
// ResultsCollectorTask name.
class ResultsCollector : public QObject
{
Q_OBJECT
public:
~ResultsCollector();
void setFilterCount(int count);
void start();
bool isRunning() const { return m_watcher.get(); }
std::shared_ptr<ResultsDeduplicator> deduplicator() const { return m_deduplicator; }
signals:
void serialOutputDataReady(const LocatorFilterEntries &serialOutputData);
void done();
private:
int m_filterCount = 0;
std::unique_ptr<QFutureWatcher<LocatorFilterEntries>> m_watcher;
std::shared_ptr<ResultsDeduplicator> m_deduplicator;
};
ResultsCollector::~ResultsCollector()
{
if (!isRunning())
return;
m_deduplicator->cancel();
if (Utils::futureSynchronizer()) {
Utils::futureSynchronizer()->addFuture(m_watcher->future());
return;
}
m_watcher->future().waitForFinished();
}
void ResultsCollector::setFilterCount(int count)
{
QTC_ASSERT(!isRunning(), return);
QTC_ASSERT(count >= 0, return);
m_filterCount = count;
}
void ResultsCollector::start()
{
QTC_ASSERT(!m_watcher, return);
QTC_ASSERT(!isRunning(), return);
if (m_filterCount == 0) {
emit done();
return;
}
m_deduplicator.reset(new ResultsDeduplicator(m_filterCount));
m_watcher.reset(new QFutureWatcher<LocatorFilterEntries>);
connect(m_watcher.get(), &QFutureWatcherBase::resultReadyAt, this, [this](int index) {
emit serialOutputDataReady(m_watcher->resultAt(index));
});
connect(m_watcher.get(), &QFutureWatcherBase::finished, this, [this] {
emit done();
m_watcher.release()->deleteLater();
m_deduplicator.reset();
});
// TODO: When filterCount == 1, deliver results directly and finish?
auto deduplicate = [](QPromise<LocatorFilterEntries> &promise,
const std::shared_ptr<ResultsDeduplicator> &deduplicator) {
deduplicator->run(promise);
};
m_watcher->setFuture(Utils::asyncRun(deduplicate, m_deduplicator));
}
class ResultsCollectorTaskAdapter : public TaskAdapter<ResultsCollector>
{
public:
ResultsCollectorTaskAdapter() {
connect(task(), &ResultsCollector::done, this, [this] { emit done(DoneResult::Success); });
}
void start() final { task()->start(); }
};
using ResultsCollectorTask = CustomTask<ResultsCollectorTaskAdapter>;
class LocatorStoragePrivate
{
public:
@@ -425,23 +333,35 @@ void LocatorMatcher::start()
QTC_ASSERT(!isRunning(), return);
d->m_output = {};
const Storage<ResultsCollector *> collectorStorage;
struct ResultsCollector
{
~ResultsCollector() {
if (m_deduplicator)
m_deduplicator->cancel();
}
std::shared_ptr<ResultsDeduplicator> m_deduplicator;
};
const Storage<ResultsCollector> collectorStorage;
const LoopList iterator(d->m_tasks);
const auto onCollectorSetup = [this, filterCount = d->m_tasks.size(), collectorStorage](
ResultsCollector &collector) {
*collectorStorage = &collector;
collector.setFilterCount(filterCount);
connect(&collector, &ResultsCollector::serialOutputDataReady,
this, [this](const LocatorFilterEntries &serialOutputData) {
Async<LocatorFilterEntries> &async) {
const std::shared_ptr<ResultsDeduplicator> deduplicator(new ResultsDeduplicator(filterCount));
collectorStorage->m_deduplicator = deduplicator;
Async<LocatorFilterEntries> *asyncPtr = &async;
connect(asyncPtr, &AsyncBase::resultReadyAt, this, [this, asyncPtr](int index) {
const LocatorFilterEntries serialOutputData = asyncPtr->resultAt(index);
d->m_output += serialOutputData;
emit serialOutputDataReady(serialOutputData);
});
// TODO: When filterCount == 1, deliver results directly and finish?
async.setConcurrentCallData(&ResultsDeduplicator::run, deduplicator);
};
const auto onCollectorDone = [collectorStorage] { *collectorStorage = nullptr; };
const auto onCollectorDone = [collectorStorage] { collectorStorage->m_deduplicator->cancel(); };
const auto onTaskTreeSetup = [iterator, input = d->m_input, collectorStorage](TaskTree &taskTree) {
const std::shared_ptr<ResultsDeduplicator> deduplicator = (*collectorStorage)->deduplicator();
const std::shared_ptr<ResultsDeduplicator> deduplicator = collectorStorage->m_deduplicator;
const Storage<LocatorStorage> storage = iterator->storage;
const auto onSetup = [storage, input, index = iterator.iteration(), deduplicator] {
*storage = std::make_shared<LocatorStoragePrivate>(input, index, deduplicator);
@@ -458,7 +378,7 @@ void LocatorMatcher::start()
const Group root {
parallel,
collectorStorage,
ResultsCollectorTask(onCollectorSetup, onCollectorDone),
AsyncTask<LocatorFilterEntries>(onCollectorSetup, onCollectorDone),
Group {
parallelLimit(d->m_parallelLimit),
iterator,
@@ -1472,5 +1392,3 @@ LocatorMatcherTask LocatorFileCache::matcher() const
}
} // Core
#include "ilocatorfilter.moc"