From 52e0863bd2e47ee9915e177eedfb15401e835d29 Mon Sep 17 00:00:00 2001 From: Jarek Kobus Date: Mon, 8 Jan 2024 14:39:11 +0100 Subject: [PATCH] TaskTree: Implement simple Loop functionality Task-number: QTCREATORBUG-30081 Change-Id: I961d9c476d2af8742e461f92313ff0161d9de522 Reviewed-by: hjk --- src/libs/solutions/tasking/tasktree.cpp | 225 ++++++++++++++++++------ src/libs/solutions/tasking/tasktree.h | 30 ++++ 2 files changed, 199 insertions(+), 56 deletions(-) diff --git a/src/libs/solutions/tasking/tasktree.cpp b/src/libs/solutions/tasking/tasktree.cpp index 209d79da1ce..dd8956bc61f 100644 --- a/src/libs/solutions/tasking/tasktree.cpp +++ b/src/libs/solutions/tasking/tasktree.cpp @@ -1211,6 +1211,59 @@ static DoneWith toDoneWith(DoneResult result) return result == DoneResult::Success ? DoneWith::Success : DoneWith::Error; } +class LoopThreadData +{ + Q_DISABLE_COPY_MOVE(LoopThreadData) + +public: + LoopThreadData() = default; + void pushIteration(int index) + { + m_activeLoopStack.push_back(index); + } + void popIteration() + { + QT_ASSERT(m_activeLoopStack.size(), return); + m_activeLoopStack.pop_back(); + } + int iteration() const + { + QT_ASSERT(m_activeLoopStack.size(), qWarning( + "The referenced loop is not reachable in the running tree. " + "A -1 will be returned which might lead to a crash in the calling code. " + "It is possible that no loop was added to the tree, " + "or the loop is not reachable from where it is referenced."); return -1); + return m_activeLoopStack.last(); + } + +private: + QList m_activeLoopStack; +}; + +class LoopData +{ +public: + LoopThreadData &threadData() { + const std::lock_guard lock(m_threadDataMutex); + return m_threadDataMap.try_emplace(QThread::currentThread()).first->second; + } + + const Loop::Condition m_condition = {}; + std::mutex m_threadDataMutex = {}; + // Use std::map on purpose, so that it doesn't invalidate references on modifications. + // Don't optimize it by using std::unordered_map. + std::map m_threadDataMap = {}; +}; + +Loop::Loop(const Condition &condition) + : m_loopData(new LoopData{condition}) +{} + +int Loop::iteration() const +{ + return m_loopData->threadData().iteration(); +} + using StoragePtr = void *; class StorageThreadData @@ -1306,6 +1359,11 @@ void GroupItem::addChildren(const QList &children) qWarning("Group workflow policy redefinition, overriding...")); m_groupData.m_workflowPolicy = child.m_groupData.m_workflowPolicy; } + if (child.m_groupData.m_loop) { + QT_ASSERT(!m_groupData.m_loop, + qWarning("Group loop redefinition, overriding...")); + m_groupData.m_loop = child.m_groupData.m_loop; + } break; case Type::TaskHandler: QT_ASSERT(child.m_taskHandler.m_createHandler, @@ -1357,11 +1415,14 @@ public: ~ExecutionContextActivator() { for (int i = m_activeStorages.size() - 1; i >= 0; --i) // iterate in reverse order m_activeStorages[i].m_storageData->threadData().popStorage(); + for (int i = m_activeLoops.size() - 1; i >= 0; --i) // iterate in reverse order + m_activeLoops[i].m_loopData->threadData().popIteration(); } private: void activateContext(RuntimeIteration *iteration); void activateContext(RuntimeContainer *container); + QList m_activeLoops; QList m_activeStorages; }; @@ -1375,9 +1436,10 @@ public: TaskTreePrivate *const m_taskTreePrivate = nullptr; + const GroupItem::GroupHandler m_groupHandler; const int m_parallelLimit = 1; const WorkflowPolicy m_workflowPolicy = WorkflowPolicy::StopOnError; - const GroupItem::GroupHandler m_groupHandler; + const std::optional m_loop; const QList m_storageList; std::vector m_children; const int m_taskCount = 0; @@ -1454,6 +1516,7 @@ public: SetupResult childDone(RuntimeIteration *iteration, bool success); void stop(RuntimeContainer *container); bool invokeDoneHandler(RuntimeContainer *container, DoneWith doneWith); + bool invokeLoopHandler(RuntimeContainer *container); template > @@ -1502,8 +1565,7 @@ public: , m_isProgressive(index ? false : isProgressive(container)) , m_container(container) {} - int continueIndex() const; - int currentLimit() const; + std::optional loop() const; void deleteChild(RuntimeTask *node); const int m_iterationIndex = 0; @@ -1523,6 +1585,7 @@ public: , m_parentTask(parentTask) , m_storages(createStorages(taskContainer)) , m_successBit(initialSuccessBit(taskContainer.m_workflowPolicy)) + , m_shouldIterate(taskContainer.m_loop) {} ~RuntimeContainer() @@ -1540,6 +1603,7 @@ public: bool isStarting() const { return m_startGuard.isLocked(); } RuntimeIteration *parentIteration() const; bool updateSuccessBit(bool success); + void deleteFinishedIterations(); const ContainerNode &m_containerNode; // Not owning. RuntimeTask *m_parentTask = nullptr; // Not owning. @@ -1549,6 +1613,10 @@ public: bool m_callStorageDoneHandlersOnDestruction = false; Guard m_startGuard; + int m_iterationCount = 0; + int m_nextToStart = 0; + int m_runningChildren = 0; + bool m_shouldIterate = true; std::vector> m_iterations; // Owning. }; @@ -1571,7 +1639,11 @@ bool isProgressive(RuntimeContainer *container) void ExecutionContextActivator::activateContext(RuntimeIteration *iteration) { - // TODO: activate iterator + std::optional loop = iteration->loop(); + if (loop) { + loop->m_loopData->threadData().pushIteration(iteration->m_iterationIndex); + m_activeLoops.append(*loop); + } activateContext(iteration->m_container); } @@ -1646,18 +1718,9 @@ void TaskTreePrivate::emitDone(DoneWith result) emit q->done(result); } -int RuntimeIteration::continueIndex() const +std::optional RuntimeIteration::loop() const { - return m_doneCount ? qMin(m_doneCount + m_container->m_containerNode.m_parallelLimit - 1, - int(m_container->m_containerNode.m_children.size())) : 0; -} - -int RuntimeIteration::currentLimit() const -{ - // TODO: Handle children well - const int childCount = int(m_container->m_containerNode.m_children.size()); - return m_container->m_containerNode.m_parallelLimit - ? qMin(m_doneCount + m_container->m_containerNode.m_parallelLimit, childCount) : childCount; + return m_container->m_containerNode.m_loop; } void RuntimeIteration::deleteChild(RuntimeTask *task) @@ -1681,9 +1744,10 @@ static std::vector createChildren(TaskTreePrivate *taskTreePrivate, ContainerNode::ContainerNode(TaskTreePrivate *taskTreePrivate, const GroupItem &task) : m_taskTreePrivate(taskTreePrivate) + , m_groupHandler(task.m_groupData.m_groupHandler) , m_parallelLimit(task.m_groupData.m_parallelLimit.value_or(1)) , m_workflowPolicy(task.m_groupData.m_workflowPolicy.value_or(WorkflowPolicy::StopOnError)) - , m_groupHandler(task.m_groupData.m_groupHandler) + , m_loop(task.m_groupData.m_loop) , m_storageList(task.m_storageList) , m_children(createChildren(taskTreePrivate, task.m_children)) , m_taskCount(std::accumulate(m_children.cbegin(), m_children.cend(), 0, @@ -1725,20 +1789,34 @@ bool RuntimeContainer::updateSuccessBit(bool success) return m_successBit; } +void RuntimeContainer::deleteFinishedIterations() +{ + for (auto it = m_iterations.cbegin(); it != m_iterations.cend(); ) { + if (it->get()->m_doneCount == int(m_containerNode.m_children.size())) + it = m_iterations.erase(it); + else + ++it; + } +} + SetupResult TaskTreePrivate::start(RuntimeContainer *container) { + const ContainerNode &containerNode = container->m_containerNode; SetupResult startAction = SetupResult::Continue; - if (container->m_containerNode.m_groupHandler.m_setupHandler) { - startAction = invokeHandler(container, container->m_containerNode.m_groupHandler.m_setupHandler); + if (containerNode.m_groupHandler.m_setupHandler) { + startAction = invokeHandler(container, containerNode.m_groupHandler.m_setupHandler); if (startAction != SetupResult::Continue) { if (isProgressive(container)) - advanceProgress(container->m_containerNode.m_taskCount); + advanceProgress(containerNode.m_taskCount); // Non-Continue SetupResult takes precedence over the workflow policy. container->m_successBit = startAction == SetupResult::StopWithSuccess; } } - if (startAction == SetupResult::Continue && container->m_containerNode.m_children.empty()) + if (startAction == SetupResult::Continue + && (containerNode.m_children.empty() + || (containerNode.m_loop && !invokeLoopHandler(container)))) { startAction = toSetupResult(container->m_successBit); + } return continueStart(container, startAction); } @@ -1767,41 +1845,65 @@ SetupResult TaskTreePrivate::continueStart(RuntimeContainer *container, SetupRes SetupResult TaskTreePrivate::startChildren(RuntimeContainer *container) { - if (container->m_containerNode.m_parallelLimit == 0 && !container->m_iterations.empty()) - return SetupResult::Continue; + const ContainerNode &containerNode = container->m_containerNode; + const int childCount = int(containerNode.m_children.size()); - if (container->m_iterations.empty()) - container->m_iterations.emplace_back(std::make_unique(0, container)); + if (container->m_iterationCount == 0) { + container->m_iterations.emplace_back( + std::make_unique(container->m_iterationCount, container)); + ++container->m_iterationCount; + } else if (containerNode.m_parallelLimit == 0) { + container->deleteFinishedIterations(); + if (container->m_iterations.empty()) + return toSetupResult(container->m_successBit); + return SetupResult::Continue; + } GuardLocker locker(container->m_startGuard); - for (auto &iteration : container->m_iterations) { - for (int i = iteration->continueIndex(); i < int(container->m_containerNode.m_children.size()); ++i) { - const int limit = iteration->currentLimit(); - if (i >= limit) - break; - RuntimeTask *newTask = new RuntimeTask{container->m_containerNode.m_children.at(i), - iteration.get()}; - iteration->m_children.emplace_back(newTask); - - const SetupResult startAction = start(newTask); - if (startAction == SetupResult::Continue) - continue; - - const SetupResult finalizeAction = childDone(iteration.get(), - startAction == SetupResult::StopWithSuccess); - if (finalizeAction == SetupResult::Continue) - continue; - - if (iteration->m_isProgressive) { - int skippedTaskCount = 0; - // Skip scheduled but not run yet. The current (i) was already notified. - for (int j = i + 1; j < limit; ++j) - skippedTaskCount += container->m_containerNode.m_children.at(j).taskCount(); - advanceProgress(skippedTaskCount); + while (containerNode.m_parallelLimit == 0 + || container->m_runningChildren < containerNode.m_parallelLimit) { + container->deleteFinishedIterations(); + if (container->m_nextToStart == childCount) { + if (container->m_shouldIterate && invokeLoopHandler(container)) { + container->m_nextToStart = 0; + container->m_iterations.emplace_back( + std::make_unique(container->m_iterationCount, container)); + ++container->m_iterationCount; + } else { + if (container->m_iterations.empty()) + return toSetupResult(container->m_successBit); + return SetupResult::Continue; } - return finalizeAction; } + RuntimeIteration *iteration = container->m_iterations.back().get(); + RuntimeTask *newTask = new RuntimeTask{containerNode.m_children.at(container->m_nextToStart), + iteration}; + iteration->m_children.emplace_back(newTask); + ++container->m_runningChildren; + ++container->m_nextToStart; + + const SetupResult startAction = start(newTask); + if (startAction == SetupResult::Continue) + continue; + + const SetupResult finalizeAction = childDone(iteration, + startAction == SetupResult::StopWithSuccess); + if (finalizeAction == SetupResult::Continue) + continue; + + if (iteration->m_isProgressive) { + int skippedTaskCount = 0; + // Skip scheduled but not run yet. + // The current m_nextToStart was already notified -> thus -1. + const int limit = containerNode.m_parallelLimit + ? qMin(iteration->m_doneCount + containerNode.m_parallelLimit - 1, childCount) + : childCount; + for (int i = container->m_nextToStart; i < limit; ++i) + skippedTaskCount += containerNode.m_children.at(i).taskCount(); + advanceProgress(skippedTaskCount); + } + return finalizeAction; } return SetupResult::Continue; } @@ -1817,10 +1919,10 @@ SetupResult TaskTreePrivate::childDone(RuntimeIteration *iteration, bool success stop(container); ++iteration->m_doneCount; + --container->m_runningChildren; const bool updatedSuccess = container->updateSuccessBit(success); - const SetupResult startAction - = (shouldStop || iteration->m_doneCount == int(container->m_containerNode.m_children.size())) - ? toSetupResult(updatedSuccess) : SetupResult::Continue; + const SetupResult startAction = shouldStop ? toSetupResult(updatedSuccess) + : SetupResult::Continue; if (container->isStarting()) return startAction; @@ -1829,15 +1931,16 @@ SetupResult TaskTreePrivate::childDone(RuntimeIteration *iteration, bool success void TaskTreePrivate::stop(RuntimeContainer *container) { + const ContainerNode &containerNode = container->m_containerNode; for (auto &iteration : container->m_iterations) { for (auto &child : iteration->m_children) stop(child.get()); - if (iteration->m_isProgressive) { + if (iteration->m_isProgressive && containerNode.m_parallelLimit > 0) { int skippedTaskCount = 0; - for (int i = iteration->currentLimit(); - i < int(container->m_containerNode.m_children.size()); ++i) { - skippedTaskCount += container->m_containerNode.m_children.at(i).taskCount(); + for (int i = iteration->m_doneCount + containerNode.m_parallelLimit; + i < int(containerNode.m_children.size()); ++i) { + skippedTaskCount += containerNode.m_children.at(i).taskCount(); } advanceProgress(skippedTaskCount); } @@ -1862,6 +1965,16 @@ bool TaskTreePrivate::invokeDoneHandler(RuntimeContainer *container, DoneWith do return result == DoneResult::Success; } +bool TaskTreePrivate::invokeLoopHandler(RuntimeContainer *container) +{ + if (container->m_shouldIterate) { + container->m_shouldIterate = invokeHandler(container, + container->m_containerNode.m_loop->m_loopData->m_condition, + container->m_iterationCount); + } + return container->m_shouldIterate; +} + SetupResult TaskTreePrivate::start(RuntimeTask *node) { if (!node->m_taskNode.isTask()) { diff --git a/src/libs/solutions/tasking/tasktree.h b/src/libs/solutions/tasking/tasktree.h index 5d600268c23..2a20980e971 100644 --- a/src/libs/solutions/tasking/tasktree.h +++ b/src/libs/solutions/tasking/tasktree.h @@ -76,6 +76,7 @@ Q_ENUM_NS(CallDoneIf); TASKING_EXPORT DoneResult toDoneResult(bool success); +class LoopData; class StorageData; class TaskTreePrivate; @@ -96,6 +97,32 @@ protected: virtual void start() = 0; }; +class TASKING_EXPORT Loop +{ +public: + using Condition = std::function; // Takes iteration, called prior to each iteration. + + Loop(const Condition &condition); + int iteration() const; + +private: + friend class ExecutionContextActivator; + friend class TaskTreePrivate; + QSharedPointer m_loopData; +}; + +class TASKING_EXPORT Forever final : public Loop +{ +public: + Forever() : Loop([](int) { return true; }) {} +}; + +class TASKING_EXPORT Repeat final : public Loop +{ +public: + Repeat(int count) : Loop([count](int index) { return index < count; }) {} +}; + class TASKING_EXPORT StorageBase { private: @@ -157,6 +184,8 @@ public: : m_type(Type::Storage) , m_storageList{storage} {} + GroupItem(const Loop &loop) : GroupItem(GroupData{{}, {}, {}, loop}) {} + // TODO: Add tests. GroupItem(const QList &children) : m_type(Type::List) { addChildren(children); } GroupItem(std::initializer_list children) : m_type(Type::List) { addChildren(children); } @@ -186,6 +215,7 @@ protected: GroupHandler m_groupHandler = {}; std::optional m_parallelLimit = {}; std::optional m_workflowPolicy = {}; + std::optional m_loop = {}; }; enum class Type {