Trk: Refined mutexes.

This commit is contained in:
Robert Loehning
2009-10-08 18:16:47 +02:00
parent 7560467523
commit 8d9ccaff01

View File

@@ -179,6 +179,7 @@ private:
QQueue<TrkMessage> m_trkWriteQueue; QQueue<TrkMessage> m_trkWriteQueue;
TokenMessageMap m_writtenTrkMessages; TokenMessageMap m_writtenTrkMessages;
bool m_trkWriteBusy; bool m_trkWriteBusy;
QMutex m_dataMutex;
}; };
TrkWriteQueue::TrkWriteQueue() : TrkWriteQueue::TrkWriteQueue() :
@@ -198,6 +199,7 @@ byte TrkWriteQueue::nextTrkWriteToken()
void TrkWriteQueue::queueTrkMessage(byte code, TrkCallback callback, void TrkWriteQueue::queueTrkMessage(byte code, TrkCallback callback,
const QByteArray &data, const QVariant &cookie) const QByteArray &data, const QVariant &cookie)
{ {
QMutexLocker locker(&m_dataMutex);
const byte token = code == TRK_WRITE_QUEUE_NOOP_CODE ? const byte token = code == TRK_WRITE_QUEUE_NOOP_CODE ?
byte(0) : nextTrkWriteToken(); byte(0) : nextTrkWriteToken();
TrkMessage msg(code, token, callback); TrkMessage msg(code, token, callback);
@@ -208,6 +210,7 @@ void TrkWriteQueue::queueTrkMessage(byte code, TrkCallback callback,
TrkWriteQueue::PendingMessageResult TrkWriteQueue::pendingMessage(TrkMessage *message) TrkWriteQueue::PendingMessageResult TrkWriteQueue::pendingMessage(TrkMessage *message)
{ {
QMutexLocker locker(&m_dataMutex);
// Invoked from timer, try to flush out message queue // Invoked from timer, try to flush out message queue
if (m_trkWriteBusy || m_trkWriteQueue.isEmpty()) if (m_trkWriteBusy || m_trkWriteQueue.isEmpty())
return NoMessage; return NoMessage;
@@ -236,6 +239,7 @@ void TrkWriteQueue::invokeNoopMessage(trk::TrkMessage noopMessage)
void TrkWriteQueue::notifyWriteResult(WriteResult wr) void TrkWriteQueue::notifyWriteResult(WriteResult wr)
{ {
QMutexLocker locker(&m_dataMutex);
// On success, dequeue message and await result // On success, dequeue message and await result
const byte token = m_trkWriteQueue.front().token; const byte token = m_trkWriteQueue.front().token;
switch (wr) { switch (wr) {
@@ -254,6 +258,7 @@ void TrkWriteQueue::notifyWriteResult(WriteResult wr)
void TrkWriteQueue::slotHandleResult(const TrkResult &result) void TrkWriteQueue::slotHandleResult(const TrkResult &result)
{ {
QMutexLocker locker(&m_dataMutex);
m_trkWriteBusy = false; m_trkWriteBusy = false;
//if (result.code != TrkNotifyAck && result.code != TrkNotifyNak) //if (result.code != TrkNotifyAck && result.code != TrkNotifyNak)
// return; // return;
@@ -262,18 +267,22 @@ void TrkWriteQueue::slotHandleResult(const TrkResult &result)
const TokenMessageMap::iterator it = m_writtenTrkMessages.find(result.token); const TokenMessageMap::iterator it = m_writtenTrkMessages.find(result.token);
if (it == m_writtenTrkMessages.end()) if (it == m_writtenTrkMessages.end())
return; return;
const bool invokeCB = it.value().callback; TrkCallback callback = it.value().callback;
if (invokeCB) { if (callback) {
TrkResult result1 = result; TrkResult result1 = result;
result1.cookie = it.value().cookie; result1.cookie = it.value().cookie;
it.value().callback(result1); m_writtenTrkMessages.erase(it);
locker.unlock();
callback(result1);
} else {
m_writtenTrkMessages.erase(it);
} }
m_writtenTrkMessages.erase(it);
} }
void TrkWriteQueue::queueTrkInitialPing() void TrkWriteQueue::queueTrkInitialPing()
{ {
// Ping, reset sequence count // Ping, reset sequence count
QMutexLocker locker(&m_dataMutex);
m_trkWriteToken = 0; m_trkWriteToken = 0;
m_trkWriteQueue.append(TrkMessage(0, 0)); m_trkWriteQueue.append(TrkMessage(0, 0));
} }
@@ -348,7 +357,6 @@ private:
inline int writePendingMessage(); inline int writePendingMessage();
const QSharedPointer<DeviceContext> m_context; const QSharedPointer<DeviceContext> m_context;
QMutex m_dataMutex;
QMutex m_waitMutex; QMutex m_waitMutex;
QWaitCondition m_waitCondition; QWaitCondition m_waitCondition;
TrkWriteQueue m_queue; TrkWriteQueue m_queue;
@@ -382,10 +390,8 @@ int WriterThread::writePendingMessage()
if (m_terminate) if (m_terminate)
return 1; return 1;
// Send off message // Send off message
m_dataMutex.lock();
TrkMessage message; TrkMessage message;
const TrkWriteQueue::PendingMessageResult pr = m_queue.pendingMessage(&message); const TrkWriteQueue::PendingMessageResult pr = m_queue.pendingMessage(&message);
m_dataMutex.unlock();
switch (pr) { switch (pr) {
case TrkWriteQueue::NoMessage: case TrkWriteQueue::NoMessage:
break; break;
@@ -402,9 +408,7 @@ int WriterThread::writePendingMessage()
} }
} }
// Notify queue. If still failed, give up. // Notify queue. If still failed, give up.
m_dataMutex.lock();
m_queue.notifyWriteResult(success ? TrkWriteQueue::WriteOk : TrkWriteQueue::WriteFailedDiscard); m_queue.notifyWriteResult(success ? TrkWriteQueue::WriteOk : TrkWriteQueue::WriteFailedDiscard);
m_dataMutex.unlock();
} }
break; break;
case TrkWriteQueue::NoopMessageDequeued: case TrkWriteQueue::NoopMessageDequeued:
@@ -479,17 +483,13 @@ void WriterThread::tryWrite()
void WriterThread::queueTrkMessage(byte code, TrkCallback callback, void WriterThread::queueTrkMessage(byte code, TrkCallback callback,
const QByteArray &data, const QVariant &cookie) const QByteArray &data, const QVariant &cookie)
{ {
m_dataMutex.lock();
m_queue.queueTrkMessage(code, callback, data, cookie); m_queue.queueTrkMessage(code, callback, data, cookie);
m_dataMutex.unlock();
tryWrite(); tryWrite();
} }
void WriterThread::queueTrkInitialPing() void WriterThread::queueTrkInitialPing()
{ {
m_dataMutex.lock();
m_queue.queueTrkInitialPing(); m_queue.queueTrkInitialPing();
m_dataMutex.unlock();
tryWrite(); tryWrite();
} }