Trk: Use a separate tread for writing, polish interface & trklauncher

Introduce writer thread, fix message & verbose handling.
This commit is contained in:
Friedemann Kleint
2009-09-22 17:19:56 +02:00
parent bf74d21d6c
commit 454b3bc48e
4 changed files with 241 additions and 119 deletions

View File

@@ -138,7 +138,7 @@ bool Launcher::startServer(QString *errorMessage)
void Launcher::setVerbose(int v) void Launcher::setVerbose(int v)
{ {
d->m_verbose = v; d->m_verbose = v;
d->m_device.setVerbose(v > 1); d->m_device.setVerbose(v);
} }
void Launcher::installAndRun() void Launcher::installAndRun()

View File

@@ -35,6 +35,10 @@
#include <QtCore/QQueue> #include <QtCore/QQueue>
#include <QtCore/QHash> #include <QtCore/QHash>
#include <QtCore/QMap> #include <QtCore/QMap>
#include <QtCore/QThread>
#include <QtCore/QMutex>
#include <QtCore/QWaitCondition>
#include <QtCore/QSharedPointer>
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
# include <windows.h> # include <windows.h>
@@ -90,7 +94,7 @@ BOOL WINAPI TryReadFile(HANDLE hFile,
if (comStat.cbInQue == 0) { if (comStat.cbInQue == 0) {
*lpNumberOfBytesRead = 0; *lpNumberOfBytesRead = 0;
return FALSE; return FALSE;
} }
return ReadFile(hFile, return ReadFile(hFile,
lpBuffer, lpBuffer,
qMin(comStat.cbInQue, nNumberOfBytesToRead), qMin(comStat.cbInQue, nNumberOfBytesToRead),
@@ -128,6 +132,7 @@ TrkMessage::TrkMessage(byte c, byte t, TrkCallback cb) :
{ {
} }
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
// //
// TrkWriteQueue // TrkWriteQueue
@@ -252,6 +257,162 @@ void TrkWriteQueue::queueTrkInitialPing()
m_trkWriteQueue.append(TrkMessage(0, 0)); m_trkWriteQueue.append(TrkMessage(0, 0));
} }
///////////////////////////////////////////////////////////////////////
//
// DeviceContext to be shared between threads
//
///////////////////////////////////////////////////////////////////////
struct DeviceContext {
DeviceContext();
#ifdef Q_OS_WIN
HANDLE device;
#else
QFile file;
#endif
bool serialFrame;
};
DeviceContext::DeviceContext() :
#ifdef Q_OS_WIN
device(INVALID_HANDLE_VALUE),
#endif
serialFrame(true)
{
}
///////////////////////////////////////////////////////////////////////
//
// TrkWriterThread: A thread operating a TrkWriteQueue.
//
///////////////////////////////////////////////////////////////////////
class WriterThread : public QThread {
Q_OBJECT
Q_DISABLE_COPY(WriterThread)
public:
explicit WriterThread(const QSharedPointer<DeviceContext> &context);
// Enqueue messages.
void queueTrkMessage(byte code, TrkCallback callback,
const QByteArray &data, const QVariant &cookie);
void queueTrkInitialPing();
// Call this from the device read notification with the results.
void slotHandleResult(const TrkResult &result);
virtual void run();
signals:
void error(const QString &);
public slots:
bool trkWriteRawMessage(const TrkMessage &msg);
void terminate();
void tryWrite();
private:
bool write(const QByteArray &data, QString *errorMessage);
const QSharedPointer<DeviceContext> m_context;
QMutex m_dataMutex;
QMutex m_waitMutex;
QWaitCondition m_waitCondition;
TrkWriteQueue m_queue;
bool m_terminate;
};
WriterThread::WriterThread(const QSharedPointer<DeviceContext> &context) :
m_context(context),
m_terminate(false)
{
}
void WriterThread::run()
{
while (true) {
// Wait. Use a timeout in case something is already queued before we
// start up or some weird hanging exit condition
m_waitMutex.lock();
m_waitCondition.wait(&m_waitMutex, 100);
m_waitMutex.unlock();
if (m_terminate)
break;
// Send off message
m_dataMutex.lock();
TrkMessage message;
if (m_queue.pendingMessage(&message)) {
const bool success = trkWriteRawMessage(message);
m_queue.notifyWriteResult(success);
}
m_dataMutex.unlock();
}
}
void WriterThread::terminate()
{
m_terminate = true;
m_waitCondition.wakeAll();
wait();
}
bool WriterThread::write(const QByteArray &data, QString *errorMessage)
{
#ifdef Q_OS_WIN
DWORD charsWritten;
if (!WriteFile(m_context->device, data.data(), data.size(), &charsWritten, NULL)) {
*errorMessage = QString::fromLatin1("Error writing data: %1").arg(winErrorMessage(GetLastError()));
return false;
}
FlushFileBuffers(m_context->device);
return true;
#else
if (m_context->file.write(data) == -1 || !m_context->file.flush()) {
*errorMessage = QString::fromLatin1("Cannot write: %1").arg(m_context->file.errorString());
return false;
}
return true;
#endif
}
bool WriterThread::trkWriteRawMessage(const TrkMessage &msg)
{
const QByteArray ba = frameMessage(msg.code, msg.token, msg.data, m_context->serialFrame);
QString errorMessage;
const bool rc = write(ba, &errorMessage);
if (!rc)
emit error(errorMessage);
return rc;
}
void WriterThread::tryWrite()
{
m_waitCondition.wakeAll();
}
void WriterThread::queueTrkMessage(byte code, TrkCallback callback,
const QByteArray &data, const QVariant &cookie)
{
m_dataMutex.lock();
m_queue.queueTrkMessage(code, callback, data, cookie);
m_dataMutex.unlock();
tryWrite();
}
void WriterThread::queueTrkInitialPing()
{
m_dataMutex.lock();
m_queue.queueTrkInitialPing();
m_dataMutex.unlock();
tryWrite();
}
// Call this from the device read notification with the results.
void WriterThread::slotHandleResult(const TrkResult &result)
{
m_queue.slotHandleResult(result);
tryWrite(); // Have messages been enqueued in-between?
}
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
// //
@@ -263,18 +424,12 @@ struct TrkDevicePrivate
{ {
TrkDevicePrivate(); TrkDevicePrivate();
TrkWriteQueue queue; QSharedPointer<DeviceContext> deviceContext;
#ifdef Q_OS_WIN QSharedPointer<WriterThread> writerThread;
HANDLE hdevice;
#else
QFile file;
#endif
QByteArray trkReadBuffer; QByteArray trkReadBuffer;
bool m_trkWriteBusy;
int timerId; int timerId;
bool serialFrame; int verbose;
bool verbose;
QString errorString; QString errorString;
}; };
@@ -285,13 +440,9 @@ struct TrkDevicePrivate
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
TrkDevicePrivate::TrkDevicePrivate() : TrkDevicePrivate::TrkDevicePrivate() :
#ifdef Q_OS_WIN deviceContext(new DeviceContext),
hdevice(INVALID_HANDLE_VALUE),
#endif
m_trkWriteBusy(false),
timerId(-1), timerId(-1),
serialFrame(true), verbose(0)
verbose(false)
{ {
} }
@@ -316,7 +467,7 @@ bool TrkDevice::open(const QString &port, QString *errorMessage)
{ {
close(); close();
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
d->hdevice = CreateFile(port.toStdWString().c_str(), d->deviceContext->device = CreateFile(port.toStdWString().c_str(),
GENERIC_READ | GENERIC_WRITE, GENERIC_READ | GENERIC_WRITE,
0, 0,
NULL, NULL,
@@ -324,21 +475,19 @@ bool TrkDevice::open(const QString &port, QString *errorMessage)
FILE_ATTRIBUTE_NORMAL, FILE_ATTRIBUTE_NORMAL,
NULL); NULL);
if (INVALID_HANDLE_VALUE == d->hdevice) { if (INVALID_HANDLE_VALUE == d->deviceContext->device) {
*errorMessage = QString::fromLatin1("Could not open device '%1': %2").arg(port, winErrorMessage(GetLastError())); *errorMessage = QString::fromLatin1("Could not open device '%1': %2").arg(port, winErrorMessage(GetLastError()));
return false; return false;
} }
d->timerId = startTimer(TimerInterval);
return true;
#else #else
d->file.setFileName(port); d->deviceContext->file.setFileName(port);
if (!d->file.open(QIODevice::ReadWrite|QIODevice::Unbuffered)) { if (!d->deviceContext->file.open(QIODevice::ReadWrite|QIODevice::Unbuffered)) {
*errorMessage = QString::fromLatin1("Cannot open %1: %2").arg(port, d->file.errorString()); *errorMessage = QString::fromLatin1("Cannot open %1: %2").arg(port, d->deviceContext->file.errorString());
return false; return false;
} }
struct termios termInfo; struct termios termInfo;
if (tcgetattr(d->file.handle(), &termInfo) < 0) { if (tcgetattr(d->deviceContext->file.handle(), &termInfo) < 0) {
*errorMessage = QString::fromLatin1("Unable to retrieve terminal settings: %1 %2").arg(errno).arg(QString::fromAscii(strerror(errno))); *errorMessage = QString::fromLatin1("Unable to retrieve terminal settings: %1 %2").arg(errno).arg(QString::fromAscii(strerror(errno)));
return false; return false;
} }
@@ -353,13 +502,19 @@ bool TrkDevice::open(const QString &port, QString *errorMessage)
termInfo.c_cc[VSTART] = _POSIX_VDISABLE; termInfo.c_cc[VSTART] = _POSIX_VDISABLE;
termInfo.c_cc[VSTOP] = _POSIX_VDISABLE; termInfo.c_cc[VSTOP] = _POSIX_VDISABLE;
termInfo.c_cc[VSUSP] = _POSIX_VDISABLE; termInfo.c_cc[VSUSP] = _POSIX_VDISABLE;
if (tcsetattr(d->file.handle(), TCSAFLUSH, &termInfo) < 0) { if (tcsetattr(d->deviceContext->file.handle(), TCSAFLUSH, &termInfo) < 0) {
*errorMessage = QString::fromLatin1("Unable to apply terminal settings: %1 %2").arg(errno).arg(QString::fromAscii(strerror(errno))); *errorMessage = QString::fromLatin1("Unable to apply terminal settings: %1 %2").arg(errno).arg(QString::fromAscii(strerror(errno)));
return false; return false;
} }
d->timerId = startTimer(TimerInterval);
return true;
#endif #endif
d->timerId = startTimer(TimerInterval);
d->writerThread = QSharedPointer<WriterThread>(new WriterThread(d->deviceContext));
connect(d->writerThread.data(), SIGNAL(error(QString)), this, SIGNAL(error(QString)),
Qt::QueuedConnection);
d->writerThread->start();
if (d->verbose)
qDebug() << "Opened" << port;
return true;
} }
void TrkDevice::close() void TrkDevice::close()
@@ -371,21 +526,22 @@ void TrkDevice::close()
d->timerId = -1; d->timerId = -1;
} }
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
CloseHandle(d->hdevice); CloseHandle(d->deviceContext->device);
d->hdevice = INVALID_HANDLE_VALUE; d->deviceContext->device = INVALID_HANDLE_VALUE;
#else #else
d->file.close(); d->deviceContext->file.close();
#endif #endif
if (verbose()) d->writerThread->terminate();
logMessage("Close"); if (d->verbose)
emitLogMessage("Close");
} }
bool TrkDevice::isOpen() const bool TrkDevice::isOpen() const
{ {
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
return d->hdevice != INVALID_HANDLE_VALUE; return d->deviceContext->device != INVALID_HANDLE_VALUE;
#else #else
return d->file.isOpen(); return d->deviceContext->file.isOpen();
#endif #endif
} }
@@ -396,43 +552,24 @@ QString TrkDevice::errorString() const
bool TrkDevice::serialFrame() const bool TrkDevice::serialFrame() const
{ {
return d->serialFrame; return d->deviceContext->serialFrame;
} }
void TrkDevice::setSerialFrame(bool f) void TrkDevice::setSerialFrame(bool f)
{ {
d->serialFrame = f; d->deviceContext->serialFrame = f;
} }
bool TrkDevice::verbose() const int TrkDevice::verbose() const
{ {
return true || d->verbose; return d->verbose;
} }
void TrkDevice::setVerbose(bool b) void TrkDevice::setVerbose(int b)
{ {
d->verbose = b; d->verbose = b;
} }
bool TrkDevice::write(const QByteArray &data, QString *errorMessage)
{
#ifdef Q_OS_WIN
DWORD charsWritten;
if (!WriteFile(d->hdevice, data.data(), data.size(), &charsWritten, NULL)) {
*errorMessage = QString::fromLatin1("Error writing data: %1").arg(winErrorMessage(GetLastError()));
return false;
}
FlushFileBuffers(d->hdevice);
return true;
#else
if (d->file.write(data) == -1 || !d->file.flush()) {
*errorMessage = QString::fromLatin1("Cannot write: %1").arg(d->file.errorString());
return false;
}
return true;
#endif
}
#ifndef Q_OS_WIN #ifndef Q_OS_WIN
static inline int bytesAvailable(int fileNo) static inline int bytesAvailable(int fileNo)
{ {
@@ -452,31 +589,31 @@ void TrkDevice::tryTrkRead()
DWORD charsRead; DWORD charsRead;
DWORD totalCharsRead = 0; DWORD totalCharsRead = 0;
while (TryReadFile(d->hdevice, buffer, BUFFERSIZE, &charsRead, NULL)) { while (TryReadFile(d->deviceContext->device, buffer, BUFFERSIZE, &charsRead, NULL)) {
totalCharsRead += charsRead; totalCharsRead += charsRead;
d->trkReadBuffer.append(buffer, charsRead); d->trkReadBuffer.append(buffer, charsRead);
if (isValidTrkResult(d->trkReadBuffer, d->serialFrame)) if (isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame))
break; break;
} }
if (verbose() && totalCharsRead) if (d->verbose > 1 && totalCharsRead)
logMessage("Read" + d->trkReadBuffer.toHex()); emitLogMessage("Read" + d->trkReadBuffer.toHex());
if (!totalCharsRead) if (!totalCharsRead)
return; return;
const ushort len = isValidTrkResult(d->trkReadBuffer, d->serialFrame); const ushort len = isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame);
if (!len) { if (!len) {
const QString msg = QString::fromLatin1("Partial message: %1").arg(stringFromArray(d->trkReadBuffer)); const QString msg = QString::fromLatin1("Partial message: %1").arg(stringFromArray(d->trkReadBuffer));
emitError(msg); emitError(msg);
return; return;
} }
#else #else
const int size = bytesAvailable(d->file.handle()); const int size = bytesAvailable(d->deviceContext->file.handle());
if (!size) if (!size)
return; return;
const QByteArray data = d->file.read(size); const QByteArray data = d->deviceContext->file.read(size);
if (verbose()) if (d->verbose > 1)
logMessage("trk: <- " + stringFromArray(data)); emitLogMessage("trk: <- " + stringFromArray(data));
d->trkReadBuffer.append(data); d->trkReadBuffer.append(data);
const ushort len = isValidTrkResult(d->trkReadBuffer, d->serialFrame); const ushort len = isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame);
if (!len) { if (!len) {
if (d->trkReadBuffer.size() > 10) { if (d->trkReadBuffer.size() > 10) {
const QString msg = QString::fromLatin1("Unable to extract message from '%1' '%2'"). const QString msg = QString::fromLatin1("Unable to extract message from '%1' '%2'").
@@ -488,10 +625,10 @@ void TrkDevice::tryTrkRead()
#endif // Q_OS_WIN #endif // Q_OS_WIN
TrkResult r; TrkResult r;
QByteArray rawData; QByteArray rawData;
while (extractResult(&d->trkReadBuffer, d->serialFrame, &r, &rawData)) { while (extractResult(&d->trkReadBuffer, d->deviceContext->serialFrame, &r, &rawData)) {
//if (verbose()) if (d->verbose > 1)
// logMessage("Read TrkResult " + r.data.toHex()); emitLogMessage("Read TrkResult " + r.data.toHex());
d->queue.slotHandleResult(r); d->writerThread->slotHandleResult(r);
emit messageReceived(r); emit messageReceived(r);
if (!rawData.isEmpty()) if (!rawData.isEmpty())
emit rawDataReceived(rawData); emit rawDataReceived(rawData);
@@ -500,7 +637,6 @@ void TrkDevice::tryTrkRead()
void TrkDevice::timerEvent(QTimerEvent *) void TrkDevice::timerEvent(QTimerEvent *)
{ {
tryTrkWrite();
tryTrkRead(); tryTrkRead();
} }
@@ -514,44 +650,35 @@ void TrkDevice::emitError(const QString &s)
void TrkDevice::sendTrkMessage(byte code, TrkCallback callback, void TrkDevice::sendTrkMessage(byte code, TrkCallback callback,
const QByteArray &data, const QVariant &cookie) const QByteArray &data, const QVariant &cookie)
{ {
d->queue.queueTrkMessage(code, callback, data, cookie); if (!d->writerThread.isNull())
d->writerThread->queueTrkMessage(code, callback, data, cookie);
} }
void TrkDevice::sendTrkInitialPing() void TrkDevice::sendTrkInitialPing()
{ {
d->queue.queueTrkInitialPing(); if (!d->writerThread.isNull())
d->writerThread->queueTrkInitialPing();
} }
bool TrkDevice::sendTrkAck(byte token) bool TrkDevice::sendTrkAck(byte token)
{ {
if (d->writerThread.isNull())
return false;
// The acknowledgement must not be queued! // The acknowledgement must not be queued!
TrkMessage msg(0x80, token); TrkMessage msg(0x80, token);
msg.token = token; msg.token = token;
msg.data.append('\0'); msg.data.append('\0');
return trkWriteRawMessage(msg); return d->writerThread->trkWriteRawMessage(msg);
// 01 90 00 07 7e 80 01 00 7d 5e 7e // 01 90 00 07 7e 80 01 00 7d 5e 7e
} }
void TrkDevice::tryTrkWrite() void TrkDevice::emitLogMessage(const QString &msg)
{ {
TrkMessage message; if (d->verbose)
if (!d->queue.pendingMessage(&message)) qDebug("%s\n", qPrintable(msg));
return; emit logMessage(msg);
const bool success = trkWriteRawMessage(message);
d->queue.notifyWriteResult(success);
}
bool TrkDevice::trkWriteRawMessage(const TrkMessage &msg)
{
const QByteArray ba = frameMessage(msg.code, msg.token, msg.data, serialFrame());
if (verbose())
logMessage("trk: -> " + stringFromArray(ba));
QString errorMessage;
const bool rc = write(ba, &errorMessage);
if (!rc)
emitError(errorMessage);
return rc;
} }
} // namespace trk } // namespace trk
#include "trkdevice.moc"

View File

@@ -52,7 +52,7 @@ struct TrkDevicePrivate;
* read operation. * read operation.
* The serialFrames property specifies whether packets are encapsulated in * The serialFrames property specifies whether packets are encapsulated in
* "0x90 <length>" frames, which is currently the case for serial ports. * "0x90 <length>" frames, which is currently the case for serial ports.
* Contains write message queue allowing * Contains a write message queue allowing
* for queueing messages with a notification callback. If the message receives * for queueing messages with a notification callback. If the message receives
* an ACK, the callback is invoked. * an ACK, the callback is invoked.
* The special message TRK_WRITE_QUEUE_NOOP_CODE code can be used for synchronisation. * The special message TRK_WRITE_QUEUE_NOOP_CODE code can be used for synchronisation.
@@ -80,24 +80,8 @@ public:
bool serialFrame() const; bool serialFrame() const;
void setSerialFrame(bool f); void setSerialFrame(bool f);
bool verbose() const; int verbose() const;
void setVerbose(bool b); void setVerbose(int b);
bool write(const QByteArray &data, QString *errorMessage);
signals:
void messageReceived(const trk::TrkResult &result);
// Emitted with the contents of messages enclosed in 07e, not for log output
void rawDataReceived(const QByteArray &data);
void error(const QString &msg);
void logMessage(const QString &msg);
protected:
void emitError(const QString &msg);
virtual void timerEvent(QTimerEvent *ev);
public:
void tryTrkRead();
// Enqueue a message with a notification callback. // Enqueue a message with a notification callback.
void sendTrkMessage(unsigned char code, void sendTrkMessage(unsigned char code,
@@ -111,10 +95,21 @@ public:
// Send an Ack synchronously, bypassing the queue // Send an Ack synchronously, bypassing the queue
bool sendTrkAck(unsigned char token); bool sendTrkAck(unsigned char token);
private: void tryTrkRead(); // TODO: Why public?
void tryTrkWrite();
bool trkWriteRawMessage(const TrkMessage &msg);
signals:
void messageReceived(const trk::TrkResult &result);
// Emitted with the contents of messages enclosed in 07e, not for log output
void rawDataReceived(const QByteArray &data);
void error(const QString &msg);
void logMessage(const QString &msg);
protected:
void emitError(const QString &msg);
void emitLogMessage(const QString &msg);
virtual void timerEvent(QTimerEvent *ev);
private:
TrkDevicePrivate *d; TrkDevicePrivate *d;
}; };

View File

@@ -13,9 +13,9 @@ static const char *usageC =
"\nRemote launch:\n" "\nRemote launch:\n"
"%1 COM5 C:\\sys\\bin\\test.exe\n" "%1 COM5 C:\\sys\\bin\\test.exe\n"
"\nInstallation and remote launch:\n" "\nInstallation and remote launch:\n"
"%1 COM5 -i C:\\Data\\test_gcce_udeb.sisx C:\\sys\\bin\\test.exe\n" "%1 -i COM5 C:\\Data\\test_gcce_udeb.sisx C:\\sys\\bin\\test.exe\n"
"\nCopy from local file, installation and remote launch:\n" "\nCopy from local file, installation and remote launch:\n"
"%1 COM5 -I C:\\Projects\\test\\test_gcce_udeb.sisx C:\\Data\\test_gcce_udeb.sisx C:\\sys\\bin\\test.exe\n"; "%1 -I COM5 C:\\Projects\\test\\test_gcce_udeb.sisx C:\\Data\\test_gcce_udeb.sisx C:\\sys\\bin\\test.exe\n";
static void usage() static void usage()
{ {