Trk: Make work with ultra-fast devices, fix race condition

Insert message into the WriteQueue's map storing the messages
before writing it out in case the reader thread gets the
answer before WriteQueue::notifyResult is called.
This commit is contained in:
Friedemann Kleint
2009-09-28 12:41:13 +02:00
parent 6bd55be672
commit 92bfcf4335

View File

@@ -160,7 +160,12 @@ public:
PendingMessageResult pendingMessage(TrkMessage *message); PendingMessageResult pendingMessage(TrkMessage *message);
// Notify the queue about the success of the write operation // Notify the queue about the success of the write operation
// after taking the pendingMessage off. // after taking the pendingMessage off.
void notifyWriteResult(bool ok); enum WriteResult {
WriteOk,
WriteFailedDiscard, // Discard failed message
WriteFailedKeep, // Keep failed message
};
void notifyWriteResult(WriteResult ok);
// Helper function that invokes the callback of a no-op message // Helper function that invokes the callback of a no-op message
static void invokeNoopMessage(trk::TrkMessage); static void invokeNoopMessage(trk::TrkMessage);
@@ -211,8 +216,11 @@ TrkWriteQueue::PendingMessageResult TrkWriteQueue::pendingMessage(TrkMessage *me
*message = m_trkWriteQueue.dequeue(); *message = m_trkWriteQueue.dequeue();
return NoopMessageDequeued; return NoopMessageDequeued;
} }
if (message) // Insert into map fir answers (as reading threads might get an
// answer before notifyWriteResult(true)) is called.
*message = m_trkWriteQueue.front(); *message = m_trkWriteQueue.front();
m_writtenTrkMessages.insert(message->token, *message);
m_trkWriteBusy = true;
return PendingMessage; return PendingMessage;
} }
@@ -226,13 +234,21 @@ void TrkWriteQueue::invokeNoopMessage(trk::TrkMessage noopMessage)
noopMessage.callback(result); noopMessage.callback(result);
} }
void TrkWriteQueue::notifyWriteResult(bool ok) void TrkWriteQueue::notifyWriteResult(WriteResult wr)
{ {
// On success, dequeue message and await result // On success, dequeue message and await result
if (ok) { const byte token = m_trkWriteQueue.front().token;
TrkMessage firstMsg = m_trkWriteQueue.dequeue(); switch (wr) {
m_writtenTrkMessages.insert(firstMsg.token, firstMsg); case WriteOk:
m_trkWriteBusy = true; m_trkWriteQueue.dequeue();
break;
case WriteFailedKeep:
case WriteFailedDiscard:
m_writtenTrkMessages.remove(token);
m_trkWriteBusy = false;
if (wr == WriteFailedDiscard)
m_trkWriteQueue.dequeue();
break;
} }
} }
@@ -329,6 +345,7 @@ private slots:
private: private:
bool write(const QByteArray &data, QString *errorMessage); bool write(const QByteArray &data, QString *errorMessage);
inline int writePendingMessage();
const QSharedPointer<DeviceContext> m_context; const QSharedPointer<DeviceContext> m_context;
QMutex m_dataMutex; QMutex m_dataMutex;
@@ -350,14 +367,20 @@ WriterThread::WriterThread(const QSharedPointer<DeviceContext> &context) :
void WriterThread::run() void WriterThread::run()
{ {
while (true) { while (writePendingMessage() == 0) ;
}
int WriterThread::writePendingMessage()
{
enum { MaxAttempts = 100, RetryIntervalMS = 200 };
// Wait. Use a timeout in case something is already queued before we // Wait. Use a timeout in case something is already queued before we
// start up or some weird hanging exit condition // start up or some weird hanging exit condition
m_waitMutex.lock(); m_waitMutex.lock();
m_waitCondition.wait(&m_waitMutex, 100); m_waitCondition.wait(&m_waitMutex, 100);
m_waitMutex.unlock(); m_waitMutex.unlock();
if (m_terminate) if (m_terminate)
break; return 1;
// Send off message // Send off message
m_dataMutex.lock(); m_dataMutex.lock();
TrkMessage message; TrkMessage message;
@@ -367,9 +390,20 @@ void WriterThread::run()
case TrkWriteQueue::NoMessage: case TrkWriteQueue::NoMessage:
break; break;
case TrkWriteQueue::PendingMessage: { case TrkWriteQueue::PendingMessage: {
const bool success = trkWriteRawMessage(message); // Untested: try to re-send a few times
bool success = false;
for (int r = 0; !success && (r < MaxAttempts); r++) {
success = trkWriteRawMessage(message);
if (!success) {
emit error(QString::fromLatin1("Write failure, attempt %1 of %2.").arg(r).arg(int(MaxAttempts)));
if (m_terminate)
return 1;
QThread::msleep(RetryIntervalMS);
}
}
// Notify queue. If still failed, give up.
m_dataMutex.lock(); m_dataMutex.lock();
m_queue.notifyWriteResult(success); m_queue.notifyWriteResult(success ? TrkWriteQueue::WriteOk : TrkWriteQueue::WriteFailedDiscard);
m_dataMutex.unlock(); m_dataMutex.unlock();
} }
break; break;
@@ -378,7 +412,7 @@ void WriterThread::run()
emit internalNoopMessageDequeued(message); emit internalNoopMessageDequeued(message);
break; break;
} // switch } // switch
} return 0;
} }
void WriterThread::invokeNoopMessage(const trk::TrkMessage &msg) void WriterThread::invokeNoopMessage(const trk::TrkMessage &msg)