Utils: Introduce FileStreamerManager

To be used for FilePath::async[Copy/Read/Write]() methods.

Change-Id: Ie34e600f8d65eae10b41893e15685afe19ce2a46
Reviewed-by: hjk <hjk@qt.io>
This commit is contained in:
Jarek Kobus
2023-02-27 15:54:14 +01:00
parent 22b9826e22
commit c1b1842c48
6 changed files with 372 additions and 3 deletions

View File

@@ -56,6 +56,7 @@ add_qtc_library(Utils
filepathinfo.h filepathinfo.h
filesearch.cpp filesearch.h filesearch.cpp filesearch.h
filestreamer.cpp filestreamer.h filestreamer.cpp filestreamer.h
filestreamermanager.cpp filestreamermanager.h
filesystemmodel.cpp filesystemmodel.h filesystemmodel.cpp filesystemmodel.h
filesystemwatcher.cpp filesystemwatcher.h filesystemwatcher.cpp filesystemwatcher.h
fileutils.cpp fileutils.h fileutils.cpp fileutils.h

View File

@@ -0,0 +1,200 @@
// Copyright (C) 2023 The Qt Company Ltd.
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
#include "filestreamermanager.h"
#include "filestreamer.h"
#include "threadutils.h"
#include "utilstr.h"
#include <QMutex>
#include <QMutexLocker>
#include <QThread>
#include <QWaitCondition>
#include <unordered_map>
namespace Utils {
// TODO: destruct the instance before destructing ProjectExplorer::DeviceManager (?)
static FileStreamHandle generateUniqueHandle()
{
static std::atomic_int handleCounter = 1;
return FileStreamHandle(handleCounter.fetch_add(1));
}
static QMutex s_mutex = {};
static QWaitCondition s_waitCondition = {};
static std::unordered_map<FileStreamHandle, FileStreamer *> s_fileStreamers = {};
static void addStreamer(FileStreamHandle handle, FileStreamer *streamer)
{
QMutexLocker locker(&s_mutex);
const bool added = s_fileStreamers.try_emplace(handle, streamer).second;
QTC_CHECK(added);
}
static void removeStreamer(FileStreamHandle handle)
{
QMutexLocker locker(&s_mutex);
auto it = s_fileStreamers.find(handle);
QTC_ASSERT(it != s_fileStreamers.end(), return);
QTC_ASSERT(QThread::currentThread() == it->second->thread(), return);
s_fileStreamers.erase(it);
s_waitCondition.wakeAll();
}
static void deleteStreamer(FileStreamHandle handle)
{
QMutexLocker locker(&s_mutex);
auto it = s_fileStreamers.find(handle);
if (it != s_fileStreamers.end())
return;
if (QThread::currentThread() == it->second->thread()) {
delete it->second;
s_fileStreamers.erase(it);
s_waitCondition.wakeAll();
} else {
QMetaObject::invokeMethod(it->second, [handle] {
deleteStreamer(handle);
});
s_waitCondition.wait(&s_mutex);
QTC_CHECK(s_fileStreamers.find(handle) == s_fileStreamers.end());
}
}
static void deleteAllStreamers()
{
QMutexLocker locker(&s_mutex);
QTC_ASSERT(Utils::isMainThread(), return);
while (s_fileStreamers.size()) {
auto it = s_fileStreamers.begin();
if (QThread::currentThread() == it->second->thread()) {
delete it->second;
s_fileStreamers.erase(it);
s_waitCondition.wakeAll();
} else {
const FileStreamHandle handle = it->first;
QMetaObject::invokeMethod(it->second, [handle] {
deleteStreamer(handle);
});
s_waitCondition.wait(&s_mutex);
QTC_CHECK(s_fileStreamers.find(handle) == s_fileStreamers.end());
}
}
}
static FileStreamHandle checkHandle(FileStreamHandle handle)
{
QMutexLocker locker(&s_mutex);
return s_fileStreamers.find(handle) != s_fileStreamers.end() ? handle : FileStreamHandle(0);
}
FileStreamHandle execute(const std::function<void(FileStreamer *)> &onSetup,
const std::function<void(FileStreamer *)> &onDone,
QObject *context)
{
FileStreamer *streamer = new FileStreamer;
onSetup(streamer);
const FileStreamHandle handle = generateUniqueHandle();
QTC_CHECK(context == nullptr || context->thread() == QThread::currentThread());
QObject *finalContext = context ? context : streamer;
QObject::connect(streamer, &FileStreamer::done, finalContext, [=] {
if (onDone)
onDone(streamer);
removeStreamer(handle);
streamer->deleteLater();
});
addStreamer(handle, streamer);
streamer->start();
return checkHandle(handle); // The handle could have been already removed
}
FileStreamHandle FileStreamerManager::copy(const FilePath &source, const FilePath &destination,
const CopyContinuation &cont)
{
return copy(source, destination, nullptr, cont);
}
FileStreamHandle FileStreamerManager::copy(const FilePath &source, const FilePath &destination,
QObject *context, const CopyContinuation &cont)
{
const auto onSetup = [=](FileStreamer *streamer) {
streamer->setSource(source);
streamer->setDestination(destination);
};
if (!cont)
return execute(onSetup, {}, context);
const auto onDone = [=](FileStreamer *streamer) {
if (streamer->result() == StreamResult::FinishedWithSuccess)
cont({});
else
cont(make_unexpected(Tr::tr("Failed copying file")));
};
return execute(onSetup, onDone, context);
}
FileStreamHandle FileStreamerManager::read(const FilePath &source, const ReadContinuation &cont)
{
return read(source, nullptr, cont);
}
FileStreamHandle FileStreamerManager::read(const FilePath &source, QObject *context,
const ReadContinuation &cont)
{
const auto onSetup = [=](FileStreamer *streamer) {
streamer->setStreamMode(StreamMode::Reader);
streamer->setSource(source);
};
if (!cont)
return execute(onSetup, {}, context);
const auto onDone = [=](FileStreamer *streamer) {
if (streamer->result() == StreamResult::FinishedWithSuccess)
cont(streamer->readData());
else
cont(make_unexpected(Tr::tr("Failed reading file")));
};
return execute(onSetup, onDone, context);
}
FileStreamHandle FileStreamerManager::write(const FilePath &destination, const QByteArray &data,
const WriteContinuation &cont)
{
return write(destination, data, nullptr, cont);
}
FileStreamHandle FileStreamerManager::write(const FilePath &destination, const QByteArray &data,
QObject *context, const WriteContinuation &cont)
{
const auto onSetup = [=](FileStreamer *streamer) {
streamer->setStreamMode(StreamMode::Writer);
streamer->setDestination(destination);
streamer->setWriteData(data);
};
if (!cont)
return execute(onSetup, {}, context);
const auto onDone = [=](FileStreamer *streamer) {
if (streamer->result() == StreamResult::FinishedWithSuccess)
cont(0); // TODO: return write count?
else
cont(make_unexpected(Tr::tr("Failed writing file")));
};
return execute(onSetup, onDone, context);
}
void FileStreamerManager::stop(FileStreamHandle handle)
{
deleteStreamer(handle);
}
void FileStreamerManager::stopAll()
{
deleteAllStreamers();
}
} // namespace Utils

View File

@@ -0,0 +1,46 @@
// Copyright (C) 2023 The Qt Company Ltd.
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
#pragma once
#include "utils_global.h"
#include "filepath.h"
#include <QObject>
QT_BEGIN_NAMESPACE
class QByteArray;
QT_END_NAMESPACE
namespace Utils {
enum FileStreamHandle : int {};
class QTCREATOR_UTILS_EXPORT FileStreamerManager
{
public:
using CopyContinuation = Continuation<const expected_str<void> &>;
using ReadContinuation = Continuation<const expected_str<QByteArray> &>;
using WriteContinuation = Continuation<const expected_str<qint64> &>;
static FileStreamHandle copy(const FilePath &source, const FilePath &destination,
const CopyContinuation &cont);
static FileStreamHandle copy(const FilePath &source, const FilePath &destination,
QObject *context, const CopyContinuation &cont);
static FileStreamHandle read(const FilePath &source, const ReadContinuation &cont = {});
static FileStreamHandle read(const FilePath &source, QObject *context,
const ReadContinuation &cont = {});
static FileStreamHandle write(const FilePath &destination, const QByteArray &data,
const WriteContinuation &cont = {});
static FileStreamHandle write(const FilePath &destination, const QByteArray &data,
QObject *context, const WriteContinuation &cont = {});
// If called from the same thread that started the task, no continuation is going to be called.
static void stop(FileStreamHandle handle);
static void stopAll();
};
} // namespace Utils

View File

@@ -129,6 +129,8 @@ Project {
"filesearch.h", "filesearch.h",
"filestreamer.cpp", "filestreamer.cpp",
"filestreamer.h", "filestreamer.h",
"filestreamermanager.cpp",
"filestreamermanager.h",
"filesystemmodel.cpp", "filesystemmodel.cpp",
"filesystemmodel.h", "filesystemmodel.h",
"filesystemwatcher.cpp", "filesystemwatcher.cpp",

View File

@@ -10,6 +10,7 @@
#include <projectexplorer/devicesupport/sshparameters.h> #include <projectexplorer/devicesupport/sshparameters.h>
#include <utils/filepath.h> #include <utils/filepath.h>
#include <utils/filestreamer.h> #include <utils/filestreamer.h>
#include <utils/filestreamermanager.h>
#include <utils/processinterface.h> #include <utils/processinterface.h>
#include <QDebug> #include <QDebug>
@@ -88,7 +89,8 @@ void FileSystemAccessTest::initTestCase()
QVERIFY(filePath.createDir()); QVERIFY(filePath.createDir());
QVERIFY(filePath.exists()); QVERIFY(filePath.exists());
const QString streamerDir("streamerDir"); const QString streamerLocalDir("streamerLocalDir");
const QString streamerRemoteDir("streamerRemoteDir");
const QString sourceDir("source"); const QString sourceDir("source");
const QString destDir("dest"); const QString destDir("dest");
const QString localDir("local"); const QString localDir("local");
@@ -97,8 +99,8 @@ void FileSystemAccessTest::initTestCase()
const FilePath remoteRoot = m_device->rootPath(); const FilePath remoteRoot = m_device->rootPath();
const FilePath localTempDir = *localRoot.tmpDir(); const FilePath localTempDir = *localRoot.tmpDir();
const FilePath remoteTempDir = *remoteRoot.tmpDir(); const FilePath remoteTempDir = *remoteRoot.tmpDir();
m_localStreamerDir = localTempDir / streamerDir; m_localStreamerDir = localTempDir / streamerLocalDir;
m_remoteStreamerDir = remoteTempDir / streamerDir; m_remoteStreamerDir = remoteTempDir / streamerRemoteDir;
m_localSourceDir = m_localStreamerDir / sourceDir; m_localSourceDir = m_localStreamerDir / sourceDir;
m_remoteSourceDir = m_remoteStreamerDir / sourceDir; m_remoteSourceDir = m_remoteStreamerDir / sourceDir;
m_localDestDir = m_localStreamerDir / destDir; m_localDestDir = m_localStreamerDir / destDir;
@@ -139,6 +141,8 @@ void FileSystemAccessTest::cleanupTestCase()
QVERIFY(!m_localStreamerDir.exists()); QVERIFY(!m_localStreamerDir.exists());
QVERIFY(!m_remoteStreamerDir.exists()); QVERIFY(!m_remoteStreamerDir.exists());
FileStreamerManager::stopAll();
} }
void FileSystemAccessTest::testCreateRemoteFile_data() void FileSystemAccessTest::testCreateRemoteFile_data()
@@ -530,6 +534,120 @@ void FileSystemAccessTest::testFileStreamer()
qDebug() << "Elapsed time:" << timer.elapsed() << "ms."; qDebug() << "Elapsed time:" << timer.elapsed() << "ms.";
} }
void FileSystemAccessTest::testFileStreamerManager_data()
{
testFileStreamer_data();
}
void FileSystemAccessTest::testFileStreamerManager()
{
QElapsedTimer timer;
timer.start();
QFETCH(QString, fileName);
QFETCH(QByteArray, data);
const FilePath localSourcePath = m_localSourceDir / fileName;
const FilePath remoteSourcePath = m_remoteSourceDir / fileName;
const FilePath localLocalDestPath = m_localDestDir / "local" / fileName;
const FilePath localRemoteDestPath = m_localDestDir / "remote" / fileName;
const FilePath remoteLocalDestPath = m_remoteDestDir / "local" / fileName;
const FilePath remoteRemoteDestPath = m_remoteDestDir / "remote" / fileName;
localSourcePath.removeFile();
remoteSourcePath.removeFile();
localLocalDestPath.removeFile();
localRemoteDestPath.removeFile();
remoteLocalDestPath.removeFile();
remoteRemoteDestPath.removeFile();
QVERIFY(!localSourcePath.exists());
QVERIFY(!remoteSourcePath.exists());
QVERIFY(!localLocalDestPath.exists());
QVERIFY(!localRemoteDestPath.exists());
QVERIFY(!remoteLocalDestPath.exists());
QVERIFY(!remoteRemoteDestPath.exists());
std::optional<QByteArray> localData;
std::optional<QByteArray> remoteData;
std::optional<QByteArray> localLocalData;
std::optional<QByteArray> localRemoteData;
std::optional<QByteArray> remoteLocalData;
std::optional<QByteArray> remoteRemoteData;
QEventLoop eventLoop1;
QEventLoop *loop = &eventLoop1;
int counter = 0;
int *hitCount = &counter;
const auto writeAndRead = [hitCount, loop, data](const FilePath &destination,
std::optional<QByteArray> *result) {
const auto onWrite = [hitCount, loop, destination, result]
(const expected_str<qint64> &writeResult) {
QVERIFY(writeResult);
const auto onRead = [hitCount, loop, result]
(const expected_str<QByteArray> &readResult) {
QVERIFY(readResult);
*result = *readResult;
++(*hitCount);
if (*hitCount == 2)
loop->quit();
};
FileStreamerManager::read(destination, onRead);
};
FileStreamerManager::write(destination, data, onWrite);
};
writeAndRead(localSourcePath, &localData);
writeAndRead(remoteSourcePath, &remoteData);
loop->exec();
QVERIFY(localData);
QCOMPARE(*localData, data);
QVERIFY(remoteData);
QCOMPARE(*remoteData, data);
QEventLoop eventLoop2;
loop = &eventLoop2;
counter = 0;
const auto transferAndRead = [hitCount, loop, data](const FilePath &source,
const FilePath &destination,
std::optional<QByteArray> *result) {
const auto onTransfer = [hitCount, loop, destination, result]
(const expected_str<void> &transferResult) {
QVERIFY(transferResult);
const auto onRead = [hitCount, loop, result]
(const expected_str<QByteArray> &readResult) {
QVERIFY(readResult);
*result = *readResult;
++(*hitCount);
if (*hitCount == 4)
loop->quit();
};
FileStreamerManager::read(destination, onRead);
};
FileStreamerManager::copy(source, destination, onTransfer);
};
transferAndRead(localSourcePath, localLocalDestPath, &localLocalData);
transferAndRead(remoteSourcePath, localRemoteDestPath, &localRemoteData);
transferAndRead(localSourcePath, remoteLocalDestPath, &remoteLocalData);
transferAndRead(remoteSourcePath, remoteRemoteDestPath, &remoteRemoteData);
loop->exec();
QVERIFY(localLocalData);
QCOMPARE(*localLocalData, data);
QVERIFY(localRemoteData);
QCOMPARE(*localRemoteData, data);
QVERIFY(remoteLocalData);
QCOMPARE(*remoteLocalData, data);
QVERIFY(remoteRemoteData);
QCOMPARE(*remoteRemoteData, data);
qDebug() << "Elapsed time:" << timer.elapsed() << "ms.";
}
void FileSystemAccessTest::testBlockingTransfer_data() void FileSystemAccessTest::testBlockingTransfer_data()
{ {
testFileStreamer_data(); testFileStreamer_data();

View File

@@ -31,6 +31,8 @@ private slots:
void testFileTransfer(); void testFileTransfer();
void testFileStreamer_data(); void testFileStreamer_data();
void testFileStreamer(); void testFileStreamer();
void testFileStreamerManager_data();
void testFileStreamerManager();
void testBlockingTransfer_data(); void testBlockingTransfer_data();
void testBlockingTransfer(); void testBlockingTransfer();