From 221414c5e81eac71ab01e54caa7d35b5265c42d7 Mon Sep 17 00:00:00 2001 From: Jarek Kobus Date: Mon, 25 Apr 2022 20:12:43 +0200 Subject: [PATCH] QtcProcess: Implement waitFor...() in general way Task-number: QTCREATORBUG-27430 Change-Id: I34aff44258d2c7cabae0b63fe4e9ec55aa7b3b7d Reviewed-by: Reviewed-by: Marcus Tillmanns Reviewed-by: Qt CI Bot Reviewed-by: hjk --- src/libs/utils/launchersocket.cpp | 42 ++--- src/libs/utils/launchersocket.h | 12 +- src/libs/utils/qtcprocess.cpp | 292 +++++++++++++++++++++++++++++- 3 files changed, 312 insertions(+), 34 deletions(-) diff --git a/src/libs/utils/launchersocket.cpp b/src/libs/utils/launchersocket.cpp index ff262b1d468..048d118e935 100644 --- a/src/libs/utils/launchersocket.cpp +++ b/src/libs/utils/launchersocket.cpp @@ -47,10 +47,10 @@ private: const CallerHandle::SignalType m_signalType; }; -class StartedSignal : public LauncherSignal +class LauncherStartedSignal : public LauncherSignal { public: - StartedSignal(int processId) + LauncherStartedSignal(int processId) : LauncherSignal(CallerHandle::SignalType::Started) , m_processId(processId) {} int processId() const { return m_processId; } @@ -58,16 +58,16 @@ private: const int m_processId; }; -class ReadyReadSignal : public LauncherSignal +class LauncherReadyReadSignal : public LauncherSignal { public: - ReadyReadSignal(const QByteArray &stdOut, const QByteArray &stdErr) + LauncherReadyReadSignal(const QByteArray &stdOut, const QByteArray &stdErr) : LauncherSignal(CallerHandle::SignalType::ReadyRead) , m_stdOut(stdOut) , m_stdErr(stdErr) {} QByteArray stdOut() const { return m_stdOut; } QByteArray stdErr() const { return m_stdErr; } - void mergeWith(ReadyReadSignal *newSignal) { + void mergeWith(LauncherReadyReadSignal *newSignal) { m_stdOut += newSignal->stdOut(); m_stdErr += newSignal->stdErr(); } @@ -76,10 +76,10 @@ private: QByteArray m_stdErr; }; -class DoneSignal : public LauncherSignal +class LauncherDoneSignal : public LauncherSignal { public: - DoneSignal(const ProcessResultData &resultData) + LauncherDoneSignal(const ProcessResultData &resultData) : LauncherSignal(CallerHandle::SignalType::Done) , m_resultData(resultData) {} ProcessResultData resultData() const { return m_resultData; } @@ -150,14 +150,14 @@ bool CallerHandle::flushFor(SignalType signalType) case SignalType::NoSignal: break; case SignalType::Started: - handleStarted(static_cast(storedSignal)); + handleStarted(static_cast(storedSignal)); break; case SignalType::ReadyRead: - handleReadyRead(static_cast(storedSignal)); + handleReadyRead(static_cast(storedSignal)); break; case SignalType::Done: signalMatched = true; - handleDone(static_cast(storedSignal)); + handleDone(static_cast(storedSignal)); break; } delete storedSignal; @@ -173,7 +173,7 @@ bool CallerHandle::shouldFlush() const return !m_signals.isEmpty(); } -void CallerHandle::handleStarted(const StartedSignal *launcherSignal) +void CallerHandle::handleStarted(const LauncherStartedSignal *launcherSignal) { QTC_ASSERT(isCalledFromCallersThread(), return); m_processState = QProcess::Running; @@ -181,13 +181,13 @@ void CallerHandle::handleStarted(const StartedSignal *launcherSignal) emit started(m_processId); } -void CallerHandle::handleReadyRead(const ReadyReadSignal *launcherSignal) +void CallerHandle::handleReadyRead(const LauncherReadyReadSignal *launcherSignal) { QTC_ASSERT(isCalledFromCallersThread(), return); emit readyRead(launcherSignal->stdOut(), launcherSignal->stdErr()); } -void CallerHandle::handleDone(const DoneSignal *launcherSignal) +void CallerHandle::handleDone(const LauncherDoneSignal *launcherSignal) { QTC_ASSERT(isCalledFromCallersThread(), return); m_processState = QProcess::NotRunning; @@ -221,8 +221,8 @@ void CallerHandle::appendSignal(LauncherSignal *newSignal) // Merge ReadyRead signals into one. if (lastSignal->signalType() == SignalType::ReadyRead && newSignal->signalType() == SignalType::ReadyRead) { - ReadyReadSignal *lastRead = static_cast(lastSignal); - ReadyReadSignal *newRead = static_cast(newSignal); + LauncherReadyReadSignal *lastRead = static_cast(lastSignal); + LauncherReadyReadSignal *newRead = static_cast(newSignal); lastRead->mergeWith(newRead); delete newRead; return; @@ -446,7 +446,7 @@ void LauncherHandle::handleStartedPacket(const QByteArray &packetData) return; const auto packet = LauncherPacket::extractPacket(m_token, packetData); - m_callerHandle->appendSignal(new StartedSignal(packet.processId)); + m_callerHandle->appendSignal(new LauncherStartedSignal(packet.processId)); flushCaller(); } @@ -461,7 +461,7 @@ void LauncherHandle::handleReadyReadStandardOutput(const QByteArray &packetData) if (packet.standardChannel.isEmpty()) return; - m_callerHandle->appendSignal(new ReadyReadSignal(packet.standardChannel, {})); + m_callerHandle->appendSignal(new LauncherReadyReadSignal(packet.standardChannel, {})); flushCaller(); } @@ -476,7 +476,7 @@ void LauncherHandle::handleReadyReadStandardError(const QByteArray &packetData) if (packet.standardChannel.isEmpty()) return; - m_callerHandle->appendSignal(new ReadyReadSignal({}, packet.standardChannel)); + m_callerHandle->appendSignal(new LauncherReadyReadSignal({}, packet.standardChannel)); flushCaller(); } @@ -494,8 +494,8 @@ void LauncherHandle::handleDonePacket(const QByteArray &packetData) packet.error, packet.errorString }; if (!stdOut.isEmpty() || !stdErr.isEmpty()) - m_callerHandle->appendSignal(new ReadyReadSignal(stdOut, stdErr)); - m_callerHandle->appendSignal(new DoneSignal(result)); + m_callerHandle->appendSignal(new LauncherReadyReadSignal(stdOut, stdErr)); + m_callerHandle->appendSignal(new LauncherDoneSignal(result)); flushCaller(); } @@ -512,7 +512,7 @@ void LauncherHandle::handleSocketError(const QString &message) "Internal socket error: %1").arg(message); const ProcessResultData result = { 0, QProcess::NormalExit, QProcess::FailedToStart, errorString }; - m_callerHandle->appendSignal(new DoneSignal(result)); + m_callerHandle->appendSignal(new LauncherDoneSignal(result)); flushCaller(); } diff --git a/src/libs/utils/launchersocket.h b/src/libs/utils/launchersocket.h index dc49157d713..6364dc85a88 100644 --- a/src/libs/utils/launchersocket.h +++ b/src/libs/utils/launchersocket.h @@ -51,9 +51,9 @@ namespace Internal { class LauncherInterfacePrivate; class LauncherHandle; class LauncherSignal; -class StartedSignal; -class ReadyReadSignal; -class DoneSignal; +class LauncherStartedSignal; +class LauncherReadyReadSignal; +class LauncherDoneSignal; // All the methods and data fields in this class are called / accessed from the caller's thread. // Exceptions are explicitly marked. @@ -126,9 +126,9 @@ private: return tmp; } - void handleStarted(const StartedSignal *launcherSignal); - void handleReadyRead(const ReadyReadSignal *launcherSignal); - void handleDone(const DoneSignal *launcherSignal); + void handleStarted(const LauncherStartedSignal *launcherSignal); + void handleReadyRead(const LauncherReadyReadSignal *launcherSignal); + void handleDone(const LauncherDoneSignal *launcherSignal); // Lives in launcher's thread. Modified from caller's thread. LauncherHandle *m_launcherHandle = nullptr; diff --git a/src/libs/utils/qtcprocess.cpp b/src/libs/utils/qtcprocess.cpp index 504e034721c..bcf663c349a 100644 --- a/src/libs/utils/qtcprocess.cpp +++ b/src/libs/utils/qtcprocess.cpp @@ -25,6 +25,7 @@ #include "qtcprocess.h" +#include "algorithm.h" #include "commandline.h" #include "executeondestruction.h" #include "hostosinfo.h" @@ -485,6 +486,91 @@ static ProcessImpl defaultProcessImpl() return ProcessImpl::ProcessLauncher; } +enum class SignalType { + NoSignal, + Started, + ReadyRead, + Done +}; + +class ProcessInterfaceSignal +{ +public: + SignalType signalType() const { return m_signalType; } + virtual ~ProcessInterfaceSignal() = default; +protected: + ProcessInterfaceSignal(SignalType signalType) : m_signalType(signalType) {} +private: + const SignalType m_signalType; +}; + +class StartedSignal : public ProcessInterfaceSignal +{ +public: + StartedSignal(qint64 processId, qint64 applicationMainThreadId) + : ProcessInterfaceSignal(SignalType::Started) + , m_processId(processId) + , m_applicationMainThreadId(applicationMainThreadId) {} + qint64 processId() const { return m_processId; } + qint64 applicationMainThreadId() const { return m_applicationMainThreadId; } +private: + const qint64 m_processId; + const qint64 m_applicationMainThreadId; +}; + +class ReadyReadSignal : public ProcessInterfaceSignal +{ +public: + ReadyReadSignal(const QByteArray &stdOut, const QByteArray &stdErr) + : ProcessInterfaceSignal(SignalType::ReadyRead) + , m_stdOut(stdOut) + , m_stdErr(stdErr) {} + QByteArray stdOut() const { return m_stdOut; } + QByteArray stdErr() const { return m_stdErr; } + void mergeWith(ReadyReadSignal *newSignal) { + m_stdOut += newSignal->stdOut(); + m_stdErr += newSignal->stdErr(); + } +private: + QByteArray m_stdOut; + QByteArray m_stdErr; +}; + +class DoneSignal : public ProcessInterfaceSignal +{ +public: + DoneSignal(const ProcessResultData &resultData) + : ProcessInterfaceSignal(SignalType::Done) + , m_resultData(resultData) {} + ProcessResultData resultData() const { return m_resultData; } +private: + const ProcessResultData m_resultData; +}; + +class ProcessInterfaceHandler : public QObject +{ +public: + ProcessInterfaceHandler(QtcProcessPrivate *caller, ProcessInterface *process); + + // Called from caller's thread exclusively. + bool waitForSignal(int msecs, SignalType newSignal); + +private: + // Called from caller's thread exclusively. + bool doWaitForSignal(QDeadlineTimer deadline); + + // Called from caller's thread when not waiting for signal, + // otherwise called from temporary thread. + void handleStarted(qint64 processId, qint64 applicationMainThreadId); + void handleReadyRead(const QByteArray &outputData, const QByteArray &errorData); + void handleDone(const ProcessResultData &data); + void appendSignal(ProcessInterfaceSignal *newSignal); + + QtcProcessPrivate *m_caller = nullptr; + QMutex m_mutex; + QWaitCondition m_waitCondition; +}; + class QtcProcessPrivate : public QObject { public: @@ -508,14 +594,11 @@ public: void setProcessInterface(ProcessInterface *process) { m_process.reset(process); - m_process->setParent(this); + m_processHandler.reset(new ProcessInterfaceHandler(this, process)); - connect(m_process.get(), &ProcessInterface::started, - this, &QtcProcessPrivate::handleStarted); - connect(m_process.get(), &ProcessInterface::readyRead, - this, &QtcProcessPrivate::handleReadyRead); - connect(m_process.get(), &ProcessInterface::done, - this, &QtcProcessPrivate::handleDone); + // In order to move the process into another thread together with handle + m_process->setParent(m_processHandler.get()); + m_processHandler->setParent(this); } CommandLine fullCommandLine() const @@ -549,10 +632,14 @@ public: } QtcProcess *q; + std::unique_ptr m_processHandler; std::unique_ptr m_process; ProcessSetupData m_setup; void slotTimeout(); + void handleStartedSignal(const StartedSignal *launcherSignal); + void handleReadyReadSignal(const ReadyReadSignal *launcherSignal); + void handleDoneSignal(const DoneSignal *launcherSignal); void handleStarted(qint64 processId, qint64 applicationMainThreadId); void handleReadyRead(const QByteArray &outputData, const QByteArray &errorData); void handleDone(const ProcessResultData &data); @@ -567,6 +654,19 @@ public: ProcessResult interpretExitCode(int exitCode); + // === ProcessInterfaceHandler related === + // Called from caller's thread exclusively + void flush() { flushFor(SignalType::NoSignal); } + bool flushFor(SignalType signalType); + bool shouldFlush() const { QMutexLocker locker(&m_mutex); return !m_signals.isEmpty(); } + // Called from ProcessInterfaceHandler thread exclusively. + void appendSignal(ProcessInterfaceSignal *launcherSignal); + + mutable QMutex m_mutex; + QList m_signals; + // ======================================= + + QProcess::ProcessState m_state = QProcess::NotRunning; qint64 m_processId = 0; qint64 m_applicationMainThreadId = 0; @@ -596,6 +696,165 @@ public: #define CALL_STACK_GUARD() Guard guard(m_callStackGuard) +ProcessInterfaceHandler::ProcessInterfaceHandler(QtcProcessPrivate *caller, + ProcessInterface *process) + : m_caller(caller) +{ + connect(process, &ProcessInterface::started, + this, &ProcessInterfaceHandler::handleStarted); + connect(process, &ProcessInterface::readyRead, + this, &ProcessInterfaceHandler::handleReadyRead); + connect(process, &ProcessInterface::done, + this, &ProcessInterfaceHandler::handleDone); +} + +// Called from caller's thread exclusively. +bool ProcessInterfaceHandler::waitForSignal(int msecs, SignalType newSignal) +{ + QDeadlineTimer deadline(msecs); + while (true) { + if (deadline.hasExpired()) + break; + if (!doWaitForSignal(deadline)) + break; + // Matching (or Done) signal was flushed + if (m_caller->flushFor(newSignal)) + return true; + // Otherwise continue awaiting (e.g. when ReadyRead came while waitForFinished()) + } + return false; +} + +// Called from caller's thread exclusively. +bool ProcessInterfaceHandler::doWaitForSignal(QDeadlineTimer deadline) +{ + QMutexLocker locker(&m_mutex); + + // Flush, if we have any stored signals. + // This must be called when holding laucher's mutex locked prior to the call to wait, + // so that it's done atomically. + if (m_caller->shouldFlush()) + return true; + + return m_waitCondition.wait(&m_mutex, deadline); +} + +// Called from ProcessInterfaceHandler thread exclusively +void ProcessInterfaceHandler::handleStarted(qint64 processId, qint64 applicationMainThreadId) +{ + appendSignal(new StartedSignal(processId, applicationMainThreadId)); +} + +// Called from ProcessInterfaceHandler thread exclusively +void ProcessInterfaceHandler::handleReadyRead(const QByteArray &outputData, const QByteArray &errorData) +{ + appendSignal(new ReadyReadSignal(outputData, errorData)); +} + +// Called from ProcessInterfaceHandler thread exclusively +void ProcessInterfaceHandler::handleDone(const ProcessResultData &data) +{ + appendSignal(new DoneSignal(data)); +} + +void ProcessInterfaceHandler::appendSignal(ProcessInterfaceSignal *newSignal) +{ + { + QMutexLocker locker(&m_mutex); + m_caller->appendSignal(newSignal); + } + m_waitCondition.wakeOne(); + // call in callers thread + QMetaObject::invokeMethod(m_caller, &QtcProcessPrivate::flush); +} + +// Called from caller's thread exclusively +bool QtcProcessPrivate::flushFor(SignalType signalType) +{ + QList oldSignals; + { + QMutexLocker locker(&m_mutex); + const QList storedSignals = + Utils::transform(qAsConst(m_signals), [](const ProcessInterfaceSignal *aSignal) { + return aSignal->signalType(); + }); + + // If we are flushing for ReadyRead or Done - flush all. + // If we are flushing for Started: + // - if Started was buffered - flush Started only. + // - otherwise if Done signal was buffered - flush all. + const bool flushAll = (signalType != SignalType::Started) + || (!storedSignals.contains(SignalType::Started) + && storedSignals.contains(SignalType::Done)); + if (flushAll) { + oldSignals = m_signals; + m_signals = {}; + } else { + auto matchingIndex = storedSignals.lastIndexOf(signalType); + if (matchingIndex >= 0) { + oldSignals = m_signals.mid(0, matchingIndex + 1); + m_signals = m_signals.mid(matchingIndex + 1); + } + } + } + bool signalMatched = false; + for (const ProcessInterfaceSignal *storedSignal : qAsConst(oldSignals)) { + const SignalType storedSignalType = storedSignal->signalType(); + if (storedSignalType == signalType) + signalMatched = true; + switch (storedSignalType) { + case SignalType::NoSignal: + break; + case SignalType::Started: + handleStartedSignal(static_cast(storedSignal)); + break; + case SignalType::ReadyRead: + handleReadyReadSignal(static_cast(storedSignal)); + break; + case SignalType::Done: + signalMatched = true; + handleDoneSignal(static_cast(storedSignal)); + break; + } + delete storedSignal; + } + return signalMatched; +} + +// Called from ProcessInterfaceHandler thread exclusively. +void QtcProcessPrivate::appendSignal(ProcessInterfaceSignal *newSignal) +{ + QTC_ASSERT(newSignal->signalType() != SignalType::NoSignal, delete newSignal; return); + + QMutexLocker locker(&m_mutex); + + // TODO: we might assert if the caller's state is proper, e.g. + // start signal can't appear if we are in Running or NotRunning state, + // or finish signal can't appear if we are in NotRunning or Starting state, + // or readyRead signal can't appear if we are in NotRunning or Starting state, + // or error signal can't appear if we are in NotRunning state + // or FailedToStart error signal can't appear if we are in Running state + // or other than FailedToStart error signal can't appear if we are in Starting state. + if (!m_signals.isEmpty()) { + ProcessInterfaceSignal *lastSignal = m_signals.last(); + + QTC_ASSERT(lastSignal->signalType() != SignalType::Done, + qWarning() << "Buffering new signal for process" << m_setup.m_commandLine + << "while the last done() signal wasn't flushed yet."); + + // Merge ReadyRead signals into one. + if (lastSignal->signalType() == SignalType::ReadyRead + && newSignal->signalType() == SignalType::ReadyRead) { + ReadyReadSignal *lastRead = static_cast(lastSignal); + ReadyReadSignal *newRead = static_cast(newSignal); + lastRead->mergeWith(newRead); + delete newRead; + return; + } + } + m_signals.append(newSignal); +} + void QtcProcessPrivate::clearForRun() { m_hangTimerCount = 0; @@ -1237,6 +1496,10 @@ void QtcProcess::close() d->m_process->disconnect(); d->m_process.release()->deleteLater(); } + if (d->m_processHandler) { + d->m_processHandler->disconnect(); + d->m_processHandler.release()->deleteLater(); + } d->clearForRun(); } @@ -1571,6 +1834,21 @@ void QtcProcessPrivate::slotTimeout() } } +void QtcProcessPrivate::handleStartedSignal(const StartedSignal *aSignal) +{ + handleStarted(aSignal->processId(), aSignal->applicationMainThreadId()); +} + +void QtcProcessPrivate::handleReadyReadSignal(const ReadyReadSignal *aSignal) +{ + handleReadyRead(aSignal->stdOut(), aSignal->stdErr()); +} + +void QtcProcessPrivate::handleDoneSignal(const DoneSignal *aSignal) +{ + handleDone(aSignal->resultData()); +} + void QtcProcessPrivate::handleStarted(qint64 processId, qint64 applicationMainThreadId) { QTC_CHECK(m_state == QProcess::Starting);