mapReduce: Option for ordered reduce

Change-Id: Ifdf6535315e2032aa5d9d2b56587e27b5dc4b450
Reviewed-by: Tobias Hunger <tobias.hunger@theqtcompany.com>
This commit is contained in:
Eike Ziller
2016-02-23 15:10:01 +01:00
parent e426d08e54
commit 4996be216d
3 changed files with 100 additions and 41 deletions

View File

@@ -31,6 +31,13 @@
#include <QFutureWatcher>
namespace Utils {
enum class MapReduceOption
{
Ordered,
Unordered
};
namespace Internal {
class QTCREATOR_UTILS_EXPORT MapReduceObject : public QObject
@@ -48,7 +55,8 @@ protected:
public:
MapReduceBase(QFutureInterface<ReduceResult> futureInterface, ForwardIterator begin, ForwardIterator end,
MapFunction &&map, State &state, ReduceFunction &&reduce, int size)
MapFunction &&map, State &state, ReduceFunction &&reduce,
MapReduceOption option, int size)
: m_futureInterface(futureInterface),
m_iterator(begin),
m_end(end),
@@ -56,7 +64,8 @@ public:
m_state(state),
m_reduce(std::forward<ReduceFunction>(reduce)),
m_handleProgress(size >= 0),
m_size(size)
m_size(size),
m_option(option)
{
if (m_handleProgress) // progress is handled by us
m_futureInterface.setProgressRange(0, MAX_PROGRESS);
@@ -72,7 +81,7 @@ public:
}
protected:
virtual void reduce(QFutureWatcher<MapResult> *watcher) = 0;
virtual void reduce(QFutureWatcher<MapResult> *watcher, int index) = 0;
bool schedule()
{
@@ -90,6 +99,8 @@ protected:
this, &MapReduceBase::updateProgress);
}
m_mapWatcher.append(watcher);
m_watcherIndex.append(m_currentIndex);
++m_currentIndex;
watcher->setFuture(runAsync(&m_threadPool, std::cref(m_map),
ItemReferenceWrapper(*m_iterator)));
++m_iterator;
@@ -99,7 +110,10 @@ protected:
void mapFinished(QFutureWatcher<MapResult> *watcher)
{
m_mapWatcher.removeOne(watcher); // remove so we can schedule next one
int index = m_mapWatcher.indexOf(watcher);
int watcherIndex = m_watcherIndex.at(index);
m_mapWatcher.removeAt(index); // remove so we can schedule next one
m_watcherIndex.removeAt(index);
bool didSchedule = false;
if (!m_futureInterface.isCanceled()) {
// first schedule the next map...
@@ -107,7 +121,7 @@ protected:
++m_successfullyFinishedMapCount;
updateProgress();
// ...then reduce
reduce(watcher);
reduce(watcher, watcherIndex);
}
delete watcher;
if (!didSchedule && m_mapWatcher.isEmpty())
@@ -151,9 +165,12 @@ protected:
QEventLoop m_loop;
QThreadPool m_threadPool; // for reusing threads
QList<QFutureWatcher<MapResult> *> m_mapWatcher;
QList<int> m_watcherIndex;
int m_currentIndex = 0;
const bool m_handleProgress;
const int m_size;
int m_successfullyFinishedMapCount = 0;
MapReduceOption m_option;
};
// non-void result of map function.
@@ -163,22 +180,45 @@ class MapReduce : public MapReduceBase<ForwardIterator, MapResult, MapFunction,
using BaseType = MapReduceBase<ForwardIterator, MapResult, MapFunction, State, ReduceResult, ReduceFunction>;
public:
MapReduce(QFutureInterface<ReduceResult> futureInterface, ForwardIterator begin, ForwardIterator end,
MapFunction &&map, State &state, ReduceFunction &&reduce, int size)
MapFunction &&map, State &state, ReduceFunction &&reduce, MapReduceOption option, int size)
: BaseType(futureInterface, begin, end, std::forward<MapFunction>(map), state,
std::forward<ReduceFunction>(reduce), size)
std::forward<ReduceFunction>(reduce), option, size)
{
}
protected:
void reduce(QFutureWatcher<MapResult> *watcher) override
void reduce(QFutureWatcher<MapResult> *watcher, int index) override
{
const int resultCount = watcher->future().resultCount();
for (int i = 0; i < resultCount; ++i) {
Internal::runAsyncImpl(BaseType::m_futureInterface, BaseType::m_reduce,
BaseType::m_state, watcher->future().resultAt(i));
if (BaseType::m_option == MapReduceOption::Unordered) {
reduceOne(watcher->future().results());
} else {
if (m_nextIndex == index) {
// handle this result and all directly following
reduceOne(watcher->future().results());
++m_nextIndex;
while (!m_pendingResults.isEmpty() && m_pendingResults.firstKey() == m_nextIndex) {
reduceOne(m_pendingResults.take(m_nextIndex));
++m_nextIndex;
}
} else {
// add result to pending results
m_pendingResults.insert(index, watcher->future().results());
}
}
}
private:
void reduceOne(const QList<MapResult> &results)
{
const int resultCount = results.size();
for (int i = 0; i < resultCount; ++i) {
Internal::runAsyncImpl(BaseType::m_futureInterface, BaseType::m_reduce,
BaseType::m_state, results.at(i));
}
}
QMap<int, QList<MapResult>> m_pendingResults;
int m_nextIndex = 0;
};
// specialization for void result of map function. Reducing is a no-op.
@@ -188,14 +228,14 @@ class MapReduce<ForwardIterator, void, MapFunction, State, ReduceResult, ReduceF
using BaseType = MapReduceBase<ForwardIterator, void, MapFunction, State, ReduceResult, ReduceFunction>;
public:
MapReduce(QFutureInterface<ReduceResult> futureInterface, ForwardIterator begin, ForwardIterator end,
MapFunction &&map, State &state, ReduceFunction &&reduce, int size)
MapFunction &&map, State &state, ReduceFunction &&reduce, MapReduceOption option, int size)
: BaseType(futureInterface, begin, end, std::forward<MapFunction>(map), state,
std::forward<ReduceFunction>(reduce), size)
std::forward<ReduceFunction>(reduce), option, size)
{
}
protected:
void reduce(QFutureWatcher<void> *) override
void reduce(QFutureWatcher<void> *, int) override
{
}
@@ -205,12 +245,13 @@ template <typename ForwardIterator, typename InitFunction, typename MapFunction,
typename ReduceFunction, typename CleanUpFunction>
void blockingIteratorMapReduce(QFutureInterface<ReduceResult> &futureInterface, ForwardIterator begin, ForwardIterator end,
InitFunction &&init, MapFunction &&map,
ReduceFunction &&reduce, CleanUpFunction &&cleanup, int size)
ReduceFunction &&reduce, CleanUpFunction &&cleanup,
MapReduceOption option, int size)
{
auto state = init(futureInterface);
MapReduce<ForwardIterator, typename Internal::resultType<MapFunction>::type, MapFunction, decltype(state), ReduceResult, ReduceFunction>
mr(futureInterface, begin, end, std::forward<MapFunction>(map), state,
std::forward<ReduceFunction>(reduce), size);
std::forward<ReduceFunction>(reduce), option, size);
mr.exec();
cleanup(futureInterface, state);
}
@@ -219,12 +260,14 @@ template <typename Container, typename InitFunction, typename MapFunction, typen
typename ReduceFunction, typename CleanUpFunction>
void blockingContainerMapReduce(QFutureInterface<ReduceResult> &futureInterface, Container &&container,
InitFunction &&init, MapFunction &&map,
ReduceFunction &&reduce, CleanUpFunction &&cleanup)
ReduceFunction &&reduce, CleanUpFunction &&cleanup,
MapReduceOption option)
{
blockingIteratorMapReduce(futureInterface, std::begin(container), std::end(container),
std::forward<InitFunction>(init), std::forward<MapFunction>(map),
std::forward<ReduceFunction>(reduce),
std::forward<CleanUpFunction>(cleanup), container.size());
std::forward<CleanUpFunction>(cleanup),
option, container.size());
}
template <typename Container, typename InitFunction, typename MapFunction, typename ReduceResult,
@@ -232,12 +275,14 @@ template <typename Container, typename InitFunction, typename MapFunction, typen
void blockingContainerRefMapReduce(QFutureInterface<ReduceResult> &futureInterface,
std::reference_wrapper<Container> containerWrapper,
InitFunction &&init, MapFunction &&map,
ReduceFunction &&reduce, CleanUpFunction &&cleanup)
ReduceFunction &&reduce, CleanUpFunction &&cleanup,
MapReduceOption option)
{
blockingContainerMapReduce(futureInterface, containerWrapper.get(),
std::forward<InitFunction>(init), std::forward<MapFunction>(map),
std::forward<ReduceFunction>(reduce),
std::forward<CleanUpFunction>(cleanup));
std::forward<CleanUpFunction>(cleanup),
option);
}
template <typename ReduceResult>
@@ -262,7 +307,8 @@ template <typename ForwardIterator, typename InitFunction, typename MapFunction,
typename ReduceResult = typename Internal::resultType<ReduceFunction>::type>
QFuture<ReduceResult>
mapReduce(ForwardIterator begin, ForwardIterator end, InitFunction &&init, MapFunction &&map,
ReduceFunction &&reduce, CleanUpFunction &&cleanup, int size = -1)
ReduceFunction &&reduce, CleanUpFunction &&cleanup,
MapReduceOption option = MapReduceOption::Unordered, int size = -1)
{
return runAsync(Internal::blockingIteratorMapReduce<
ForwardIterator,
@@ -273,7 +319,7 @@ mapReduce(ForwardIterator begin, ForwardIterator end, InitFunction &&init, MapFu
typename std::decay<CleanUpFunction>::type>,
begin, end, std::forward<InitFunction>(init), std::forward<MapFunction>(map),
std::forward<ReduceFunction>(reduce), std::forward<CleanUpFunction>(cleanup),
size);
option, size);
}
/*!
@@ -325,7 +371,8 @@ template <typename Container, typename InitFunction, typename MapFunction,
typename ReduceResult = typename Internal::resultType<ReduceFunction>::type>
QFuture<ReduceResult>
mapReduce(Container &&container, InitFunction &&init, MapFunction &&map,
ReduceFunction &&reduce, CleanUpFunction &&cleanup)
ReduceFunction &&reduce, CleanUpFunction &&cleanup,
MapReduceOption option = MapReduceOption::Unordered)
{
return runAsync(Internal::blockingContainerMapReduce<
typename std::decay<Container>::type,
@@ -335,7 +382,8 @@ mapReduce(Container &&container, InitFunction &&init, MapFunction &&map,
typename std::decay<CleanUpFunction>::type>,
std::forward<Container>(container),
std::forward<InitFunction>(init), std::forward<MapFunction>(map),
std::forward<ReduceFunction>(reduce), std::forward<CleanUpFunction>(cleanup));
std::forward<ReduceFunction>(reduce), std::forward<CleanUpFunction>(cleanup),
option);
}
template <typename Container, typename InitFunction, typename MapFunction,
@@ -343,7 +391,8 @@ template <typename Container, typename InitFunction, typename MapFunction,
typename ReduceResult = typename Internal::resultType<ReduceFunction>::type>
QFuture<ReduceResult>
mapReduce(std::reference_wrapper<Container> containerWrapper, InitFunction &&init, MapFunction &&map,
ReduceFunction &&reduce, CleanUpFunction &&cleanup)
ReduceFunction &&reduce, CleanUpFunction &&cleanup,
MapReduceOption option = MapReduceOption::Unordered)
{
return runAsync(Internal::blockingContainerRefMapReduce<
Container,
@@ -354,20 +403,21 @@ mapReduce(std::reference_wrapper<Container> containerWrapper, InitFunction &&ini
typename std::decay<CleanUpFunction>::type>,
containerWrapper,
std::forward<InitFunction>(init), std::forward<MapFunction>(map),
std::forward<ReduceFunction>(reduce), std::forward<CleanUpFunction>(cleanup));
std::forward<ReduceFunction>(reduce), std::forward<CleanUpFunction>(cleanup),
option);
}
// TODO: Currently does not order its map results.
template <typename Container, typename MapFunction,
typename MapResult = typename Internal::resultType<MapFunction>::type>
QFuture<MapResult>
map(Container &&container, MapFunction &&map)
map(Container &&container, MapFunction &&map, MapReduceOption option = MapReduceOption::Ordered)
{
return mapReduce(std::forward<Container>(container),
Internal::dummyInit<MapResult>,
std::forward<MapFunction>(map),
Internal::DummyReduce<MapResult>(),
Internal::dummyCleanup<MapResult>);
Internal::dummyCleanup<MapResult>,
option);
}
} // Utils

View File

@@ -312,7 +312,7 @@ void Locator::refresh(QList<ILocatorFilter *> filters)
{
if (filters.isEmpty())
filters = m_filters;
QFuture<void> task = Utils::map(filters, &ILocatorFilter::refresh);
QFuture<void> task = Utils::map(filters, &ILocatorFilter::refresh, Utils::MapReduceOption::Unordered);
FutureProgress *progress =
ProgressManager::addTask(task, tr("Updating Locator Caches"), Constants::TASK_INDEX);
connect(progress, &FutureProgress::finished, this, &Locator::saveSettings);

View File

@@ -40,6 +40,7 @@ private slots:
void mapReduce();
void mapReduceRvalueContainer();
void map();
void orderedMapReduce();
#ifdef SUPPORTS_MOVE
void moveOnlyType();
#endif
@@ -125,13 +126,8 @@ void tst_MapReduce::mapReduceRvalueContainer()
void tst_MapReduce::map()
{
{
QList<double> results = Utils::map(QList<int>({2, 5, 1}),
[](int x) { return x*2.5; }
).results();
Utils::sort(results);
QCOMPARE(results, QList<double>({2.5, 5., 12.5}));
}
QCOMPARE(Utils::map(QList<int>({2, 5, 1}), [](int x) { return x*2.5; }).results(),
QList<double>({5., 12.5, 2.5}));
{
// void result
QList<int> results;
@@ -142,7 +138,9 @@ void tst_MapReduce::map()
// map
[&mutex, &results](int x) { QMutexLocker l(&mutex); results.append(x); }
).waitForFinished();
Utils::sort(results); // mapping order is undefined
// Utils::map is "ordered" by default, but that means that result reporting is ordered,
// the map function is still called out-of-order
qSort(results);
QCOMPARE(results, QList<int>({1, 2, 5}));
}
{
@@ -153,6 +151,17 @@ void tst_MapReduce::map()
}
}
void tst_MapReduce::orderedMapReduce()
{
QCOMPARE(Utils::mapReduce(QList<int>({1, 2, 3, 4}),
[](QFutureInterface<int> &) { return 0; },
[](int i) { return i*2; },
[](int &state, int val) { state += val; return state; },
[](QFutureInterface<int> &, int &) { },
Utils::MapReduceOption::Ordered).results(),
QList<int>({2, 6, 12, 20}));
}
#ifdef SUPPORTS_MOVE
class MoveOnlyType