TaskTree: Introduce ParallelLimit

The parallel limit constrains the number of parallel tasks
run in the same time. So, if e.g. a group contains 10 children
and the parallel limit is 6, only first 6 tasks are being started
on the beginning and the rest 4 are being postponed until some
running tasks are finished. So, when the one of 6 running tasks
finishes the group starts the 7th task and so on.

Setting parallel limit to 1 means sequential invocation in fact.

The value of 0 means there is no limit and all tasks are run at once.

Remove the ExecuteMode enum, as this is modelled now by the
parallelLimit.

Change-Id: Ice59318be0915401f05bb5a5804078bdc591d09f
Reviewed-by: hjk <hjk@qt.io>
This commit is contained in:
Jarek Kobus
2023-01-06 16:07:16 +01:00
parent fbb8d94e55
commit b6208ab34a
5 changed files with 49 additions and 51 deletions

View File

@@ -67,8 +67,8 @@ void TreeStorageBase::activateStorage(int id)
m_storageData->m_activeStorage = id; m_storageData->m_activeStorage = id;
} }
Execute sequential(ExecuteMode::Sequential); ParallelLimit sequential(1);
Execute parallel(ExecuteMode::Parallel); ParallelLimit parallel(0);
Workflow stopOnError(WorkflowPolicy::StopOnError); Workflow stopOnError(WorkflowPolicy::StopOnError);
Workflow continueOnError(WorkflowPolicy::ContinueOnError); Workflow continueOnError(WorkflowPolicy::ContinueOnError);
Workflow stopOnDone(WorkflowPolicy::StopOnDone); Workflow stopOnDone(WorkflowPolicy::StopOnDone);
@@ -83,10 +83,10 @@ void TaskItem::addChildren(const QList<TaskItem> &children)
case Type::Group: case Type::Group:
m_children.append(child); m_children.append(child);
break; break;
case Type::Mode: case Type::Limit:
QTC_ASSERT(m_type == Type::Group, QTC_ASSERT(m_type == Type::Group,
qWarning("Mode may only be a child of Group, skipping..."); return); qWarning("Mode may only be a child of Group, skipping..."); return);
m_executeMode = child.m_executeMode; // TODO: Assert on redefinition? m_parallelLimit = child.m_parallelLimit; // TODO: Assert on redefinition?
break; break;
case Type::Policy: case Type::Policy:
QTC_ASSERT(m_type == Type::Group, QTC_ASSERT(m_type == Type::Group,
@@ -161,7 +161,7 @@ public:
TaskTreePrivate *m_taskTreePrivate = nullptr; TaskTreePrivate *m_taskTreePrivate = nullptr;
TaskContainer *m_parentContainer = nullptr; TaskContainer *m_parentContainer = nullptr;
const ExecuteMode m_executeMode = ExecuteMode::Parallel; const int m_parallelLimit = 1;
WorkflowPolicy m_workflowPolicy = WorkflowPolicy::StopOnError; WorkflowPolicy m_workflowPolicy = WorkflowPolicy::StopOnError;
const TaskItem::GroupHandler m_groupHandler; const TaskItem::GroupHandler m_groupHandler;
QList<TreeStorageBase> m_storageList; QList<TreeStorageBase> m_storageList;
@@ -314,7 +314,7 @@ TaskContainer::TaskContainer(TaskTreePrivate *taskTreePrivate, TaskContainer *pa
const TaskItem &task) const TaskItem &task)
: m_taskTreePrivate(taskTreePrivate) : m_taskTreePrivate(taskTreePrivate)
, m_parentContainer(parentContainer) , m_parentContainer(parentContainer)
, m_executeMode(task.executeMode()) , m_parallelLimit(task.parallelLimit())
, m_workflowPolicy(task.workflowPolicy()) , m_workflowPolicy(task.workflowPolicy())
, m_groupHandler(task.groupHandler()) , m_groupHandler(task.groupHandler())
, m_storageList(taskTreePrivate->addStorages(task.storageList())) , m_storageList(taskTreePrivate->addStorages(task.storageList()))
@@ -353,7 +353,8 @@ void TaskContainer::start()
} }
} }
if (m_groupConfig.action == GroupAction::StopWithDone || m_groupConfig.action == GroupAction::StopWithError) { if (m_groupConfig.action == GroupAction::StopWithDone
|| m_groupConfig.action == GroupAction::StopWithError) {
const bool success = m_groupConfig.action == GroupAction::StopWithDone; const bool success = m_groupConfig.action == GroupAction::StopWithDone;
const int skippedTaskCount = taskCount(); const int skippedTaskCount = taskCount();
m_taskTreePrivate->advanceProgress(skippedTaskCount); m_taskTreePrivate->advanceProgress(skippedTaskCount);
@@ -369,17 +370,12 @@ void TaskContainer::start()
return; return;
} }
m_currentIndex = 0;
resetSuccessBit(); resetSuccessBit();
if (m_executeMode == ExecuteMode::Sequential) { const int childCount = m_selectedChildren.size();
m_selectedChildren.at(m_currentIndex)->start(); const int startCount = m_parallelLimit ? qMin(m_parallelLimit, childCount) : childCount;
return; for (int i = 0; i < startCount; ++i) {
} if (!m_selectedChildren.at(i)->start()) // TODO: take m_groupConfig.action into account
// Parallel case
for (TaskNode *child : std::as_const(m_selectedChildren)) {
if (!child->start())
return; return;
} }
} }
@@ -406,9 +402,9 @@ void TaskContainer::stop()
if (!isRunning()) if (!isRunning())
return; return;
if (m_executeMode == ExecuteMode::Sequential) { if (m_parallelLimit) { // skip not started tasks
int skippedTaskCount = 0; int skippedTaskCount = 0;
for (int i = m_currentIndex + 1; i < m_selectedChildren.size(); ++i) for (int i = m_currentIndex + m_parallelLimit; i < m_selectedChildren.size(); ++i)
skippedTaskCount += m_selectedChildren.at(i)->taskCount(); skippedTaskCount += m_selectedChildren.at(i)->taskCount();
m_taskTreePrivate->advanceProgress(skippedTaskCount); m_taskTreePrivate->advanceProgress(skippedTaskCount);
} }
@@ -445,8 +441,12 @@ void TaskContainer::childDone(bool success)
return; return;
} }
if (m_executeMode == ExecuteMode::Sequential) if (m_parallelLimit == 0)
m_selectedChildren.at(m_currentIndex)->start(); return;
const int nextIndexToRun = m_currentIndex + m_parallelLimit - 1;
if (nextIndexToRun < m_selectedChildren.size())
m_selectedChildren.at(nextIndexToRun)->start();
} }
void TaskContainer::invokeEndHandler(bool success, bool propagateToParent) void TaskContainer::invokeEndHandler(bool success, bool propagateToParent)

