Trk: Move message decoding into ReaderThread and emit via signal

... using a DirectBlockingConnection, hopefully fixing
Windows deadlocks.
This commit is contained in:
Friedemann Kleint
2009-09-28 09:48:21 +02:00
parent 6ddc5afd00
commit 724cc50ed5
2 changed files with 84 additions and 48 deletions

View File

@@ -119,6 +119,7 @@ TrkMessage::TrkMessage(byte c, byte t, TrkCallback cb) :
} // namespace trk } // namespace trk
Q_DECLARE_METATYPE(trk::TrkMessage) Q_DECLARE_METATYPE(trk::TrkMessage)
Q_DECLARE_METATYPE(trk::TrkResult)
namespace trk { namespace trk {
@@ -408,7 +409,7 @@ static inline bool overlappedSyncWrite(HANDLE file, const char *data,
bool WriterThread::write(const QByteArray &data, QString *errorMessage) bool WriterThread::write(const QByteArray &data, QString *errorMessage)
{ {
QMutexLocker(&m_context->mutex); QMutexLocker locker(&m_context->mutex);
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
DWORD charsWritten; DWORD charsWritten;
if (!overlappedSyncWrite(m_context->device, data.data(), data.size(), &charsWritten, &m_context->writeOverlapped)) { if (!overlappedSyncWrite(m_context->device, data.data(), data.size(), &charsWritten, &m_context->writeOverlapped)) {
@@ -465,6 +466,65 @@ void WriterThread::slotHandleResult(const TrkResult &result)
tryWrite(); // Have messages been enqueued in-between? tryWrite(); // Have messages been enqueued in-between?
} }
///////////////////////////////////////////////////////////////////////
//
// ReaderThreadBase: Base class for a thread that reads data from
// the device, decodes the messages and emit signals for the messages.
// A Qt::BlockingQueuedConnection should be used for the message signal
// to ensure messages are processed in the correct sequence.
//
///////////////////////////////////////////////////////////////////////
class ReaderThreadBase : public QThread {
Q_OBJECT
Q_DISABLE_COPY(ReaderThreadBase)
public:
signals:
void messageReceived(const trk::TrkResult &result, const QByteArray &rawData);
protected:
explicit ReaderThreadBase(const QSharedPointer<DeviceContext> &context);
void processData(const QByteArray &a);
void processData(char c);
const QSharedPointer<DeviceContext> m_context;
private:
void readMessages();
QByteArray m_trkReadBuffer;
};
ReaderThreadBase::ReaderThreadBase(const QSharedPointer<DeviceContext> &context) :
m_context(context)
{
static const int trkResultMetaId = qRegisterMetaType<trk::TrkResult>();
Q_UNUSED(trkResultMetaId)
}
void ReaderThreadBase::processData(const QByteArray &a)
{
m_trkReadBuffer += a;
readMessages();
}
void ReaderThreadBase::processData(char c)
{
m_trkReadBuffer += c;
if (m_trkReadBuffer.size() > 1)
readMessages();
}
void ReaderThreadBase::readMessages()
{
TrkResult r;
QByteArray rawData;
while (extractResult(&m_trkReadBuffer, m_context->serialFrame, &r, &rawData)) {
emit messageReceived(r, rawData);
}
}
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
// //
@@ -474,7 +534,7 @@ void WriterThread::slotHandleResult(const TrkResult &result)
// //
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
class WinReaderThread : public QThread { class WinReaderThread : public ReaderThreadBase {
Q_OBJECT Q_OBJECT
Q_DISABLE_COPY(WinReaderThread) Q_DISABLE_COPY(WinReaderThread)
public: public:
@@ -485,8 +545,6 @@ public:
signals: signals:
void error(const QString &); void error(const QString &);
void dataReceived(char c);
void dataReceived(const QByteArray &data);
public slots: public slots:
void terminate(); void terminate();
@@ -496,12 +554,11 @@ private:
inline int tryRead(); inline int tryRead();
const QSharedPointer<DeviceContext> m_context;
HANDLE m_handles[HandleCount]; HANDLE m_handles[HandleCount];
}; };
WinReaderThread::WinReaderThread(const QSharedPointer<DeviceContext> &context) : WinReaderThread::WinReaderThread(const QSharedPointer<DeviceContext> &context) :
m_context(context) ReaderThreadBase(context)
{ {
m_handles[FileHandle] = NULL; m_handles[FileHandle] = NULL;
m_handles[TerminateEventHandle] = CreateEvent(NULL, FALSE, FALSE, NULL); m_handles[TerminateEventHandle] = CreateEvent(NULL, FALSE, FALSE, NULL);
@@ -528,9 +585,9 @@ int WinReaderThread::tryRead()
DWORD bytesRead = 0; DWORD bytesRead = 0;
if (ReadFile(m_context->device, &buffer, bytesToRead, &bytesRead, &m_context->readOverlapped)) { if (ReadFile(m_context->device, &buffer, bytesToRead, &bytesRead, &m_context->readOverlapped)) {
if (bytesRead == 1) { if (bytesRead == 1) {
emit dataReceived(buffer[0]); processData(buffer[0]);
} else { } else {
emit dataReceived(QByteArray(buffer, bytesRead)); processData(QByteArray(buffer, bytesRead));
} }
return 0; return 0;
} }
@@ -554,9 +611,9 @@ int WinReaderThread::tryRead()
return -3; return -3;
} }
if (bytesRead == 1) { if (bytesRead == 1) {
emit dataReceived(buffer[0]); processData(buffer[0]);
} else { } else {
emit dataReceived(QByteArray(buffer, bytesRead)); processData(QByteArray(buffer, bytesRead));
} }
return 0; return 0;
} }
@@ -593,7 +650,7 @@ static inline QString msgUnixCallFailedErrno(const char *func, int errorNumber)
return QString::fromLatin1("Call to %1() failed: %2").arg(QLatin1String(func), QString::fromLocal8Bit(strerror(errorNumber))); return QString::fromLatin1("Call to %1() failed: %2").arg(QLatin1String(func), QString::fromLocal8Bit(strerror(errorNumber)));
} }
class UnixReaderThread : public QThread { class UnixReaderThread : public ReaderThreadBase {
Q_OBJECT Q_OBJECT
Q_DISABLE_COPY(UnixReaderThread) Q_DISABLE_COPY(UnixReaderThread)
public: public:
@@ -604,7 +661,6 @@ public:
signals: signals:
void error(const QString &); void error(const QString &);
void dataReceived(const QByteArray &);
public slots: public slots:
void terminate(); void terminate();
@@ -612,12 +668,11 @@ public slots:
private: private:
inline int tryRead(); inline int tryRead();
const QSharedPointer<DeviceContext> m_context;
int m_terminatePipeFileDescriptors[2]; int m_terminatePipeFileDescriptors[2];
}; };
UnixReaderThread::UnixReaderThread(const QSharedPointer<DeviceContext> &context) : UnixReaderThread::UnixReaderThread(const QSharedPointer<DeviceContext> &context) :
m_context(context) ReaderThreadBase(context)
{ {
m_terminatePipeFileDescriptors[0] = m_terminatePipeFileDescriptors[1] = -1; m_terminatePipeFileDescriptors[0] = m_terminatePipeFileDescriptors[1] = -1;
// Set up pipes for termination. Should not fail // Set up pipes for termination. Should not fail
@@ -675,7 +730,7 @@ int UnixReaderThread::tryRead()
m_context->mutex.lock(); m_context->mutex.lock();
const QByteArray data = m_context->file.read(numBytes); const QByteArray data = m_context->file.read(numBytes);
m_context->mutex.unlock(); m_context->mutex.unlock();
emit dataReceived(data); processData(data);
return 0; return 0;
} }
@@ -801,12 +856,9 @@ bool TrkDevice::open(const QString &port, QString *errorMessage)
d->readerThread = QSharedPointer<ReaderThread>(new ReaderThread(d->deviceContext)); d->readerThread = QSharedPointer<ReaderThread>(new ReaderThread(d->deviceContext));
connect(d->readerThread.data(), SIGNAL(error(QString)), this, SLOT(emitError(QString)), connect(d->readerThread.data(), SIGNAL(error(QString)), this, SLOT(emitError(QString)),
Qt::QueuedConnection); Qt::QueuedConnection);
#ifdef Q_OS_WIN connect(d->readerThread.data(), SIGNAL(messageReceived(trk::TrkResult,QByteArray)),
connect(d->readerThread.data(), SIGNAL(dataReceived(char)), this, SLOT(slotMessageReceived(trk::TrkResult,QByteArray)),
this, SLOT(dataReceived(char)), Qt::QueuedConnection); Qt::BlockingQueuedConnection);
#endif
connect(d->readerThread.data(), SIGNAL(dataReceived(QByteArray)),
this, SLOT(dataReceived(QByteArray)), Qt::QueuedConnection);
d->readerThread->start(); d->readerThread->start();
d->writerThread = QSharedPointer<WriterThread>(new WriterThread(d->deviceContext)); d->writerThread = QSharedPointer<WriterThread>(new WriterThread(d->deviceContext));
@@ -872,31 +924,13 @@ void TrkDevice::setVerbose(int b)
d->verbose = b; d->verbose = b;
} }
void TrkDevice::dataReceived(char c) void TrkDevice::slotMessageReceived(const trk::TrkResult &result, const QByteArray &rawData)
{ {
d->trkReadBuffer += c; d->writerThread->slotHandleResult(result);
readMessages(); emit messageReceived(result);
}
void TrkDevice::dataReceived(const QByteArray &data)
{
d->trkReadBuffer += data;
readMessages();
}
void TrkDevice::readMessages()
{
TrkResult r;
QByteArray rawData;
while (extractResult(&d->trkReadBuffer, d->deviceContext->serialFrame, &r, &rawData)) {
if (d->verbose > 1)
emitLogMessage("Read TrkResult " + r.data.toHex());
d->writerThread->slotHandleResult(r);
emit messageReceived(r);
if (!rawData.isEmpty()) if (!rawData.isEmpty())
emit rawDataReceived(rawData); emit rawDataReceived(rawData);
} }
}
void TrkDevice::emitError(const QString &s) void TrkDevice::emitError(const QString &s)
{ {
@@ -908,9 +942,12 @@ void TrkDevice::emitError(const QString &s)
void TrkDevice::sendTrkMessage(byte code, TrkCallback callback, void TrkDevice::sendTrkMessage(byte code, TrkCallback callback,
const QByteArray &data, const QVariant &cookie) const QByteArray &data, const QVariant &cookie)
{ {
if (!d->writerThread.isNull()) if (!d->writerThread.isNull()) {
if (d->verbose > 1)
qDebug() << "Sending " << code << data.toHex();
d->writerThread->queueTrkMessage(code, callback, data, cookie); d->writerThread->queueTrkMessage(code, callback, data, cookie);
} }
}
void TrkDevice::sendTrkInitialPing() void TrkDevice::sendTrkInitialPing()
{ {

View File

@@ -105,8 +105,7 @@ signals:
void logMessage(const QString &msg); void logMessage(const QString &msg);
private slots: private slots:
void dataReceived(char c); void slotMessageReceived(const trk::TrkResult &result, const QByteArray &a);
void dataReceived(const QByteArray &a);
protected slots: protected slots:
void emitError(const QString &msg); void emitError(const QString &msg);