From 92bfcf433504cdf8aa04718bc718cbc0284acbf0 Mon Sep 17 00:00:00 2001 From: Friedemann Kleint Date: Mon, 28 Sep 2009 12:41:13 +0200 Subject: [PATCH] Trk: Make work with ultra-fast devices, fix race condition Insert message into the WriteQueue's map storing the messages before writing it out in case the reader thread gets the answer before WriteQueue::notifyResult is called. --- src/shared/trk/trkdevice.cpp | 102 +++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 34 deletions(-) diff --git a/src/shared/trk/trkdevice.cpp b/src/shared/trk/trkdevice.cpp index 775b7727960..25f35f33d9d 100644 --- a/src/shared/trk/trkdevice.cpp +++ b/src/shared/trk/trkdevice.cpp @@ -160,7 +160,12 @@ public: PendingMessageResult pendingMessage(TrkMessage *message); // Notify the queue about the success of the write operation // after taking the pendingMessage off. - void notifyWriteResult(bool ok); + enum WriteResult { + WriteOk, + WriteFailedDiscard, // Discard failed message + WriteFailedKeep, // Keep failed message + }; + void notifyWriteResult(WriteResult ok); // Helper function that invokes the callback of a no-op message static void invokeNoopMessage(trk::TrkMessage); @@ -211,8 +216,11 @@ TrkWriteQueue::PendingMessageResult TrkWriteQueue::pendingMessage(TrkMessage *me *message = m_trkWriteQueue.dequeue(); return NoopMessageDequeued; } - if (message) - *message = m_trkWriteQueue.front(); + // Insert into map fir answers (as reading threads might get an + // answer before notifyWriteResult(true)) is called. + *message = m_trkWriteQueue.front(); + m_writtenTrkMessages.insert(message->token, *message); + m_trkWriteBusy = true; return PendingMessage; } @@ -226,13 +234,21 @@ void TrkWriteQueue::invokeNoopMessage(trk::TrkMessage noopMessage) noopMessage.callback(result); } -void TrkWriteQueue::notifyWriteResult(bool ok) +void TrkWriteQueue::notifyWriteResult(WriteResult wr) { // On success, dequeue message and await result - if (ok) { - TrkMessage firstMsg = m_trkWriteQueue.dequeue(); - m_writtenTrkMessages.insert(firstMsg.token, firstMsg); - m_trkWriteBusy = true; + const byte token = m_trkWriteQueue.front().token; + switch (wr) { + case WriteOk: + m_trkWriteQueue.dequeue(); + break; + case WriteFailedKeep: + case WriteFailedDiscard: + m_writtenTrkMessages.remove(token); + m_trkWriteBusy = false; + if (wr == WriteFailedDiscard) + m_trkWriteQueue.dequeue(); + break; } } @@ -329,6 +345,7 @@ private slots: private: bool write(const QByteArray &data, QString *errorMessage); + inline int writePendingMessage(); const QSharedPointer m_context; QMutex m_dataMutex; @@ -350,35 +367,52 @@ WriterThread::WriterThread(const QSharedPointer &context) : void WriterThread::run() { - while (true) { - // Wait. Use a timeout in case something is already queued before we - // start up or some weird hanging exit condition - m_waitMutex.lock(); - m_waitCondition.wait(&m_waitMutex, 100); - m_waitMutex.unlock(); - if (m_terminate) - break; - // Send off message - m_dataMutex.lock(); - TrkMessage message; - const TrkWriteQueue::PendingMessageResult pr = m_queue.pendingMessage(&message); - m_dataMutex.unlock(); - switch (pr) { - case TrkWriteQueue::NoMessage: - break; - case TrkWriteQueue::PendingMessage: { - const bool success = trkWriteRawMessage(message); + while (writePendingMessage() == 0) ; +} + +int WriterThread::writePendingMessage() +{ + enum { MaxAttempts = 100, RetryIntervalMS = 200 }; + + // Wait. Use a timeout in case something is already queued before we + // start up or some weird hanging exit condition + m_waitMutex.lock(); + m_waitCondition.wait(&m_waitMutex, 100); + m_waitMutex.unlock(); + if (m_terminate) + return 1; + // Send off message + m_dataMutex.lock(); + TrkMessage message; + const TrkWriteQueue::PendingMessageResult pr = m_queue.pendingMessage(&message); + m_dataMutex.unlock(); + switch (pr) { + case TrkWriteQueue::NoMessage: + break; + case TrkWriteQueue::PendingMessage: { + // Untested: try to re-send a few times + bool success = false; + for (int r = 0; !success && (r < MaxAttempts); r++) { + success = trkWriteRawMessage(message); + if (!success) { + emit error(QString::fromLatin1("Write failure, attempt %1 of %2.").arg(r).arg(int(MaxAttempts))); + if (m_terminate) + return 1; + QThread::msleep(RetryIntervalMS); + } + } + // Notify queue. If still failed, give up. m_dataMutex.lock(); - m_queue.notifyWriteResult(success); + m_queue.notifyWriteResult(success ? TrkWriteQueue::WriteOk : TrkWriteQueue::WriteFailedDiscard); m_dataMutex.unlock(); } - break; - case TrkWriteQueue::NoopMessageDequeued: - // Sync with thread that owns us via a blocking signal - emit internalNoopMessageDequeued(message); - break; - } // switch - } + break; + case TrkWriteQueue::NoopMessageDequeued: + // Sync with thread that owns us via a blocking signal + emit internalNoopMessageDequeued(message); + break; + } // switch + return 0; } void WriterThread::invokeNoopMessage(const trk::TrkMessage &msg)