AsyncTask: Make it possible to run with promise

Using QtConcurrent API.
Adapt asynctask tests according to new API.
The old API in going to be removed, soon.

Change-Id: I3cf163e9492526f0b49bd162c27e6c55a98ace7a
Reviewed-by: Eike Ziller <eike.ziller@qt.io>
This commit is contained in:
Jarek Kobus
2023-02-11 18:43:19 +01:00
parent 7fe9363395
commit efc4a0f1af
3 changed files with 152 additions and 62 deletions

View File

@@ -3,6 +3,26 @@
#include "asynctask.h"
#include <QCoreApplication>
namespace Utils {
static int s_maxThreadCount = INT_MAX;
class AsyncThreadPool : public QThreadPool
{
public:
AsyncThreadPool() {
setMaxThreadCount(s_maxThreadCount);
moveToThread(qApp->thread());
}
};
Q_GLOBAL_STATIC(AsyncThreadPool, s_asyncThreadPool);
QThreadPool *asyncThreadPool()
{
return s_asyncThreadPool;
}
} // namespace Utils

View File

@@ -11,9 +11,12 @@
#include "tasktree.h"
#include <QFutureWatcher>
#include <QtConcurrent>
namespace Utils {
QTCREATOR_UTILS_EXPORT QThreadPool *asyncThreadPool();
class QTCREATOR_UTILS_EXPORT AsyncTaskBase : public QObject
{
Q_OBJECT
@@ -38,7 +41,11 @@ public:
m_watcher.waitForFinished();
}
using StartHandler = std::function<QFuture<ResultType>()>;
template <typename Function, typename ...Args>
void setConcurrentCallData(Function &&function, Args &&...args)
{
return wrapConcurrent(std::forward<Function>(function), std::forward<Args>(args)...);
}
template <typename Function, typename ...Args>
void setAsyncCallData(const Function &function, const Args &...args)
@@ -69,6 +76,41 @@ public:
bool isResultAvailable() const { return future().resultCount(); }
private:
template <typename Function, typename ...Args>
void wrapConcurrent(Function &&function, Args &&...args)
{
m_startHandler = [=] {
return callConcurrent(function, args...);
};
}
template <typename Function, typename ...Args>
void wrapConcurrent(std::reference_wrapper<const Function> &&wrapper, Args &&...args)
{
m_startHandler = [=] {
return callConcurrent(std::forward<const Function>(wrapper.get()), args...);
};
}
template <typename Function, typename ...Args>
auto callConcurrent(Function &&function, Args &&...args)
{
// Notice: we can't just call:
//
// return QtConcurrent::run(function, args...);
//
// since there is no way of passing m_priority there.
// There is an overload with thread pool, however, there is no overload with priority.
//
// Below implementation copied from QtConcurrent::run():
QThreadPool *threadPool = m_threadPool ? m_threadPool : asyncThreadPool();
QtConcurrent::DecayedTuple<Function, Args...>
tuple{std::forward<Function>(function), std::forward<Args>(args)...};
return QtConcurrent::TaskResolver<std::decay_t<Function>, std::decay_t<Args>...>
::run(std::move(tuple), QtConcurrent::TaskStartParameters{threadPool, m_priority});
}
using StartHandler = std::function<QFuture<ResultType>()>;
StartHandler m_startHandler;
FutureSynchronizer *m_synchronizer = nullptr;
QThreadPool *m_threadPool = nullptr;

View File

@@ -24,78 +24,109 @@ private:
QThreadPool m_threadPool;
};
void report3(QFutureInterface<int> &fi)
void report3(QPromise<int> &promise)
{
fi.reportResults({0, 2, 1});
promise.addResult(0);
promise.addResult(2);
promise.addResult(1);
}
void reportN(QFutureInterface<double> &fi, int n)
void reportN(QPromise<double> &promise, int n)
{
fi.reportResults(QVector<double>(n, 0));
for (int i = 0; i < n; ++i)
promise.addResult(0);
}
void reportString1(QFutureInterface<QString> &fi, const QString &s)
void reportString1(QPromise<QString> &promise, const QString &s)
{
fi.reportResult(s);
promise.addResult(s);
}
void reportString2(QFutureInterface<QString> &fi, QString s)
void reportString2(QPromise<QString> &promise, QString s)
{
fi.reportResult(s);
promise.addResult(s);
}
class Callable {
public:
void operator()(QFutureInterface<double> &fi, int n) const
void operator()(QPromise<double> &promise, int n) const
{
fi.reportResults(QVector<double>(n, 0));
for (int i = 0; i < n; ++i)
promise.addResult(0);
}
};
class MyObject {
public:
static void staticMember0(QFutureInterface<double> &fi)
static void staticMember0(QPromise<double> &promise)
{
fi.reportResults({0, 2, 1});
promise.addResult(0);
promise.addResult(2);
promise.addResult(1);
}
static void staticMember1(QFutureInterface<double> &fi, int n)
static void staticMember1(QPromise<double> &promise, int n)
{
fi.reportResults(QVector<double>(n, 0));
for (int i = 0; i < n; ++i)
promise.addResult(0);
}
void member0(QFutureInterface<double> &fi) const
void member0(QPromise<double> &promise) const
{
fi.reportResults({0, 2, 1});
promise.addResult(0);
promise.addResult(2);
promise.addResult(1);
}
void member1(QFutureInterface<double> &fi, int n) const
void member1(QPromise<double> &promise, int n) const
{
fi.reportResults(QVector<double>(n, 0));
for (int i = 0; i < n; ++i)
promise.addResult(0);
}
void memberString1(QFutureInterface<QString> &fi, const QString &s) const
void memberString1(QPromise<QString> &promise, const QString &s) const
{
fi.reportResult(s);
promise.addResult(s);
}
void memberString2(QFutureInterface<QString> &fi, QString s) const
void memberString2(QPromise<QString> &promise, QString s) const
{
fi.reportResult(s);
promise.addResult(s);
}
void nonConstMember(QFutureInterface<double> &fi)
void nonConstMember(QPromise<double> &promise)
{
fi.reportResults({0, 2, 1});
promise.addResult(0);
promise.addResult(2);
promise.addResult(1);
}
};
template<typename...>
struct FutureArgType;
template<typename Arg>
struct FutureArgType<QFuture<Arg>>
{
using Type = Arg;
};
template<typename...>
struct ConcurrentResultType;
template<typename Function, typename ...Args>
struct ConcurrentResultType<Function, Args...>
{
using Type = typename FutureArgType<decltype(QtConcurrent::run(
std::declval<Function>(), std::declval<Args>()...))>::Type;
};
template <typename Function, typename ...Args,
typename ResultType = typename Internal::resultType<Function>::type>
std::shared_ptr<AsyncTask<ResultType>> createAsyncTask(const Function &function, const Args &...args)
typename ResultType = typename ConcurrentResultType<Function, Args...>::Type>
std::shared_ptr<AsyncTask<ResultType>> createAsyncTask(Function &&function, Args &&...args)
{
auto asyncTask = std::make_shared<AsyncTask<ResultType>>();
asyncTask->setAsyncCallData(function, args...);
asyncTask->setConcurrentCallData(std::forward<Function>(function), std::forward<Args>(args)...);
asyncTask->start();
return asyncTask;
}
@@ -136,14 +167,16 @@ void tst_AsyncTask::runAsync()
QList<QString>({QString(QLatin1String("rvalue"))}));
// lambda
QCOMPARE(createAsyncTask([](QFutureInterface<double> &fi, int n) {
fi.reportResults(QVector<double>(n, 0));
QCOMPARE(createAsyncTask([](QPromise<double> &promise, int n) {
for (int i = 0; i < n; ++i)
promise.addResult(0);
}, 3)->results(),
QList<double>({0, 0, 0}));
// std::function
const std::function<void(QFutureInterface<double>&,int)> fun = [](QFutureInterface<double> &fi, int n) {
fi.reportResults(QVector<double>(n, 0));
const std::function<void(QPromise<double>&,int)> fun = [](QPromise<double> &promise, int n) {
for (int i = 0; i < n; ++i)
promise.addResult(0);
};
QCOMPARE(createAsyncTask(fun, 2)->results(),
QList<double>({0, 0}));
@@ -190,51 +223,46 @@ void tst_AsyncTask::runAsync()
void tst_AsyncTask::crefFunction()
{
// free function pointer with future interface
// free function pointer with promise
auto fun = &report3;
QCOMPARE(createAsyncTask(std::cref(fun))->results(),
QList<int>({0, 2, 1}));
// lambda with future interface
auto lambda = [](QFutureInterface<double> &fi, int n) {
fi.reportResults(QVector<double>(n, 0));
// lambda with promise
auto lambda = [](QPromise<double> &promise, int n) {
for (int i = 0; i < n; ++i)
promise.addResult(0);
};
QCOMPARE(createAsyncTask(std::cref(lambda), 3)->results(),
QList<double>({0, 0, 0}));
// std::function with future interface
const std::function<void(QFutureInterface<double>&,int)> funObj = [](QFutureInterface<double> &fi, int n) {
fi.reportResults(QVector<double>(n, 0));
// std::function with promise
const std::function<void(QPromise<double>&,int)> funObj = [](QPromise<double> &promise, int n) {
for (int i = 0; i < n; ++i)
promise.addResult(0);
};
QCOMPARE(createAsyncTask(std::cref(funObj), 2)->results(),
QList<double>({0, 0}));
// callable with future interface
// callable with promise
const Callable c{};
QCOMPARE(createAsyncTask(std::cref(c), 2)->results(),
QList<double>({0, 0}));
// member functions with future interface
// member functions with promise
auto member = &MyObject::member0;
const MyObject obj{};
QCOMPARE(createAsyncTask(std::cref(member), &obj)->results(),
QList<double>({0, 2, 1}));
}
template <typename Function, typename ...Args,
typename ResultType = typename Internal::resultType<Function>::type>
typename AsyncTask<ResultType>::StartHandler startHandler(const Function &function, const Args &...args)
{
return [=] { return Utils::runAsync(function, args...); };
}
void tst_AsyncTask::futureSynchonizer()
{
auto lambda = [](QFutureInterface<int> &fi) {
auto lambda = [](QPromise<int> &promise) {
while (true) {
if (fi.isCanceled()) {
fi.reportCanceled();
fi.reportFinished();
if (promise.isCanceled()) {
promise.future().cancel();
promise.finish();
return;
}
QThread::msleep(100);
@@ -244,7 +272,7 @@ void tst_AsyncTask::futureSynchonizer()
FutureSynchronizer synchronizer;
{
AsyncTask<int> task;
task.setAsyncCallData(lambda);
task.setConcurrentCallData(lambda);
task.setFutureSynchronizer(&synchronizer);
task.start();
QThread::msleep(10);
@@ -257,7 +285,7 @@ void tst_AsyncTask::futureSynchonizer()
// The destructor of synchronizer should wait for about 90 ms for worker thread to be canceled
}
void multiplyBy2(QFutureInterface<int> &fi, int input) { fi.reportResult(input * 2); }
void multiplyBy2(QPromise<int> &promise, int input) { promise.addResult(input * 2); }
void tst_AsyncTask::taskTree()
{
@@ -266,7 +294,7 @@ void tst_AsyncTask::taskTree()
int value = 1;
const auto setupIntAsync = [&](AsyncTask<int> &task) {
task.setAsyncCallData(multiplyBy2, value);
task.setConcurrentCallData(multiplyBy2, value);
};
const auto handleIntAsync = [&](const AsyncTask<int> &task) {
value = task.result();
@@ -294,9 +322,9 @@ static int returnxx(int x)
return x * x;
}
static void returnxxWithFI(QFutureInterface<int> &fi, int x)
static void returnxxWithPromise(QPromise<int> &promise, int x)
{
fi.reportResult(x * x);
promise.addResult(x * x);
}
static double s_sum = 0;
@@ -315,13 +343,13 @@ void tst_AsyncTask::mapReduce_data()
s_results.append(s_sum);
};
const auto setupAsync = [](AsyncTask<int> &task, int input) {
task.setAsyncCallData(returnxx, input);
task.setConcurrentCallData(returnxx, input);
};
const auto setupAsyncWithFI = [](AsyncTask<int> &task, int input) {
task.setAsyncCallData(returnxxWithFI, input);
task.setConcurrentCallData(returnxxWithPromise, input);
};
const auto setupAsyncWithTP = [this](AsyncTask<int> &task, int input) {
task.setAsyncCallData(returnxx, input);
task.setConcurrentCallData(returnxx, input);
task.setThreadPool(&m_threadPool);
};
const auto handleAsync = [](const AsyncTask<int> &task) {
@@ -377,7 +405,7 @@ void tst_AsyncTask::mapReduce_data()
QTest::newRow("SequentialWithThreadPool") << sequentialRootWithTP << defaultSum << defaultResult;
const auto setupSimpleAsync = [](AsyncTask<int> &task, int input) {
task.setAsyncCallData([](int input) { return input * 2; }, input);
task.setConcurrentCallData([](int input) { return input * 2; }, input);
};
const auto handleSimpleAsync = [](const AsyncTask<int> &task) {
s_sum += task.result() / 4.;
@@ -393,7 +421,7 @@ void tst_AsyncTask::mapReduce_data()
QTest::newRow("Simple") << simpleRoot << 3.0 << QList<double>({.5, 1.5, 3.});
const auto setupStringAsync = [](AsyncTask<int> &task, const QString &input) {
task.setAsyncCallData([](const QString &input) -> int { return input.size(); }, input);
task.setConcurrentCallData([](const QString &input) -> int { return input.size(); }, input);
};
const auto handleStringAsync = [](const AsyncTask<int> &task) {
s_sum /= task.result();