TaskTree: Fix TimeoutTask ordering

Make ordering separate for each thread.
Add stress test for it.

Change-Id: Idb42ce2511b18c0e9dd4dcb216ca39b35b5c980e
Reviewed-by: hjk <hjk@qt.io>
Reviewed-by: Qt CI Bot <qt_ci_bot@qt-project.org>
This commit is contained in:
Jarek Kobus
2023-11-13 13:01:55 +01:00
parent 43f1281674
commit 73bff6d497
2 changed files with 102 additions and 58 deletions

View File

@@ -2672,46 +2672,51 @@ struct TimerData
TimeoutCallback m_callback; TimeoutCallback m_callback;
}; };
QMutex s_mutex; struct TimerThreadData
std::atomic_int s_timerId = 0; {
QHash<int, TimerData> s_timerIdToTimerData = {}; Q_DISABLE_COPY_MOVE(TimerThreadData)
QMultiMap<system_clock::time_point, int> s_deadlineToTimerId = {};
QHash<int, TimerData> m_timerIdToTimerData = {};
QMultiMap<system_clock::time_point, int> m_deadlineToTimerId = {};
int m_timerIdCounter = 0;
};
// Please note the thread_local keyword below guarantees a separate instance per thread.
static thread_local TimerThreadData s_threadTimerData = {};
static QList<TimerData> prepareForActivation(int timerId) static QList<TimerData> prepareForActivation(int timerId)
{ {
QMutexLocker lock(&s_mutex); const auto it = s_threadTimerData.m_timerIdToTimerData.constFind(timerId);
const auto it = s_timerIdToTimerData.constFind(timerId); if (it == s_threadTimerData.m_timerIdToTimerData.cend())
if (it == s_timerIdToTimerData.cend())
return {}; // the timer was already activated return {}; // the timer was already activated
const system_clock::time_point deadline = it->m_deadline; const system_clock::time_point deadline = it->m_deadline;
QList<TimerData> toActivate; QList<TimerData> toActivate;
auto itMap = s_deadlineToTimerId.cbegin(); auto itMap = s_threadTimerData.m_deadlineToTimerId.cbegin();
while (itMap != s_deadlineToTimerId.cend()) { while (itMap != s_threadTimerData.m_deadlineToTimerId.cend()) {
if (itMap.key() > deadline) if (itMap.key() > deadline)
break; break;
const auto it = s_timerIdToTimerData.constFind(itMap.value()); const auto it = s_threadTimerData.m_timerIdToTimerData.constFind(itMap.value());
if (it != s_timerIdToTimerData.cend()) { if (it != s_threadTimerData.m_timerIdToTimerData.cend()) {
toActivate.append(it.value()); toActivate.append(it.value());
s_timerIdToTimerData.erase(it); s_threadTimerData.m_timerIdToTimerData.erase(it);
} }
itMap = s_deadlineToTimerId.erase(itMap); itMap = s_threadTimerData.m_deadlineToTimerId.erase(itMap);
} }
return toActivate; return toActivate;
} }
static void removeTimerId(int timerId) static void removeTimerId(int timerId)
{ {
QMutexLocker lock(&s_mutex); const auto it = s_threadTimerData.m_timerIdToTimerData.constFind(timerId);
const auto it = s_timerIdToTimerData.constFind(timerId); QT_ASSERT(it != s_threadTimerData.m_timerIdToTimerData.cend(),
QT_ASSERT(it != s_timerIdToTimerData.cend(),
qWarning("Removing active timerId failed."); return); qWarning("Removing active timerId failed."); return);
const system_clock::time_point deadline = it->m_deadline; const system_clock::time_point deadline = it->m_deadline;
s_timerIdToTimerData.erase(it); s_threadTimerData.m_timerIdToTimerData.erase(it);
const int removedCount = s_deadlineToTimerId.remove(deadline, timerId); const int removedCount = s_threadTimerData.m_deadlineToTimerId.remove(deadline, timerId);
QT_ASSERT(removedCount == 1, qWarning("Removing active timerId failed."); return); QT_ASSERT(removedCount == 1, qWarning("Removing active timerId failed."); return);
} }
@@ -2720,18 +2725,17 @@ static void handleTimeout(int timerId)
const QList<TimerData> toActivate = prepareForActivation(timerId); const QList<TimerData> toActivate = prepareForActivation(timerId);
for (const TimerData &timerData : toActivate) { for (const TimerData &timerData : toActivate) {
if (timerData.m_context) if (timerData.m_context)
QMetaObject::invokeMethod(timerData.m_context.get(), timerData.m_callback); timerData.m_callback();
} }
} }
static int scheduleTimeout(milliseconds timeout, QObject *context, const TimeoutCallback &callback) static int scheduleTimeout(milliseconds timeout, QObject *context, const TimeoutCallback &callback)
{ {
const int timerId = s_timerId.fetch_add(1) + 1; const int timerId = ++s_threadTimerData.m_timerIdCounter;
const system_clock::time_point deadline = system_clock::now() + timeout; const system_clock::time_point deadline = system_clock::now() + timeout;
QTimer::singleShot(timeout, context, [timerId] { handleTimeout(timerId); }); QTimer::singleShot(timeout, context, [timerId] { handleTimeout(timerId); });
QMutexLocker lock(&s_mutex); s_threadTimerData.m_timerIdToTimerData.emplace(timerId, TimerData{deadline, context, callback});
s_timerIdToTimerData.emplace(timerId, TimerData{deadline, context, callback}); s_threadTimerData.m_deadlineToTimerId.insert(deadline, timerId);
s_deadlineToTimerId.insert(deadline, timerId);
return timerId; return timerId;
} }

