LocatorMatcher: Add more comments, do some renaming

Rename:
OutputDataProvider -> ResultsDeduplicator
OutputFilter -> ResultsCollector

Change-Id: If29ae29a2fa6895ad36d94577824fbb2e6b65a4d
Reviewed-by: Qt CI Bot <qt_ci_bot@qt-project.org>
Reviewed-by: Eike Ziller <eike.ziller@qt.io>
This commit is contained in:
Jarek Kobus
2023-04-17 07:49:45 +02:00
parent 0ab1eaa31c
commit f21e2ff65c

View File

@@ -51,14 +51,39 @@ using namespace Utils;
namespace Core {
class OutputDataProvider
// ResultsDeduplicator squashes upcoming results from various filters and removes
// duplicated entries. It also reports intermediate results (to be displayed in LocatorWidget).
//
// Assuming the results from filters come in this order (numbers are indices to filter results):
// 2, 4, 3, 1 - the various strategies are possible. The current strategy looks like this:
// - When 2nd result came - the result is stored
// - When 4th result came - the result is stored
// - When 3rd result came - it's being squased to the 2nd result and afterwards the 4th result
// result is being squashed into the common result list.
// - When 1st result came - the stored common list is squashed into the 1st result
// and intermediate results are reported (for 1-4 results).
// If the filterCount is 4, the deduplicator finishes now.
// If the filterCount is greater than 4, it waits for the remaining
// results.
//
// TODO: The other possible startegy would be to just store the newly reported data
// and do the actual deduplication only when new results are reachable from 1st index
// (i.e. skip the intermediate deduplication).
class ResultsDeduplicator
{
enum class State {
Awaiting,
NewData,
Canceled
Awaiting, // Waiting in a separate thread for new data, or fetched the last new data and
// is currently deduplicating.
// This happens when all previous data were squashed in the separate thread but
// still some data needs to come (reportOutput wasn't called for all filters,
// yet). The expected number of calls to reportOutput equals m_filterCount.
NewData, // The new data came and the separate thread is being awaken in order to squash
// it. After the separate thread is awaken it transitions to Awaiting state.
Canceled // The Deduplicator task has been canceled.
};
// A separate item for keeping squashed entries. Call mergeWith() to squash a consecutive
// results into this results.
struct WorkingData {
WorkingData() = default;
WorkingData(const LocatorFilterEntries &entries, std::atomic<State> &state) {
@@ -88,12 +113,17 @@ class OutputDataProvider
};
public:
OutputDataProvider(int filterCount)
// filterCount is the expected numbers of running filters. The separate thread executing run()
// will stop after reportOutput was called filterCount times, for all different indices
// in range [0, filterCount).
ResultsDeduplicator(int filterCount)
: m_filterCount(filterCount)
, m_outputData(filterCount, {})
{}
void reportOutput(int index, const LocatorFilterEntries &outputData)
// Called directly by running filters. The calls may come from main thread in case of
// e.g. Sync task or directly from other threads when AsyncTask was used.
{
QTC_ASSERT(index >= 0, return);
@@ -110,6 +140,8 @@ public:
m_waitCondition.wakeOne();
}
// Called when the LocatorMatcher was canceled. It wakes the separate thread in order to
// finish it, soon.
void cancel()
{
QMutexLocker locker(&m_mutex);
@@ -117,7 +149,7 @@ public:
m_waitCondition.wakeOne();
}
// Called from separate thread (OutputFilter's thread)
// Called from the separate thread (ResultsCollector's thread)
void run(QPromise<LocatorFilterEntries> &promise)
{
QList<std::optional<LocatorFilterEntries>> data;
@@ -171,13 +203,18 @@ public:
}
private:
// Called from the separate thread, exclusively by run(). Checks if the new data already
// came before sleeping with wait condition. If so, it doesn't sleep with wait condition,
// but returns the data collected in meantime. Otherwise, it calls wait() on wait condition.
bool waitForData(QList<std::optional<LocatorFilterEntries>> *data)
{
QMutexLocker locker(&m_mutex);
if (m_state == State::Canceled)
return false;
if (m_state == State::NewData) {
m_state = State::Awaiting;
m_state = State::Awaiting; // Mark the state as awaiting to detect new calls to
// setOutputData while the separate thread deduplicates the
// new data.
*data = m_outputData;
return true;
}
@@ -185,7 +222,9 @@ private:
QTC_ASSERT(m_state != State::Awaiting, return false);
if (m_state == State::Canceled)
return false;
m_state = State::Awaiting;
m_state = State::Awaiting; // Mark the state as awaiting to detect new calls to
// setOutputData while the separate thread deduplicates the
// new data.
*data = m_outputData;
return true;
}
@@ -197,18 +236,26 @@ private:
QList<std::optional<LocatorFilterEntries>> m_outputData;
};
class OutputFilter : public QObject
// This instance of this object is created by LocatorMatcher tree.
// It starts a separate thread which collects and deduplicates the results reported
// by LocatorStorage instances. The ResultsCollector is started as a first task in
// LocatorMatcher and runs in parallel to all the filters started by LocatorMatcher.
// When all the results are reported (the expected number of reports is set with setFilterCount()),
// the ResultsCollector finishes. The intermediate results are reported with
// serialOutputDataReady() signal.
// The object of ResultsCollector is registered in Tasking namespace with Tasking::Collector name.
class ResultsCollector : public QObject
{
Q_OBJECT
public:
~OutputFilter();
~ResultsCollector();
void setFilterCount(int count);
void start();
bool isRunning() const { return m_watcher.get(); }
std::shared_ptr<OutputDataProvider> dataProvider() const { return m_dataProvider; }
std::shared_ptr<ResultsDeduplicator> deduplicator() const { return m_deduplicator; }
signals:
void serialOutputDataReady(const LocatorFilterEntries &serialOutputData);
@@ -217,19 +264,19 @@ signals:
private:
int m_filterCount = 0;
std::unique_ptr<QFutureWatcher<LocatorFilterEntries>> m_watcher;
std::shared_ptr<OutputDataProvider> m_dataProvider;
std::shared_ptr<ResultsDeduplicator> m_deduplicator;
};
OutputFilter::~OutputFilter()
ResultsCollector::~ResultsCollector()
{
if (!isRunning())
return;
m_dataProvider->cancel();
m_deduplicator->cancel();
Internal::CorePlugin::futureSynchronizer()->addFuture(m_watcher->future());
}
void OutputFilter::setFilterCount(int count)
void ResultsCollector::setFilterCount(int count)
{
QTC_ASSERT(!isRunning(), return);
QTC_ASSERT(count >= 0, return);
@@ -237,7 +284,7 @@ void OutputFilter::setFilterCount(int count)
m_filterCount = count;
}
void OutputFilter::start()
void ResultsCollector::start()
{
QTC_ASSERT(!m_watcher, return);
QTC_ASSERT(!isRunning(), return);
@@ -246,7 +293,7 @@ void OutputFilter::start()
return;
}
m_dataProvider.reset(new OutputDataProvider(m_filterCount));
m_deduplicator.reset(new ResultsDeduplicator(m_filterCount));
m_watcher.reset(new QFutureWatcher<LocatorFilterEntries>);
connect(m_watcher.get(), &QFutureWatcherBase::resultReadyAt, this, [this](int index) {
emit serialOutputDataReady(m_watcher->resultAt(index));
@@ -254,29 +301,29 @@ void OutputFilter::start()
connect(m_watcher.get(), &QFutureWatcherBase::finished, this, [this] {
emit done();
m_watcher.release()->deleteLater();
m_dataProvider.reset();
m_deduplicator.reset();
});
// TODO: When filterCount == 1, deliver results directly and finish?
auto filter = [](QPromise<LocatorFilterEntries> &promise,
const std::shared_ptr<OutputDataProvider> &dataProvider) {
dataProvider->run(promise);
auto deduplicate = [](QPromise<LocatorFilterEntries> &promise,
const std::shared_ptr<ResultsDeduplicator> &deduplicator) {
deduplicator->run(promise);
};
m_watcher->setFuture(Utils::asyncRun(filter, m_dataProvider));
m_watcher->setFuture(Utils::asyncRun(deduplicate, m_deduplicator));
}
class OutputFilterAdapter : public Tasking::TaskAdapter<OutputFilter>
class ResultsCollectorAdapter : public Tasking::TaskAdapter<ResultsCollector>
{
public:
OutputFilterAdapter() {
connect(task(), &OutputFilter::done, this, [this] { emit done(true); });
ResultsCollectorAdapter() {
connect(task(), &ResultsCollector::done, this, [this] { emit done(true); });
}
void start() final { task()->start(); }
};
} // namespace Core
QTC_DECLARE_CUSTOM_TASK(Filter, Core::OutputFilterAdapter);
QTC_DECLARE_CUSTOM_TASK(Collector, Core::ResultsCollectorAdapter);
namespace Core {
@@ -284,10 +331,10 @@ class LocatorStoragePrivate
{
public:
LocatorStoragePrivate(const QString &input, int index,
const std::shared_ptr<OutputDataProvider> &dataProvider)
const std::shared_ptr<ResultsDeduplicator> &deduplicator)
: m_input(input)
, m_index(index)
, m_dataProvider(dataProvider)
, m_deduplicator(deduplicator)
{}
QString input() const { return m_input; }
@@ -295,14 +342,14 @@ public:
void reportOutput(const LocatorFilterEntries &outputData)
{
QMutexLocker locker(&m_mutex);
QTC_ASSERT(m_dataProvider, return);
QTC_ASSERT(m_deduplicator, return);
reportOutputImpl(outputData);
}
void finalize()
{
QMutexLocker locker(&m_mutex);
if (m_dataProvider)
if (m_deduplicator)
reportOutputImpl({});
}
@@ -311,14 +358,14 @@ private:
void reportOutputImpl(const LocatorFilterEntries &outputData)
{
QTC_ASSERT(m_index >= 0, return);
m_dataProvider->reportOutput(m_index, outputData);
m_deduplicator->reportOutput(m_index, outputData);
// Deliver results only once for all copies of the storage, drop ref afterwards
m_dataProvider.reset();
m_deduplicator.reset();
}
const QString m_input;
const int m_index = -1;
std::shared_ptr<OutputDataProvider> m_dataProvider;
std::shared_ptr<ResultsDeduplicator> m_deduplicator;
QMutex m_mutex = {};
};
@@ -378,35 +425,36 @@ void LocatorMatcher::start()
using namespace Tasking;
struct FilterStorage
struct CollectorStorage
{
OutputFilter *m_filter = nullptr;
ResultsCollector *m_collector = nullptr;
};
TreeStorage<FilterStorage> filterStorage;
TreeStorage<CollectorStorage> collectorStorage;
const auto onFilterSetup = [this, filterCount = d->m_tasks.size(), filterStorage](OutputFilter &filter) {
filterStorage->m_filter = &filter;
filter.setFilterCount(filterCount);
connect(&filter, &OutputFilter::serialOutputDataReady,
const int filterCount = d->m_tasks.size();
const auto onCollectorSetup = [this, filterCount, collectorStorage](ResultsCollector &collector) {
collectorStorage->m_collector = &collector;
collector.setFilterCount(filterCount);
connect(&collector, &ResultsCollector::serialOutputDataReady,
this, [this](const LocatorFilterEntries &serialOutputData) {
d->m_output += serialOutputData;
emit serialOutputDataReady(serialOutputData);
});
};
const auto onFilterDone = [filterStorage](const OutputFilter &filter) {
Q_UNUSED(filter)
filterStorage->m_filter = nullptr;
const auto onCollectorDone = [collectorStorage](const ResultsCollector &collector) {
Q_UNUSED(collector)
collectorStorage->m_collector = nullptr;
};
QList<TaskItem> parallelTasks { ParallelLimit(d->m_parallelLimit) };
const auto onGroupSetup = [this, filterStorage](const TreeStorage<LocatorStorage> &storage,
const auto onGroupSetup = [this, collectorStorage](const TreeStorage<LocatorStorage> &storage,
int index) {
return [this, filterStorage, storage, index] {
OutputFilter *outputFilter = filterStorage->m_filter;
QTC_ASSERT(outputFilter, return);
return [this, collectorStorage, storage, index] {
ResultsCollector *collector = collectorStorage->m_collector;
QTC_ASSERT(collector, return);
*storage = std::make_shared<LocatorStoragePrivate>(d->m_input, index,
outputFilter->dataProvider());
collector->deduplicator());
};
};
@@ -431,8 +479,8 @@ void LocatorMatcher::start()
const Group root {
parallel,
Storage(filterStorage),
Filter(onFilterSetup, onFilterDone, onFilterDone),
Storage(collectorStorage),
Collector(onCollectorSetup, onCollectorDone, onCollectorDone),
Group {
parallelTasks
}