diff --git a/src/libs/utils/CMakeLists.txt b/src/libs/utils/CMakeLists.txt index e8adad413e4..e296f2857a3 100644 --- a/src/libs/utils/CMakeLists.txt +++ b/src/libs/utils/CMakeLists.txt @@ -56,6 +56,7 @@ add_qtc_library(Utils filepathinfo.h filesearch.cpp filesearch.h filestreamer.cpp filestreamer.h + filestreamermanager.cpp filestreamermanager.h filesystemmodel.cpp filesystemmodel.h filesystemwatcher.cpp filesystemwatcher.h fileutils.cpp fileutils.h diff --git a/src/libs/utils/filestreamermanager.cpp b/src/libs/utils/filestreamermanager.cpp new file mode 100644 index 00000000000..5a1ec9847ea --- /dev/null +++ b/src/libs/utils/filestreamermanager.cpp @@ -0,0 +1,200 @@ +// Copyright (C) 2023 The Qt Company Ltd. +// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +#include "filestreamermanager.h" + +#include "filestreamer.h" +#include "threadutils.h" +#include "utilstr.h" + +#include +#include +#include +#include + +#include + +namespace Utils { + +// TODO: destruct the instance before destructing ProjectExplorer::DeviceManager (?) + +static FileStreamHandle generateUniqueHandle() +{ + static std::atomic_int handleCounter = 1; + return FileStreamHandle(handleCounter.fetch_add(1)); +} + +static QMutex s_mutex = {}; +static QWaitCondition s_waitCondition = {}; +static std::unordered_map s_fileStreamers = {}; + +static void addStreamer(FileStreamHandle handle, FileStreamer *streamer) +{ + QMutexLocker locker(&s_mutex); + const bool added = s_fileStreamers.try_emplace(handle, streamer).second; + QTC_CHECK(added); +} + +static void removeStreamer(FileStreamHandle handle) +{ + QMutexLocker locker(&s_mutex); + auto it = s_fileStreamers.find(handle); + QTC_ASSERT(it != s_fileStreamers.end(), return); + QTC_ASSERT(QThread::currentThread() == it->second->thread(), return); + s_fileStreamers.erase(it); + s_waitCondition.wakeAll(); +} + +static void deleteStreamer(FileStreamHandle handle) +{ + QMutexLocker locker(&s_mutex); + auto it = s_fileStreamers.find(handle); + if (it != s_fileStreamers.end()) + return; + if (QThread::currentThread() == it->second->thread()) { + delete it->second; + s_fileStreamers.erase(it); + s_waitCondition.wakeAll(); + } else { + QMetaObject::invokeMethod(it->second, [handle] { + deleteStreamer(handle); + }); + s_waitCondition.wait(&s_mutex); + QTC_CHECK(s_fileStreamers.find(handle) == s_fileStreamers.end()); + } +} + +static void deleteAllStreamers() +{ + QMutexLocker locker(&s_mutex); + QTC_ASSERT(Utils::isMainThread(), return); + while (s_fileStreamers.size()) { + auto it = s_fileStreamers.begin(); + if (QThread::currentThread() == it->second->thread()) { + delete it->second; + s_fileStreamers.erase(it); + s_waitCondition.wakeAll(); + } else { + const FileStreamHandle handle = it->first; + QMetaObject::invokeMethod(it->second, [handle] { + deleteStreamer(handle); + }); + s_waitCondition.wait(&s_mutex); + QTC_CHECK(s_fileStreamers.find(handle) == s_fileStreamers.end()); + } + } +} + +static FileStreamHandle checkHandle(FileStreamHandle handle) +{ + QMutexLocker locker(&s_mutex); + return s_fileStreamers.find(handle) != s_fileStreamers.end() ? handle : FileStreamHandle(0); +} + +FileStreamHandle execute(const std::function &onSetup, + const std::function &onDone, + QObject *context) +{ + FileStreamer *streamer = new FileStreamer; + onSetup(streamer); + const FileStreamHandle handle = generateUniqueHandle(); + QTC_CHECK(context == nullptr || context->thread() == QThread::currentThread()); + QObject *finalContext = context ? context : streamer; + QObject::connect(streamer, &FileStreamer::done, finalContext, [=] { + if (onDone) + onDone(streamer); + removeStreamer(handle); + streamer->deleteLater(); + }); + addStreamer(handle, streamer); + streamer->start(); + return checkHandle(handle); // The handle could have been already removed +} + +FileStreamHandle FileStreamerManager::copy(const FilePath &source, const FilePath &destination, + const CopyContinuation &cont) +{ + return copy(source, destination, nullptr, cont); +} + +FileStreamHandle FileStreamerManager::copy(const FilePath &source, const FilePath &destination, + QObject *context, const CopyContinuation &cont) +{ + const auto onSetup = [=](FileStreamer *streamer) { + streamer->setSource(source); + streamer->setDestination(destination); + }; + if (!cont) + return execute(onSetup, {}, context); + + const auto onDone = [=](FileStreamer *streamer) { + if (streamer->result() == StreamResult::FinishedWithSuccess) + cont({}); + else + cont(make_unexpected(Tr::tr("Failed copying file"))); + }; + return execute(onSetup, onDone, context); +} + +FileStreamHandle FileStreamerManager::read(const FilePath &source, const ReadContinuation &cont) +{ + return read(source, nullptr, cont); +} + +FileStreamHandle FileStreamerManager::read(const FilePath &source, QObject *context, + const ReadContinuation &cont) +{ + const auto onSetup = [=](FileStreamer *streamer) { + streamer->setStreamMode(StreamMode::Reader); + streamer->setSource(source); + }; + if (!cont) + return execute(onSetup, {}, context); + + const auto onDone = [=](FileStreamer *streamer) { + if (streamer->result() == StreamResult::FinishedWithSuccess) + cont(streamer->readData()); + else + cont(make_unexpected(Tr::tr("Failed reading file"))); + }; + return execute(onSetup, onDone, context); +} + +FileStreamHandle FileStreamerManager::write(const FilePath &destination, const QByteArray &data, + const WriteContinuation &cont) +{ + return write(destination, data, nullptr, cont); +} + +FileStreamHandle FileStreamerManager::write(const FilePath &destination, const QByteArray &data, + QObject *context, const WriteContinuation &cont) +{ + const auto onSetup = [=](FileStreamer *streamer) { + streamer->setStreamMode(StreamMode::Writer); + streamer->setDestination(destination); + streamer->setWriteData(data); + }; + if (!cont) + return execute(onSetup, {}, context); + + const auto onDone = [=](FileStreamer *streamer) { + if (streamer->result() == StreamResult::FinishedWithSuccess) + cont(0); // TODO: return write count? + else + cont(make_unexpected(Tr::tr("Failed writing file"))); + }; + return execute(onSetup, onDone, context); +} + +void FileStreamerManager::stop(FileStreamHandle handle) +{ + deleteStreamer(handle); +} + +void FileStreamerManager::stopAll() +{ + deleteAllStreamers(); +} + +} // namespace Utils + diff --git a/src/libs/utils/filestreamermanager.h b/src/libs/utils/filestreamermanager.h new file mode 100644 index 00000000000..261d58ba299 --- /dev/null +++ b/src/libs/utils/filestreamermanager.h @@ -0,0 +1,46 @@ +// Copyright (C) 2023 The Qt Company Ltd. +// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +#pragma once + +#include "utils_global.h" + +#include "filepath.h" + +#include + +QT_BEGIN_NAMESPACE +class QByteArray; +QT_END_NAMESPACE + +namespace Utils { + +enum FileStreamHandle : int {}; + +class QTCREATOR_UTILS_EXPORT FileStreamerManager +{ +public: + using CopyContinuation = Continuation &>; + using ReadContinuation = Continuation &>; + using WriteContinuation = Continuation &>; + + static FileStreamHandle copy(const FilePath &source, const FilePath &destination, + const CopyContinuation &cont); + static FileStreamHandle copy(const FilePath &source, const FilePath &destination, + QObject *context, const CopyContinuation &cont); + + static FileStreamHandle read(const FilePath &source, const ReadContinuation &cont = {}); + static FileStreamHandle read(const FilePath &source, QObject *context, + const ReadContinuation &cont = {}); + + static FileStreamHandle write(const FilePath &destination, const QByteArray &data, + const WriteContinuation &cont = {}); + static FileStreamHandle write(const FilePath &destination, const QByteArray &data, + QObject *context, const WriteContinuation &cont = {}); + + // If called from the same thread that started the task, no continuation is going to be called. + static void stop(FileStreamHandle handle); + static void stopAll(); +}; + +} // namespace Utils diff --git a/src/libs/utils/utils.qbs b/src/libs/utils/utils.qbs index 8c0312ed03b..9e3d1eaeaaf 100644 --- a/src/libs/utils/utils.qbs +++ b/src/libs/utils/utils.qbs @@ -129,6 +129,8 @@ Project { "filesearch.h", "filestreamer.cpp", "filestreamer.h", + "filestreamermanager.cpp", + "filestreamermanager.h", "filesystemmodel.cpp", "filesystemmodel.h", "filesystemwatcher.cpp", diff --git a/src/plugins/remotelinux/filesystemaccess_test.cpp b/src/plugins/remotelinux/filesystemaccess_test.cpp index fa3f654f388..268e36acb4f 100644 --- a/src/plugins/remotelinux/filesystemaccess_test.cpp +++ b/src/plugins/remotelinux/filesystemaccess_test.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -88,7 +89,8 @@ void FileSystemAccessTest::initTestCase() QVERIFY(filePath.createDir()); QVERIFY(filePath.exists()); - const QString streamerDir("streamerDir"); + const QString streamerLocalDir("streamerLocalDir"); + const QString streamerRemoteDir("streamerRemoteDir"); const QString sourceDir("source"); const QString destDir("dest"); const QString localDir("local"); @@ -97,8 +99,8 @@ void FileSystemAccessTest::initTestCase() const FilePath remoteRoot = m_device->rootPath(); const FilePath localTempDir = *localRoot.tmpDir(); const FilePath remoteTempDir = *remoteRoot.tmpDir(); - m_localStreamerDir = localTempDir / streamerDir; - m_remoteStreamerDir = remoteTempDir / streamerDir; + m_localStreamerDir = localTempDir / streamerLocalDir; + m_remoteStreamerDir = remoteTempDir / streamerRemoteDir; m_localSourceDir = m_localStreamerDir / sourceDir; m_remoteSourceDir = m_remoteStreamerDir / sourceDir; m_localDestDir = m_localStreamerDir / destDir; @@ -139,6 +141,8 @@ void FileSystemAccessTest::cleanupTestCase() QVERIFY(!m_localStreamerDir.exists()); QVERIFY(!m_remoteStreamerDir.exists()); + + FileStreamerManager::stopAll(); } void FileSystemAccessTest::testCreateRemoteFile_data() @@ -530,6 +534,120 @@ void FileSystemAccessTest::testFileStreamer() qDebug() << "Elapsed time:" << timer.elapsed() << "ms."; } +void FileSystemAccessTest::testFileStreamerManager_data() +{ + testFileStreamer_data(); +} + +void FileSystemAccessTest::testFileStreamerManager() +{ + QElapsedTimer timer; + timer.start(); + + QFETCH(QString, fileName); + QFETCH(QByteArray, data); + + const FilePath localSourcePath = m_localSourceDir / fileName; + const FilePath remoteSourcePath = m_remoteSourceDir / fileName; + const FilePath localLocalDestPath = m_localDestDir / "local" / fileName; + const FilePath localRemoteDestPath = m_localDestDir / "remote" / fileName; + const FilePath remoteLocalDestPath = m_remoteDestDir / "local" / fileName; + const FilePath remoteRemoteDestPath = m_remoteDestDir / "remote" / fileName; + + localSourcePath.removeFile(); + remoteSourcePath.removeFile(); + localLocalDestPath.removeFile(); + localRemoteDestPath.removeFile(); + remoteLocalDestPath.removeFile(); + remoteRemoteDestPath.removeFile(); + + QVERIFY(!localSourcePath.exists()); + QVERIFY(!remoteSourcePath.exists()); + QVERIFY(!localLocalDestPath.exists()); + QVERIFY(!localRemoteDestPath.exists()); + QVERIFY(!remoteLocalDestPath.exists()); + QVERIFY(!remoteRemoteDestPath.exists()); + + std::optional localData; + std::optional remoteData; + std::optional localLocalData; + std::optional localRemoteData; + std::optional remoteLocalData; + std::optional remoteRemoteData; + + QEventLoop eventLoop1; + QEventLoop *loop = &eventLoop1; + int counter = 0; + int *hitCount = &counter; + + const auto writeAndRead = [hitCount, loop, data](const FilePath &destination, + std::optional *result) { + const auto onWrite = [hitCount, loop, destination, result] + (const expected_str &writeResult) { + QVERIFY(writeResult); + const auto onRead = [hitCount, loop, result] + (const expected_str &readResult) { + QVERIFY(readResult); + *result = *readResult; + ++(*hitCount); + if (*hitCount == 2) + loop->quit(); + }; + FileStreamerManager::read(destination, onRead); + }; + FileStreamerManager::write(destination, data, onWrite); + }; + + writeAndRead(localSourcePath, &localData); + writeAndRead(remoteSourcePath, &remoteData); + loop->exec(); + + QVERIFY(localData); + QCOMPARE(*localData, data); + QVERIFY(remoteData); + QCOMPARE(*remoteData, data); + + QEventLoop eventLoop2; + loop = &eventLoop2; + counter = 0; + + const auto transferAndRead = [hitCount, loop, data](const FilePath &source, + const FilePath &destination, + std::optional *result) { + const auto onTransfer = [hitCount, loop, destination, result] + (const expected_str &transferResult) { + QVERIFY(transferResult); + const auto onRead = [hitCount, loop, result] + (const expected_str &readResult) { + QVERIFY(readResult); + *result = *readResult; + ++(*hitCount); + if (*hitCount == 4) + loop->quit(); + }; + FileStreamerManager::read(destination, onRead); + }; + FileStreamerManager::copy(source, destination, onTransfer); + }; + + transferAndRead(localSourcePath, localLocalDestPath, &localLocalData); + transferAndRead(remoteSourcePath, localRemoteDestPath, &localRemoteData); + transferAndRead(localSourcePath, remoteLocalDestPath, &remoteLocalData); + transferAndRead(remoteSourcePath, remoteRemoteDestPath, &remoteRemoteData); + loop->exec(); + + QVERIFY(localLocalData); + QCOMPARE(*localLocalData, data); + QVERIFY(localRemoteData); + QCOMPARE(*localRemoteData, data); + QVERIFY(remoteLocalData); + QCOMPARE(*remoteLocalData, data); + QVERIFY(remoteRemoteData); + QCOMPARE(*remoteRemoteData, data); + + qDebug() << "Elapsed time:" << timer.elapsed() << "ms."; +} + void FileSystemAccessTest::testBlockingTransfer_data() { testFileStreamer_data(); diff --git a/src/plugins/remotelinux/filesystemaccess_test.h b/src/plugins/remotelinux/filesystemaccess_test.h index f7101af3908..2d3e6ae241b 100644 --- a/src/plugins/remotelinux/filesystemaccess_test.h +++ b/src/plugins/remotelinux/filesystemaccess_test.h @@ -31,6 +31,8 @@ private slots: void testFileTransfer(); void testFileStreamer_data(); void testFileStreamer(); + void testFileStreamerManager_data(); + void testFileStreamerManager(); void testBlockingTransfer_data(); void testBlockingTransfer();