View File

@@ -394,7 +394,7 @@ static Handler toTweakDoneHandler(DoneResult result)
return result == DoneResult::Success ? Handler::TweakDoneToSuccess : Handler::TweakDoneToError; return result == DoneResult::Success ? Handler::TweakDoneToSuccess : Handler::TweakDoneToError;
} }
static TestData storageShadowing() static TestData storageShadowingData()
{ {
// This test check if storage shadowing works OK. // This test check if storage shadowing works OK.
@@ -471,6 +471,70 @@ static TestData storageShadowing()
return {storage, root, log, 0, DoneWith::Success}; return {storage, root, log, 0, DoneWith::Success};
} }
static TestData parallelData()
{
TreeStorage<CustomStorage> storage;
const auto setupTask = [storage](int taskId, milliseconds timeout) {
return [storage, taskId, timeout](TaskObject &taskObject) {
taskObject = timeout;
storage->m_log.append({taskId, Handler::Setup});
};
};
const auto setupDone = [storage](int taskId, DoneResult result = DoneResult::Success) {
return [storage, taskId, result](DoneWith doneWith) {
const Handler handler = doneWith == DoneWith::Cancel ? Handler::Canceled
: result == DoneResult::Success ? Handler::Success : Handler::Error;
storage->m_log.append({taskId, handler});
return doneWith == DoneWith::Cancel ? DoneResult::Error
: result == DoneResult::Success ? DoneResult::Success : DoneResult::Error;
};
};
const auto createTask = [storage, setupTask, setupDone](
int taskId, DoneResult result, milliseconds timeout = 0ms) {
return TestTask(setupTask(taskId, timeout), setupDone(taskId, result));
};
const auto createSuccessTask = [createTask](int taskId, milliseconds timeout = 0ms) {
return createTask(taskId, DoneResult::Success, timeout);
};
const auto groupDone = [storage](int taskId) {
return onGroupDone([storage, taskId](DoneWith result) {
storage->m_log.append({taskId, resultToGroupHandler(result)});
});
};
const Group root {
Storage(storage),
parallel,
createSuccessTask(1),
createSuccessTask(2),
createSuccessTask(3),
createSuccessTask(4),
createSuccessTask(5),
groupDone(0)
};
const Log log {
{1, Handler::Setup}, // Setup order is determined in parallel mode
{2, Handler::Setup},
{3, Handler::Setup},
{4, Handler::Setup},
{5, Handler::Setup},
{1, Handler::Success},
{2, Handler::Success},
{3, Handler::Success},
{4, Handler::Success},
{5, Handler::Success},
{0, Handler::GroupSuccess}
};
return {storage, root, log, 5, DoneWith::Success};
}
void tst_Tasking::testTree_data() void tst_Tasking::testTree_data()
{ {
QTest::addColumn<TestData>("testData"); QTest::addColumn<TestData>("testData");
@@ -792,32 +856,7 @@ void tst_Tasking::testTree_data()
QTest::newRow("Nested") << TestData{storage, root, log, 1, DoneWith::Success}; QTest::newRow("Nested") << TestData{storage, root, log, 1, DoneWith::Success};
} }
{ QTest::newRow("Parallel") << parallelData();
const Group root {
Storage(storage),
parallel,
createSuccessTask(1),
createSuccessTask(2),
createSuccessTask(3),
createSuccessTask(4),
createSuccessTask(5),
groupDone(0)
};
const Log log {
{1, Handler::Setup}, // Setup order is determined in parallel mode
{2, Handler::Setup},
{3, Handler::Setup},
{4, Handler::Setup},
{5, Handler::Setup},
{1, Handler::Success},
{2, Handler::Success},
{3, Handler::Success},
{4, Handler::Success},
{5, Handler::Success},
{0, Handler::GroupSuccess}
};
QTest::newRow("Parallel") << TestData{storage, root, log, 5, DoneWith::Success};
}
{ {
auto setupSubTree = [storage, createSuccessTask](TaskTree &taskTree) { auto setupSubTree = [storage, createSuccessTask](TaskTree &taskTree) {
@@ -2592,10 +2631,8 @@ void tst_Tasking::testTree_data()
DoneWith::Success}; DoneWith::Success};
} }
{ // This test check if storage shadowing works OK.
// This test check if storage shadowing works OK. QTest::newRow("StorageShadowing") << storageShadowingData();
QTest::newRow("StorageShadowing") << storageShadowing();
}
} }
void tst_Tasking::testTree() void tst_Tasking::testTree()
@@ -2622,13 +2659,15 @@ void tst_Tasking::testTree()
void tst_Tasking::testInThread_data() void tst_Tasking::testInThread_data()
{ {
QTest::addColumn<TestData>("testData"); QTest::addColumn<TestData>("testData");
QTest::newRow("StorageShadowing") << storageShadowing(); QTest::newRow("StorageShadowing") << storageShadowingData();
QTest::newRow("Parallel") << parallelData();
} }
struct TestResult struct TestResult
{ {
int executeCount = 0; int executeCount = 0;
ThreadResult threadResult = ThreadResult::Success; ThreadResult threadResult = ThreadResult::Success;
Log actualLog = {};
}; };
static const int s_loopCount = 1000; static const int s_loopCount = 1000;
@@ -2664,7 +2703,7 @@ static void runInThread(QPromise<TestResult> &promise, const TestData &testData)
return; return;
} }
if (actualLog != testData.expectedLog) { if (actualLog != testData.expectedLog) {
promise.addResult(TestResult{i, ThreadResult::FailOnLogCheck}); promise.addResult(TestResult{i, ThreadResult::FailOnLogCheck, actualLog});
return; return;
} }
if (result != testData.onDone) { if (result != testData.onDone) {
@@ -2672,7 +2711,7 @@ static void runInThread(QPromise<TestResult> &promise, const TestData &testData)
return; return;
} }
} }
promise.addResult(TestResult{s_loopCount, ThreadResult::Success}); promise.addResult(TestResult{s_loopCount, ThreadResult::Success, testData.expectedLog});
} }
void tst_Tasking::testInThread() void tst_Tasking::testInThread()
@@ -2685,8 +2724,9 @@ void tst_Tasking::testInThread()
const auto onDone = [testData](const ConcurrentCall<TestResult> &task) { const auto onDone = [testData](const ConcurrentCall<TestResult> &task) {
QVERIFY(task.future().resultCount()); QVERIFY(task.future().resultCount());
const TestResult result = task.result(); const TestResult result = task.result();
QCOMPARE(result.executeCount, s_loopCount); QCOMPARE(result.actualLog, testData.expectedLog);
QCOMPARE(result.threadResult, ThreadResult::Success); QCOMPARE(result.threadResult, ThreadResult::Success);
QCOMPARE(result.executeCount, s_loopCount);
}; };
QList<GroupItem> tasks = { parallel }; QList<GroupItem> tasks = { parallel };