View File

@@ -85,11 +85,6 @@ private:
} }
}; };
enum class ExecuteMode {
Sequential, // default
Parallel
};
// 4 policies: // 4 policies:
// 1. When all children finished with done -> report done, otherwise: // 1. When all children finished with done -> report done, otherwise:
// a) Report error on first error and stop executing other children (including their subtree) // a) Report error on first error and stop executing other children (including their subtree)
@@ -134,8 +129,6 @@ public:
using GroupSimpleHandler = std::function<void()>; using GroupSimpleHandler = std::function<void()>;
// Called when group entered // Called when group entered
using GroupSetupHandler = std::function<GroupConfig()>; using GroupSetupHandler = std::function<GroupConfig()>;
// Called after group ended with success or failure, passed set of successful children
// using GroupEndHandler = std::function<void(const QSet<int> &)>;
struct TaskHandler { struct TaskHandler {
TaskCreateHandler m_createHandler; TaskCreateHandler m_createHandler;
@@ -151,7 +144,7 @@ public:
GroupSetupHandler m_dynamicSetupHandler = {}; GroupSetupHandler m_dynamicSetupHandler = {};
}; };
ExecuteMode executeMode() const { return m_executeMode; } int parallelLimit() const { return m_parallelLimit; }
WorkflowPolicy workflowPolicy() const { return m_workflowPolicy; } WorkflowPolicy workflowPolicy() const { return m_workflowPolicy; }
TaskHandler taskHandler() const { return m_taskHandler; } TaskHandler taskHandler() const { return m_taskHandler; }
GroupHandler groupHandler() const { return m_groupHandler; } GroupHandler groupHandler() const { return m_groupHandler; }
@@ -162,16 +155,16 @@ protected:
enum class Type { enum class Type {
Group, Group,
Storage, Storage,
Mode, Limit,
Policy, Policy,
TaskHandler, TaskHandler,
GroupHandler GroupHandler
}; };
TaskItem() = default; TaskItem() = default;
TaskItem(ExecuteMode mode) TaskItem(int parallelLimit)
: m_type(Type::Mode) : m_type(Type::Limit)
, m_executeMode(mode) {} , m_parallelLimit(parallelLimit) {}
TaskItem(WorkflowPolicy policy) TaskItem(WorkflowPolicy policy)
: m_type(Type::Policy) : m_type(Type::Policy)
, m_workflowPolicy(policy) {} , m_workflowPolicy(policy) {}
@@ -188,7 +181,7 @@ protected:
private: private:
Type m_type = Type::Group; Type m_type = Type::Group;
ExecuteMode m_executeMode = ExecuteMode::Sequential; int m_parallelLimit = 1; // 0 means unlimited
WorkflowPolicy m_workflowPolicy = WorkflowPolicy::StopOnError; WorkflowPolicy m_workflowPolicy = WorkflowPolicy::StopOnError;
TaskHandler m_taskHandler; TaskHandler m_taskHandler;
GroupHandler m_groupHandler; GroupHandler m_groupHandler;
@@ -209,10 +202,10 @@ public:
Storage(const TreeStorageBase &storage) : TaskItem(storage) { } Storage(const TreeStorageBase &storage) : TaskItem(storage) { }
}; };
class QTCREATOR_UTILS_EXPORT Execute : public TaskItem class QTCREATOR_UTILS_EXPORT ParallelLimit : public TaskItem
{ {
public: public:
Execute(ExecuteMode mode) : TaskItem(mode) {} ParallelLimit(int parallelLimit) : TaskItem(qMax(parallelLimit, 0)) {}
}; };
class QTCREATOR_UTILS_EXPORT Workflow : public TaskItem class QTCREATOR_UTILS_EXPORT Workflow : public TaskItem
@@ -245,8 +238,8 @@ public:
DynamicSetup(const GroupSetupHandler &handler) : TaskItem({{}, {}, {}, handler}) {} DynamicSetup(const GroupSetupHandler &handler) : TaskItem({{}, {}, {}, handler}) {}
}; };
QTCREATOR_UTILS_EXPORT extern Execute sequential; QTCREATOR_UTILS_EXPORT extern ParallelLimit sequential;
QTCREATOR_UTILS_EXPORT extern Execute parallel; QTCREATOR_UTILS_EXPORT extern ParallelLimit parallel;
QTCREATOR_UTILS_EXPORT extern Workflow stopOnError; QTCREATOR_UTILS_EXPORT extern Workflow stopOnError;
QTCREATOR_UTILS_EXPORT extern Workflow continueOnError; QTCREATOR_UTILS_EXPORT extern Workflow continueOnError;
QTCREATOR_UTILS_EXPORT extern Workflow stopOnDone; QTCREATOR_UTILS_EXPORT extern Workflow stopOnDone;

