forked from qt-creator/qt-creator
Utils: Introduce FileStreamerManager
To be used for FilePath::async[Copy/Read/Write]() methods. Change-Id: Ie34e600f8d65eae10b41893e15685afe19ce2a46 Reviewed-by: hjk <hjk@qt.io>
This commit is contained in:
@@ -56,6 +56,7 @@ add_qtc_library(Utils
|
||||
filepathinfo.h
|
||||
filesearch.cpp filesearch.h
|
||||
filestreamer.cpp filestreamer.h
|
||||
filestreamermanager.cpp filestreamermanager.h
|
||||
filesystemmodel.cpp filesystemmodel.h
|
||||
filesystemwatcher.cpp filesystemwatcher.h
|
||||
fileutils.cpp fileutils.h
|
||||
|
200
src/libs/utils/filestreamermanager.cpp
Normal file
200
src/libs/utils/filestreamermanager.cpp
Normal file
@@ -0,0 +1,200 @@
|
||||
// Copyright (C) 2023 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
|
||||
|
||||
#include "filestreamermanager.h"
|
||||
|
||||
#include "filestreamer.h"
|
||||
#include "threadutils.h"
|
||||
#include "utilstr.h"
|
||||
|
||||
#include <QMutex>
|
||||
#include <QMutexLocker>
|
||||
#include <QThread>
|
||||
#include <QWaitCondition>
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
namespace Utils {
|
||||
|
||||
// TODO: destruct the instance before destructing ProjectExplorer::DeviceManager (?)
|
||||
|
||||
static FileStreamHandle generateUniqueHandle()
|
||||
{
|
||||
static std::atomic_int handleCounter = 1;
|
||||
return FileStreamHandle(handleCounter.fetch_add(1));
|
||||
}
|
||||
|
||||
static QMutex s_mutex = {};
|
||||
static QWaitCondition s_waitCondition = {};
|
||||
static std::unordered_map<FileStreamHandle, FileStreamer *> s_fileStreamers = {};
|
||||
|
||||
static void addStreamer(FileStreamHandle handle, FileStreamer *streamer)
|
||||
{
|
||||
QMutexLocker locker(&s_mutex);
|
||||
const bool added = s_fileStreamers.try_emplace(handle, streamer).second;
|
||||
QTC_CHECK(added);
|
||||
}
|
||||
|
||||
static void removeStreamer(FileStreamHandle handle)
|
||||
{
|
||||
QMutexLocker locker(&s_mutex);
|
||||
auto it = s_fileStreamers.find(handle);
|
||||
QTC_ASSERT(it != s_fileStreamers.end(), return);
|
||||
QTC_ASSERT(QThread::currentThread() == it->second->thread(), return);
|
||||
s_fileStreamers.erase(it);
|
||||
s_waitCondition.wakeAll();
|
||||
}
|
||||
|
||||
static void deleteStreamer(FileStreamHandle handle)
|
||||
{
|
||||
QMutexLocker locker(&s_mutex);
|
||||
auto it = s_fileStreamers.find(handle);
|
||||
if (it != s_fileStreamers.end())
|
||||
return;
|
||||
if (QThread::currentThread() == it->second->thread()) {
|
||||
delete it->second;
|
||||
s_fileStreamers.erase(it);
|
||||
s_waitCondition.wakeAll();
|
||||
} else {
|
||||
QMetaObject::invokeMethod(it->second, [handle] {
|
||||
deleteStreamer(handle);
|
||||
});
|
||||
s_waitCondition.wait(&s_mutex);
|
||||
QTC_CHECK(s_fileStreamers.find(handle) == s_fileStreamers.end());
|
||||
}
|
||||
}
|
||||
|
||||
static void deleteAllStreamers()
|
||||
{
|
||||
QMutexLocker locker(&s_mutex);
|
||||
QTC_ASSERT(Utils::isMainThread(), return);
|
||||
while (s_fileStreamers.size()) {
|
||||
auto it = s_fileStreamers.begin();
|
||||
if (QThread::currentThread() == it->second->thread()) {
|
||||
delete it->second;
|
||||
s_fileStreamers.erase(it);
|
||||
s_waitCondition.wakeAll();
|
||||
} else {
|
||||
const FileStreamHandle handle = it->first;
|
||||
QMetaObject::invokeMethod(it->second, [handle] {
|
||||
deleteStreamer(handle);
|
||||
});
|
||||
s_waitCondition.wait(&s_mutex);
|
||||
QTC_CHECK(s_fileStreamers.find(handle) == s_fileStreamers.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static FileStreamHandle checkHandle(FileStreamHandle handle)
|
||||
{
|
||||
QMutexLocker locker(&s_mutex);
|
||||
return s_fileStreamers.find(handle) != s_fileStreamers.end() ? handle : FileStreamHandle(0);
|
||||
}
|
||||
|
||||
FileStreamHandle execute(const std::function<void(FileStreamer *)> &onSetup,
|
||||
const std::function<void(FileStreamer *)> &onDone,
|
||||
QObject *context)
|
||||
{
|
||||
FileStreamer *streamer = new FileStreamer;
|
||||
onSetup(streamer);
|
||||
const FileStreamHandle handle = generateUniqueHandle();
|
||||
QTC_CHECK(context == nullptr || context->thread() == QThread::currentThread());
|
||||
QObject *finalContext = context ? context : streamer;
|
||||
QObject::connect(streamer, &FileStreamer::done, finalContext, [=] {
|
||||
if (onDone)
|
||||
onDone(streamer);
|
||||
removeStreamer(handle);
|
||||
streamer->deleteLater();
|
||||
});
|
||||
addStreamer(handle, streamer);
|
||||
streamer->start();
|
||||
return checkHandle(handle); // The handle could have been already removed
|
||||
}
|
||||
|
||||
FileStreamHandle FileStreamerManager::copy(const FilePath &source, const FilePath &destination,
|
||||
const CopyContinuation &cont)
|
||||
{
|
||||
return copy(source, destination, nullptr, cont);
|
||||
}
|
||||
|
||||
FileStreamHandle FileStreamerManager::copy(const FilePath &source, const FilePath &destination,
|
||||
QObject *context, const CopyContinuation &cont)
|
||||
{
|
||||
const auto onSetup = [=](FileStreamer *streamer) {
|
||||
streamer->setSource(source);
|
||||
streamer->setDestination(destination);
|
||||
};
|
||||
if (!cont)
|
||||
return execute(onSetup, {}, context);
|
||||
|
||||
const auto onDone = [=](FileStreamer *streamer) {
|
||||
if (streamer->result() == StreamResult::FinishedWithSuccess)
|
||||
cont({});
|
||||
else
|
||||
cont(make_unexpected(Tr::tr("Failed copying file")));
|
||||
};
|
||||
return execute(onSetup, onDone, context);
|
||||
}
|
||||
|
||||
FileStreamHandle FileStreamerManager::read(const FilePath &source, const ReadContinuation &cont)
|
||||
{
|
||||
return read(source, nullptr, cont);
|
||||
}
|
||||
|
||||
FileStreamHandle FileStreamerManager::read(const FilePath &source, QObject *context,
|
||||
const ReadContinuation &cont)
|
||||
{
|
||||
const auto onSetup = [=](FileStreamer *streamer) {
|
||||
streamer->setStreamMode(StreamMode::Reader);
|
||||
streamer->setSource(source);
|
||||
};
|
||||
if (!cont)
|
||||
return execute(onSetup, {}, context);
|
||||
|
||||
const auto onDone = [=](FileStreamer *streamer) {
|
||||
if (streamer->result() == StreamResult::FinishedWithSuccess)
|
||||
cont(streamer->readData());
|
||||
else
|
||||
cont(make_unexpected(Tr::tr("Failed reading file")));
|
||||
};
|
||||
return execute(onSetup, onDone, context);
|
||||
}
|
||||
|
||||
FileStreamHandle FileStreamerManager::write(const FilePath &destination, const QByteArray &data,
|
||||
const WriteContinuation &cont)
|
||||
{
|
||||
return write(destination, data, nullptr, cont);
|
||||
}
|
||||
|
||||
FileStreamHandle FileStreamerManager::write(const FilePath &destination, const QByteArray &data,
|
||||
QObject *context, const WriteContinuation &cont)
|
||||
{
|
||||
const auto onSetup = [=](FileStreamer *streamer) {
|
||||
streamer->setStreamMode(StreamMode::Writer);
|
||||
streamer->setDestination(destination);
|
||||
streamer->setWriteData(data);
|
||||
};
|
||||
if (!cont)
|
||||
return execute(onSetup, {}, context);
|
||||
|
||||
const auto onDone = [=](FileStreamer *streamer) {
|
||||
if (streamer->result() == StreamResult::FinishedWithSuccess)
|
||||
cont(0); // TODO: return write count?
|
||||
else
|
||||
cont(make_unexpected(Tr::tr("Failed writing file")));
|
||||
};
|
||||
return execute(onSetup, onDone, context);
|
||||
}
|
||||
|
||||
void FileStreamerManager::stop(FileStreamHandle handle)
|
||||
{
|
||||
deleteStreamer(handle);
|
||||
}
|
||||
|
||||
void FileStreamerManager::stopAll()
|
||||
{
|
||||
deleteAllStreamers();
|
||||
}
|
||||
|
||||
} // namespace Utils
|
||||
|
46
src/libs/utils/filestreamermanager.h
Normal file
46
src/libs/utils/filestreamermanager.h
Normal file
@@ -0,0 +1,46 @@
|
||||
// Copyright (C) 2023 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils_global.h"
|
||||
|
||||
#include "filepath.h"
|
||||
|
||||
#include <QObject>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
class QByteArray;
|
||||
QT_END_NAMESPACE
|
||||
|
||||
namespace Utils {
|
||||
|
||||
enum FileStreamHandle : int {};
|
||||
|
||||
class QTCREATOR_UTILS_EXPORT FileStreamerManager
|
||||
{
|
||||
public:
|
||||
using CopyContinuation = Continuation<const expected_str<void> &>;
|
||||
using ReadContinuation = Continuation<const expected_str<QByteArray> &>;
|
||||
using WriteContinuation = Continuation<const expected_str<qint64> &>;
|
||||
|
||||
static FileStreamHandle copy(const FilePath &source, const FilePath &destination,
|
||||
const CopyContinuation &cont);
|
||||
static FileStreamHandle copy(const FilePath &source, const FilePath &destination,
|
||||
QObject *context, const CopyContinuation &cont);
|
||||
|
||||
static FileStreamHandle read(const FilePath &source, const ReadContinuation &cont = {});
|
||||
static FileStreamHandle read(const FilePath &source, QObject *context,
|
||||
const ReadContinuation &cont = {});
|
||||
|
||||
static FileStreamHandle write(const FilePath &destination, const QByteArray &data,
|
||||
const WriteContinuation &cont = {});
|
||||
static FileStreamHandle write(const FilePath &destination, const QByteArray &data,
|
||||
QObject *context, const WriteContinuation &cont = {});
|
||||
|
||||
// If called from the same thread that started the task, no continuation is going to be called.
|
||||
static void stop(FileStreamHandle handle);
|
||||
static void stopAll();
|
||||
};
|
||||
|
||||
} // namespace Utils
|
@@ -129,6 +129,8 @@ Project {
|
||||
"filesearch.h",
|
||||
"filestreamer.cpp",
|
||||
"filestreamer.h",
|
||||
"filestreamermanager.cpp",
|
||||
"filestreamermanager.h",
|
||||
"filesystemmodel.cpp",
|
||||
"filesystemmodel.h",
|
||||
"filesystemwatcher.cpp",
|
||||
|
@@ -10,6 +10,7 @@
|
||||
#include <projectexplorer/devicesupport/sshparameters.h>
|
||||
#include <utils/filepath.h>
|
||||
#include <utils/filestreamer.h>
|
||||
#include <utils/filestreamermanager.h>
|
||||
#include <utils/processinterface.h>
|
||||
|
||||
#include <QDebug>
|
||||
@@ -88,7 +89,8 @@ void FileSystemAccessTest::initTestCase()
|
||||
QVERIFY(filePath.createDir());
|
||||
QVERIFY(filePath.exists());
|
||||
|
||||
const QString streamerDir("streamerDir");
|
||||
const QString streamerLocalDir("streamerLocalDir");
|
||||
const QString streamerRemoteDir("streamerRemoteDir");
|
||||
const QString sourceDir("source");
|
||||
const QString destDir("dest");
|
||||
const QString localDir("local");
|
||||
@@ -97,8 +99,8 @@ void FileSystemAccessTest::initTestCase()
|
||||
const FilePath remoteRoot = m_device->rootPath();
|
||||
const FilePath localTempDir = *localRoot.tmpDir();
|
||||
const FilePath remoteTempDir = *remoteRoot.tmpDir();
|
||||
m_localStreamerDir = localTempDir / streamerDir;
|
||||
m_remoteStreamerDir = remoteTempDir / streamerDir;
|
||||
m_localStreamerDir = localTempDir / streamerLocalDir;
|
||||
m_remoteStreamerDir = remoteTempDir / streamerRemoteDir;
|
||||
m_localSourceDir = m_localStreamerDir / sourceDir;
|
||||
m_remoteSourceDir = m_remoteStreamerDir / sourceDir;
|
||||
m_localDestDir = m_localStreamerDir / destDir;
|
||||
@@ -139,6 +141,8 @@ void FileSystemAccessTest::cleanupTestCase()
|
||||
|
||||
QVERIFY(!m_localStreamerDir.exists());
|
||||
QVERIFY(!m_remoteStreamerDir.exists());
|
||||
|
||||
FileStreamerManager::stopAll();
|
||||
}
|
||||
|
||||
void FileSystemAccessTest::testCreateRemoteFile_data()
|
||||
@@ -530,6 +534,120 @@ void FileSystemAccessTest::testFileStreamer()
|
||||
qDebug() << "Elapsed time:" << timer.elapsed() << "ms.";
|
||||
}
|
||||
|
||||
void FileSystemAccessTest::testFileStreamerManager_data()
|
||||
{
|
||||
testFileStreamer_data();
|
||||
}
|
||||
|
||||
void FileSystemAccessTest::testFileStreamerManager()
|
||||
{
|
||||
QElapsedTimer timer;
|
||||
timer.start();
|
||||
|
||||
QFETCH(QString, fileName);
|
||||
QFETCH(QByteArray, data);
|
||||
|
||||
const FilePath localSourcePath = m_localSourceDir / fileName;
|
||||
const FilePath remoteSourcePath = m_remoteSourceDir / fileName;
|
||||
const FilePath localLocalDestPath = m_localDestDir / "local" / fileName;
|
||||
const FilePath localRemoteDestPath = m_localDestDir / "remote" / fileName;
|
||||
const FilePath remoteLocalDestPath = m_remoteDestDir / "local" / fileName;
|
||||
const FilePath remoteRemoteDestPath = m_remoteDestDir / "remote" / fileName;
|
||||
|
||||
localSourcePath.removeFile();
|
||||
remoteSourcePath.removeFile();
|
||||
localLocalDestPath.removeFile();
|
||||
localRemoteDestPath.removeFile();
|
||||
remoteLocalDestPath.removeFile();
|
||||
remoteRemoteDestPath.removeFile();
|
||||
|
||||
QVERIFY(!localSourcePath.exists());
|
||||
QVERIFY(!remoteSourcePath.exists());
|
||||
QVERIFY(!localLocalDestPath.exists());
|
||||
QVERIFY(!localRemoteDestPath.exists());
|
||||
QVERIFY(!remoteLocalDestPath.exists());
|
||||
QVERIFY(!remoteRemoteDestPath.exists());
|
||||
|
||||
std::optional<QByteArray> localData;
|
||||
std::optional<QByteArray> remoteData;
|
||||
std::optional<QByteArray> localLocalData;
|
||||
std::optional<QByteArray> localRemoteData;
|
||||
std::optional<QByteArray> remoteLocalData;
|
||||
std::optional<QByteArray> remoteRemoteData;
|
||||
|
||||
QEventLoop eventLoop1;
|
||||
QEventLoop *loop = &eventLoop1;
|
||||
int counter = 0;
|
||||
int *hitCount = &counter;
|
||||
|
||||
const auto writeAndRead = [hitCount, loop, data](const FilePath &destination,
|
||||
std::optional<QByteArray> *result) {
|
||||
const auto onWrite = [hitCount, loop, destination, result]
|
||||
(const expected_str<qint64> &writeResult) {
|
||||
QVERIFY(writeResult);
|
||||
const auto onRead = [hitCount, loop, result]
|
||||
(const expected_str<QByteArray> &readResult) {
|
||||
QVERIFY(readResult);
|
||||
*result = *readResult;
|
||||
++(*hitCount);
|
||||
if (*hitCount == 2)
|
||||
loop->quit();
|
||||
};
|
||||
FileStreamerManager::read(destination, onRead);
|
||||
};
|
||||
FileStreamerManager::write(destination, data, onWrite);
|
||||
};
|
||||
|
||||
writeAndRead(localSourcePath, &localData);
|
||||
writeAndRead(remoteSourcePath, &remoteData);
|
||||
loop->exec();
|
||||
|
||||
QVERIFY(localData);
|
||||
QCOMPARE(*localData, data);
|
||||
QVERIFY(remoteData);
|
||||
QCOMPARE(*remoteData, data);
|
||||
|
||||
QEventLoop eventLoop2;
|
||||
loop = &eventLoop2;
|
||||
counter = 0;
|
||||
|
||||
const auto transferAndRead = [hitCount, loop, data](const FilePath &source,
|
||||
const FilePath &destination,
|
||||
std::optional<QByteArray> *result) {
|
||||
const auto onTransfer = [hitCount, loop, destination, result]
|
||||
(const expected_str<void> &transferResult) {
|
||||
QVERIFY(transferResult);
|
||||
const auto onRead = [hitCount, loop, result]
|
||||
(const expected_str<QByteArray> &readResult) {
|
||||
QVERIFY(readResult);
|
||||
*result = *readResult;
|
||||
++(*hitCount);
|
||||
if (*hitCount == 4)
|
||||
loop->quit();
|
||||
};
|
||||
FileStreamerManager::read(destination, onRead);
|
||||
};
|
||||
FileStreamerManager::copy(source, destination, onTransfer);
|
||||
};
|
||||
|
||||
transferAndRead(localSourcePath, localLocalDestPath, &localLocalData);
|
||||
transferAndRead(remoteSourcePath, localRemoteDestPath, &localRemoteData);
|
||||
transferAndRead(localSourcePath, remoteLocalDestPath, &remoteLocalData);
|
||||
transferAndRead(remoteSourcePath, remoteRemoteDestPath, &remoteRemoteData);
|
||||
loop->exec();
|
||||
|
||||
QVERIFY(localLocalData);
|
||||
QCOMPARE(*localLocalData, data);
|
||||
QVERIFY(localRemoteData);
|
||||
QCOMPARE(*localRemoteData, data);
|
||||
QVERIFY(remoteLocalData);
|
||||
QCOMPARE(*remoteLocalData, data);
|
||||
QVERIFY(remoteRemoteData);
|
||||
QCOMPARE(*remoteRemoteData, data);
|
||||
|
||||
qDebug() << "Elapsed time:" << timer.elapsed() << "ms.";
|
||||
}
|
||||
|
||||
void FileSystemAccessTest::testBlockingTransfer_data()
|
||||
{
|
||||
testFileStreamer_data();
|
||||
|
@@ -31,6 +31,8 @@ private slots:
|
||||
void testFileTransfer();
|
||||
void testFileStreamer_data();
|
||||
void testFileStreamer();
|
||||
void testFileStreamerManager_data();
|
||||
void testFileStreamerManager();
|
||||
void testBlockingTransfer_data();
|
||||
void testBlockingTransfer();
|
||||
|
||||
|
Reference in New Issue
Block a user