TaskTree: Implement simple Loop functionality

Task-number: QTCREATORBUG-30081
Change-Id: I961d9c476d2af8742e461f92313ff0161d9de522
Reviewed-by: hjk <hjk@qt.io>
This commit is contained in:
Jarek Kobus
2024-01-08 14:39:11 +01:00
parent a50bc70b87
commit 52e0863bd2
2 changed files with 199 additions and 56 deletions

View File

@@ -1211,6 +1211,59 @@ static DoneWith toDoneWith(DoneResult result)
return result == DoneResult::Success ? DoneWith::Success : DoneWith::Error;
}
class LoopThreadData
{
Q_DISABLE_COPY_MOVE(LoopThreadData)
public:
LoopThreadData() = default;
void pushIteration(int index)
{
m_activeLoopStack.push_back(index);
}
void popIteration()
{
QT_ASSERT(m_activeLoopStack.size(), return);
m_activeLoopStack.pop_back();
}
int iteration() const
{
QT_ASSERT(m_activeLoopStack.size(), qWarning(
"The referenced loop is not reachable in the running tree. "
"A -1 will be returned which might lead to a crash in the calling code. "
"It is possible that no loop was added to the tree, "
"or the loop is not reachable from where it is referenced."); return -1);
return m_activeLoopStack.last();
}
private:
QList<int> m_activeLoopStack;
};
class LoopData
{
public:
LoopThreadData &threadData() {
const std::lock_guard lock(m_threadDataMutex);
return m_threadDataMap.try_emplace(QThread::currentThread()).first->second;
}
const Loop::Condition m_condition = {};
std::mutex m_threadDataMutex = {};
// Use std::map on purpose, so that it doesn't invalidate references on modifications.
// Don't optimize it by using std::unordered_map.
std::map<QThread *, LoopThreadData> m_threadDataMap = {};
};
Loop::Loop(const Condition &condition)
: m_loopData(new LoopData{condition})
{}
int Loop::iteration() const
{
return m_loopData->threadData().iteration();
}
using StoragePtr = void *;
class StorageThreadData
@@ -1306,6 +1359,11 @@ void GroupItem::addChildren(const QList<GroupItem> &children)
qWarning("Group workflow policy redefinition, overriding..."));
m_groupData.m_workflowPolicy = child.m_groupData.m_workflowPolicy;
}
if (child.m_groupData.m_loop) {
QT_ASSERT(!m_groupData.m_loop,
qWarning("Group loop redefinition, overriding..."));
m_groupData.m_loop = child.m_groupData.m_loop;
}
break;
case Type::TaskHandler:
QT_ASSERT(child.m_taskHandler.m_createHandler,
@@ -1357,11 +1415,14 @@ public:
~ExecutionContextActivator() {
for (int i = m_activeStorages.size() - 1; i >= 0; --i) // iterate in reverse order
m_activeStorages[i].m_storageData->threadData().popStorage();
for (int i = m_activeLoops.size() - 1; i >= 0; --i) // iterate in reverse order
m_activeLoops[i].m_loopData->threadData().popIteration();
}
private:
void activateContext(RuntimeIteration *iteration);
void activateContext(RuntimeContainer *container);
QList<Loop> m_activeLoops;
QList<StorageBase> m_activeStorages;
};
@@ -1375,9 +1436,10 @@ public:
TaskTreePrivate *const m_taskTreePrivate = nullptr;
const GroupItem::GroupHandler m_groupHandler;
const int m_parallelLimit = 1;
const WorkflowPolicy m_workflowPolicy = WorkflowPolicy::StopOnError;
const GroupItem::GroupHandler m_groupHandler;
const std::optional<Loop> m_loop;
const QList<StorageBase> m_storageList;
std::vector<TaskNode> m_children;
const int m_taskCount = 0;
@@ -1454,6 +1516,7 @@ public:
SetupResult childDone(RuntimeIteration *iteration, bool success);
void stop(RuntimeContainer *container);
bool invokeDoneHandler(RuntimeContainer *container, DoneWith doneWith);
bool invokeLoopHandler(RuntimeContainer *container);
template <typename Container, typename Handler, typename ...Args,
typename ReturnType = std::invoke_result_t<Handler, Args...>>
@@ -1502,8 +1565,7 @@ public:
, m_isProgressive(index ? false : isProgressive(container))
, m_container(container)
{}
int continueIndex() const;
int currentLimit() const;
std::optional<Loop> loop() const;
void deleteChild(RuntimeTask *node);
const int m_iterationIndex = 0;
@@ -1523,6 +1585,7 @@ public:
, m_parentTask(parentTask)
, m_storages(createStorages(taskContainer))
, m_successBit(initialSuccessBit(taskContainer.m_workflowPolicy))
, m_shouldIterate(taskContainer.m_loop)
{}
~RuntimeContainer()
@@ -1540,6 +1603,7 @@ public:
bool isStarting() const { return m_startGuard.isLocked(); }
RuntimeIteration *parentIteration() const;
bool updateSuccessBit(bool success);
void deleteFinishedIterations();
const ContainerNode &m_containerNode; // Not owning.
RuntimeTask *m_parentTask = nullptr; // Not owning.
@@ -1549,6 +1613,10 @@ public:
bool m_callStorageDoneHandlersOnDestruction = false;
Guard m_startGuard;
int m_iterationCount = 0;
int m_nextToStart = 0;
int m_runningChildren = 0;
bool m_shouldIterate = true;
std::vector<std::unique_ptr<RuntimeIteration>> m_iterations; // Owning.
};
@@ -1571,7 +1639,11 @@ bool isProgressive(RuntimeContainer *container)
void ExecutionContextActivator::activateContext(RuntimeIteration *iteration)
{
// TODO: activate iterator
std::optional<Loop> loop = iteration->loop();
if (loop) {
loop->m_loopData->threadData().pushIteration(iteration->m_iterationIndex);
m_activeLoops.append(*loop);
}
activateContext(iteration->m_container);
}
@@ -1646,18 +1718,9 @@ void TaskTreePrivate::emitDone(DoneWith result)
emit q->done(result);
}
int RuntimeIteration::continueIndex() const
std::optional<Loop> RuntimeIteration::loop() const
{
return m_doneCount ? qMin(m_doneCount + m_container->m_containerNode.m_parallelLimit - 1,
int(m_container->m_containerNode.m_children.size())) : 0;
}
int RuntimeIteration::currentLimit() const
{
// TODO: Handle children well
const int childCount = int(m_container->m_containerNode.m_children.size());
return m_container->m_containerNode.m_parallelLimit
? qMin(m_doneCount + m_container->m_containerNode.m_parallelLimit, childCount) : childCount;
return m_container->m_containerNode.m_loop;
}
void RuntimeIteration::deleteChild(RuntimeTask *task)
@@ -1681,9 +1744,10 @@ static std::vector<TaskNode> createChildren(TaskTreePrivate *taskTreePrivate,
ContainerNode::ContainerNode(TaskTreePrivate *taskTreePrivate, const GroupItem &task)
: m_taskTreePrivate(taskTreePrivate)
, m_groupHandler(task.m_groupData.m_groupHandler)
, m_parallelLimit(task.m_groupData.m_parallelLimit.value_or(1))
, m_workflowPolicy(task.m_groupData.m_workflowPolicy.value_or(WorkflowPolicy::StopOnError))
, m_groupHandler(task.m_groupData.m_groupHandler)
, m_loop(task.m_groupData.m_loop)
, m_storageList(task.m_storageList)
, m_children(createChildren(taskTreePrivate, task.m_children))
, m_taskCount(std::accumulate(m_children.cbegin(), m_children.cend(), 0,
@@ -1725,20 +1789,34 @@ bool RuntimeContainer::updateSuccessBit(bool success)
return m_successBit;
}
void RuntimeContainer::deleteFinishedIterations()
{
for (auto it = m_iterations.cbegin(); it != m_iterations.cend(); ) {
if (it->get()->m_doneCount == int(m_containerNode.m_children.size()))
it = m_iterations.erase(it);
else
++it;
}
}
SetupResult TaskTreePrivate::start(RuntimeContainer *container)
{
const ContainerNode &containerNode = container->m_containerNode;
SetupResult startAction = SetupResult::Continue;
if (container->m_containerNode.m_groupHandler.m_setupHandler) {
startAction = invokeHandler(container, container->m_containerNode.m_groupHandler.m_setupHandler);
if (containerNode.m_groupHandler.m_setupHandler) {
startAction = invokeHandler(container, containerNode.m_groupHandler.m_setupHandler);
if (startAction != SetupResult::Continue) {
if (isProgressive(container))
advanceProgress(container->m_containerNode.m_taskCount);
advanceProgress(containerNode.m_taskCount);
// Non-Continue SetupResult takes precedence over the workflow policy.
container->m_successBit = startAction == SetupResult::StopWithSuccess;
}
}
if (startAction == SetupResult::Continue && container->m_containerNode.m_children.empty())
if (startAction == SetupResult::Continue
&& (containerNode.m_children.empty()
|| (containerNode.m_loop && !invokeLoopHandler(container)))) {
startAction = toSetupResult(container->m_successBit);
}
return continueStart(container, startAction);
}
@@ -1767,41 +1845,65 @@ SetupResult TaskTreePrivate::continueStart(RuntimeContainer *container, SetupRes
SetupResult TaskTreePrivate::startChildren(RuntimeContainer *container)
{
if (container->m_containerNode.m_parallelLimit == 0 && !container->m_iterations.empty())
return SetupResult::Continue;
const ContainerNode &containerNode = container->m_containerNode;
const int childCount = int(containerNode.m_children.size());
if (container->m_iterations.empty())
container->m_iterations.emplace_back(std::make_unique<RuntimeIteration>(0, container));
if (container->m_iterationCount == 0) {
container->m_iterations.emplace_back(
std::make_unique<RuntimeIteration>(container->m_iterationCount, container));
++container->m_iterationCount;
} else if (containerNode.m_parallelLimit == 0) {
container->deleteFinishedIterations();
if (container->m_iterations.empty())
return toSetupResult(container->m_successBit);
return SetupResult::Continue;
}
GuardLocker locker(container->m_startGuard);
for (auto &iteration : container->m_iterations) {
for (int i = iteration->continueIndex(); i < int(container->m_containerNode.m_children.size()); ++i) {
const int limit = iteration->currentLimit();
if (i >= limit)
break;
RuntimeTask *newTask = new RuntimeTask{container->m_containerNode.m_children.at(i),
iteration.get()};
iteration->m_children.emplace_back(newTask);
const SetupResult startAction = start(newTask);
if (startAction == SetupResult::Continue)
continue;
const SetupResult finalizeAction = childDone(iteration.get(),
startAction == SetupResult::StopWithSuccess);
if (finalizeAction == SetupResult::Continue)
continue;
if (iteration->m_isProgressive) {
int skippedTaskCount = 0;
// Skip scheduled but not run yet. The current (i) was already notified.
for (int j = i + 1; j < limit; ++j)
skippedTaskCount += container->m_containerNode.m_children.at(j).taskCount();
advanceProgress(skippedTaskCount);
while (containerNode.m_parallelLimit == 0
|| container->m_runningChildren < containerNode.m_parallelLimit) {
container->deleteFinishedIterations();
if (container->m_nextToStart == childCount) {
if (container->m_shouldIterate && invokeLoopHandler(container)) {
container->m_nextToStart = 0;
container->m_iterations.emplace_back(
std::make_unique<RuntimeIteration>(container->m_iterationCount, container));
++container->m_iterationCount;
} else {
if (container->m_iterations.empty())
return toSetupResult(container->m_successBit);
return SetupResult::Continue;
}
return finalizeAction;
}
RuntimeIteration *iteration = container->m_iterations.back().get();
RuntimeTask *newTask = new RuntimeTask{containerNode.m_children.at(container->m_nextToStart),
iteration};
iteration->m_children.emplace_back(newTask);
++container->m_runningChildren;
++container->m_nextToStart;
const SetupResult startAction = start(newTask);
if (startAction == SetupResult::Continue)
continue;
const SetupResult finalizeAction = childDone(iteration,
startAction == SetupResult::StopWithSuccess);
if (finalizeAction == SetupResult::Continue)
continue;
if (iteration->m_isProgressive) {
int skippedTaskCount = 0;
// Skip scheduled but not run yet.
// The current m_nextToStart was already notified -> thus -1.
const int limit = containerNode.m_parallelLimit
? qMin(iteration->m_doneCount + containerNode.m_parallelLimit - 1, childCount)
: childCount;
for (int i = container->m_nextToStart; i < limit; ++i)
skippedTaskCount += containerNode.m_children.at(i).taskCount();
advanceProgress(skippedTaskCount);
}
return finalizeAction;
}
return SetupResult::Continue;
}
@@ -1817,10 +1919,10 @@ SetupResult TaskTreePrivate::childDone(RuntimeIteration *iteration, bool success
stop(container);
++iteration->m_doneCount;
--container->m_runningChildren;
const bool updatedSuccess = container->updateSuccessBit(success);
const SetupResult startAction
= (shouldStop || iteration->m_doneCount == int(container->m_containerNode.m_children.size()))
? toSetupResult(updatedSuccess) : SetupResult::Continue;
const SetupResult startAction = shouldStop ? toSetupResult(updatedSuccess)
: SetupResult::Continue;
if (container->isStarting())
return startAction;
@@ -1829,15 +1931,16 @@ SetupResult TaskTreePrivate::childDone(RuntimeIteration *iteration, bool success
void TaskTreePrivate::stop(RuntimeContainer *container)
{
const ContainerNode &containerNode = container->m_containerNode;
for (auto &iteration : container->m_iterations) {
for (auto &child : iteration->m_children)
stop(child.get());
if (iteration->m_isProgressive) {
if (iteration->m_isProgressive && containerNode.m_parallelLimit > 0) {
int skippedTaskCount = 0;
for (int i = iteration->currentLimit();
i < int(container->m_containerNode.m_children.size()); ++i) {
skippedTaskCount += container->m_containerNode.m_children.at(i).taskCount();
for (int i = iteration->m_doneCount + containerNode.m_parallelLimit;
i < int(containerNode.m_children.size()); ++i) {
skippedTaskCount += containerNode.m_children.at(i).taskCount();
}
advanceProgress(skippedTaskCount);
}
@@ -1862,6 +1965,16 @@ bool TaskTreePrivate::invokeDoneHandler(RuntimeContainer *container, DoneWith do
return result == DoneResult::Success;
}
bool TaskTreePrivate::invokeLoopHandler(RuntimeContainer *container)
{
if (container->m_shouldIterate) {
container->m_shouldIterate = invokeHandler(container,
container->m_containerNode.m_loop->m_loopData->m_condition,
container->m_iterationCount);
}
return container->m_shouldIterate;
}
SetupResult TaskTreePrivate::start(RuntimeTask *node)
{
if (!node->m_taskNode.isTask()) {

View File

@@ -76,6 +76,7 @@ Q_ENUM_NS(CallDoneIf);
TASKING_EXPORT DoneResult toDoneResult(bool success);
class LoopData;
class StorageData;
class TaskTreePrivate;
@@ -96,6 +97,32 @@ protected:
virtual void start() = 0;
};
class TASKING_EXPORT Loop
{
public:
using Condition = std::function<bool(int)>; // Takes iteration, called prior to each iteration.
Loop(const Condition &condition);
int iteration() const;
private:
friend class ExecutionContextActivator;
friend class TaskTreePrivate;
QSharedPointer<LoopData> m_loopData;
};
class TASKING_EXPORT Forever final : public Loop
{
public:
Forever() : Loop([](int) { return true; }) {}
};
class TASKING_EXPORT Repeat final : public Loop
{
public:
Repeat(int count) : Loop([count](int index) { return index < count; }) {}
};
class TASKING_EXPORT StorageBase
{
private:
@@ -157,6 +184,8 @@ public:
: m_type(Type::Storage)
, m_storageList{storage} {}
GroupItem(const Loop &loop) : GroupItem(GroupData{{}, {}, {}, loop}) {}
// TODO: Add tests.
GroupItem(const QList<GroupItem> &children) : m_type(Type::List) { addChildren(children); }
GroupItem(std::initializer_list<GroupItem> children) : m_type(Type::List) { addChildren(children); }
@@ -186,6 +215,7 @@ protected:
GroupHandler m_groupHandler = {};
std::optional<int> m_parallelLimit = {};
std::optional<WorkflowPolicy> m_workflowPolicy = {};
std::optional<Loop> m_loop = {};
};
enum class Type {