View File

@@ -102,7 +102,7 @@ int main(int argc, char *argv[])
groupTask_1->setWorkflowPolicy(Tasking::WorkflowPolicy::ContinueOnDone); groupTask_1->setWorkflowPolicy(Tasking::WorkflowPolicy::ContinueOnDone);
groupTask_4->setWorkflowPolicy(Tasking::WorkflowPolicy::Optional); groupTask_4->setWorkflowPolicy(Tasking::WorkflowPolicy::Optional);
groupTask_4_3->setExecuteMode(Tasking::ExecuteMode::Parallel); groupTask_4_3->setExecuteMode(ExecuteMode::Parallel);
groupTask_4_3->setWorkflowPolicy(Tasking::WorkflowPolicy::StopOnError); groupTask_4_3->setWorkflowPolicy(Tasking::WorkflowPolicy::StopOnError);
// Task layout // Task layout
@@ -170,14 +170,14 @@ int main(int argc, char *argv[])
}; };
const Group root { const Group root {
Execute(rootGroup->executeMode()), rootGroup->executeMode(),
Workflow(rootGroup->workflowPolicy()), Workflow(rootGroup->workflowPolicy()),
OnGroupSetup([rootGroup] { rootGroup->setState(State::Running); }), OnGroupSetup([rootGroup] { rootGroup->setState(State::Running); }),
OnGroupDone([rootGroup] { rootGroup->setState(State::Done); }), OnGroupDone([rootGroup] { rootGroup->setState(State::Done); }),
OnGroupError([rootGroup] { rootGroup->setState(State::Error); }), OnGroupError([rootGroup] { rootGroup->setState(State::Error); }),
Group { Group {
Execute(groupTask_1->executeMode()), groupTask_1->executeMode(),
Workflow(groupTask_1->workflowPolicy()), Workflow(groupTask_1->workflowPolicy()),
OnGroupSetup([groupTask_1] { groupTask_1->setState(State::Running); }), OnGroupSetup([groupTask_1] { groupTask_1->setState(State::Running); }),
OnGroupDone([groupTask_1] { groupTask_1->setState(State::Done); }), OnGroupDone([groupTask_1] { groupTask_1->setState(State::Done); }),
@@ -190,7 +190,7 @@ int main(int argc, char *argv[])
taskItem(task_2), taskItem(task_2),
taskItem(task_3), taskItem(task_3),
Group { Group {
Execute(groupTask_4->executeMode()), groupTask_4->executeMode(),
Workflow(groupTask_4->workflowPolicy()), Workflow(groupTask_4->workflowPolicy()),
OnGroupSetup([groupTask_4] { groupTask_4->setState(State::Running); }), OnGroupSetup([groupTask_4] { groupTask_4->setState(State::Running); }),
OnGroupDone([groupTask_4] { groupTask_4->setState(State::Done); }), OnGroupDone([groupTask_4] { groupTask_4->setState(State::Done); }),
@@ -199,7 +199,7 @@ int main(int argc, char *argv[])
taskItem(task_4_1), taskItem(task_4_1),
taskItem(task_4_2), taskItem(task_4_2),
Group { Group {
Execute(groupTask_4_3->executeMode()), groupTask_4_3->executeMode(),
Workflow(groupTask_4_3->workflowPolicy()), Workflow(groupTask_4_3->workflowPolicy()),
OnGroupSetup([groupTask_4_3] { groupTask_4_3->setState(State::Running); }), OnGroupSetup([groupTask_4_3] { groupTask_4_3->setState(State::Running); }),
OnGroupDone([groupTask_4_3] { groupTask_4_3->setState(State::Done); }), OnGroupDone([groupTask_4_3] { groupTask_4_3->setState(State::Done); }),

View File

@@ -121,11 +121,11 @@ GroupWidget::GroupWidget()
{ {
m_stateIndicator->setFixedWidth(30); m_stateIndicator->setFixedWidth(30);
m_executeCombo->addItem("Sequential", (int)Tasking::ExecuteMode::Sequential); m_executeCombo->addItem("Sequential", (int)ExecuteMode::Sequential);
m_executeCombo->addItem("Parallel", (int)Tasking::ExecuteMode::Parallel); m_executeCombo->addItem("Parallel", (int)ExecuteMode::Parallel);
updateExecuteMode(); updateExecuteMode();
connect(m_executeCombo, &QComboBox::currentIndexChanged, this, [this](int index) { connect(m_executeCombo, &QComboBox::currentIndexChanged, this, [this](int index) {
m_executeMode = (Tasking::ExecuteMode)m_executeCombo->itemData(index).toInt(); m_executeMode = (ExecuteMode)m_executeCombo->itemData(index).toInt();
}); });
m_workflowCombo->addItem("Stop On Error", (int)Tasking::WorkflowPolicy::StopOnError); m_workflowCombo->addItem("Stop On Error", (int)Tasking::WorkflowPolicy::StopOnError);
@@ -152,7 +152,7 @@ GroupWidget::GroupWidget()
setSizePolicy(QSizePolicy::Fixed, QSizePolicy::Preferred); setSizePolicy(QSizePolicy::Fixed, QSizePolicy::Preferred);
} }
void GroupWidget::setExecuteMode(Tasking::ExecuteMode mode) void GroupWidget::setExecuteMode(ExecuteMode mode)
{ {
m_executeMode = mode; m_executeMode = mode;
updateExecuteMode(); updateExecuteMode();
@@ -163,9 +163,9 @@ void GroupWidget::updateExecuteMode()
m_executeCombo->setCurrentIndex(m_executeCombo->findData((int)m_executeMode)); m_executeCombo->setCurrentIndex(m_executeCombo->findData((int)m_executeMode));
} }
Tasking::ExecuteMode GroupWidget::executeMode() const Tasking::ParallelLimit GroupWidget::executeMode() const
{ {
return m_executeMode; return m_executeMode == ExecuteMode::Sequential ? Tasking::sequential : Tasking::parallel;
} }
void GroupWidget::setWorkflowPolicy(Tasking::WorkflowPolicy policy) void GroupWidget::setWorkflowPolicy(Tasking::WorkflowPolicy policy)

View File

@@ -22,6 +22,11 @@ enum class State {
Error Error
}; };
enum class ExecuteMode {
Sequential, // default
Parallel
};
class StateWidget : public QWidget class StateWidget : public QWidget
{ {
public: public:
@@ -54,8 +59,8 @@ class GroupWidget : public StateWidget
public: public:
GroupWidget(); GroupWidget();
void setExecuteMode(Utils::Tasking::ExecuteMode mode); void setExecuteMode(ExecuteMode mode);
Utils::Tasking::ExecuteMode executeMode() const; Utils::Tasking::ParallelLimit executeMode() const;
void setWorkflowPolicy(Utils::Tasking::WorkflowPolicy policy); void setWorkflowPolicy(Utils::Tasking::WorkflowPolicy policy);
Utils::Tasking::WorkflowPolicy workflowPolicy() const; Utils::Tasking::WorkflowPolicy workflowPolicy() const;
@@ -67,7 +72,7 @@ private:
QComboBox *m_executeCombo = nullptr; QComboBox *m_executeCombo = nullptr;
QComboBox *m_workflowCombo = nullptr; QComboBox *m_workflowCombo = nullptr;
Utils::Tasking::ExecuteMode m_executeMode = Utils::Tasking::ExecuteMode::Sequential; ExecuteMode m_executeMode = ExecuteMode::Sequential;
Utils::Tasking::WorkflowPolicy m_workflowPolicy = Utils::Tasking::WorkflowPolicy::StopOnError; Utils::Tasking::WorkflowPolicy m_workflowPolicy = Utils::Tasking::WorkflowPolicy::StopOnError;
}; };