From bec0d71cc13f1dd6efd13cbcdfea429875a27411 Mon Sep 17 00:00:00 2001 From: Matthew Stickney Date: Thu, 16 Feb 2017 16:24:04 -0500 Subject: [PATCH 1/3] Initialize the stream status to OK in the QIODevice constructor. Without this, the status is an an uninitialize state, which causes the precondition checks to fail on writes. --- src/msgpackstream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/msgpackstream.cpp b/src/msgpackstream.cpp index aee0f59..1a35149 100644 --- a/src/msgpackstream.cpp +++ b/src/msgpackstream.cpp @@ -28,7 +28,7 @@ MsgPackStream::MsgPackStream() : { } MsgPackStream::MsgPackStream(QIODevice *d) : - dev(d), owndev(false) + dev(d), owndev(false), q_status(Ok) { } MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) : From 067c72767a3604e5aa8ba72c6e56bae4af48a95b Mon Sep 17 00:00:00 2001 From: Matthew Stickney Date: Thu, 16 Feb 2017 16:23:40 -0500 Subject: [PATCH 2/3] Handle partial writes correctly. QIODevice::write() can theoretically write any number of bytes less than the amount that was requested. If the result was >= 0, we haven't hit EOF or an actual error yet and should continue attempting to write. Note that the docs[1] for QIODevice::writeData() specify that writeData should write all available data before returning or else QDataStream won't work, but that's no guarantee that all implementors of QIODevice will actually do so. Also note that this does not call QIODevice::waitForBytesWritten after writing, which would negate the benefits of buffering for buffered devices. As a side-effect, that means this will spin if the writes don't complete. [1] http://doc.qt.io/qt-5/qiodevice.html#writeData --- src/msgpackstream.cpp | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/src/msgpackstream.cpp b/src/msgpackstream.cpp index 1a35149..bbbcc37 100644 --- a/src/msgpackstream.cpp +++ b/src/msgpackstream.cpp @@ -385,7 +385,7 @@ MsgPackStream &MsgPackStream::operator<<(bool b) CHECK_STREAM_WRITE_PRECOND(*this); quint8 m = b == true ? MsgPack::FirstByte::MTRUE : MsgPack::FirstByte::MFALSE; - if (dev->write((char *)&m, 1) != 1) + if (writeBytes((char *)&m, 1) != 1) setStatus(WriteFailed); return *this; } @@ -395,7 +395,7 @@ MsgPackStream &MsgPackStream::operator<<(quint32 u32) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[5]; quint8 sz = MsgPackPrivate::pack_uint(u32, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -405,7 +405,7 @@ MsgPackStream &MsgPackStream::operator<<(quint64 u64) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[9]; quint8 sz = MsgPackPrivate::pack_ulonglong(u64, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -415,7 +415,7 @@ MsgPackStream &MsgPackStream::operator<<(qint32 i32) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[5]; quint8 sz = MsgPackPrivate::pack_int(i32, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -425,7 +425,7 @@ MsgPackStream &MsgPackStream::operator<<(qint64 i64) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[9]; quint8 sz = MsgPackPrivate::pack_longlong(i64, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -435,7 +435,7 @@ MsgPackStream &MsgPackStream::operator<<(float f) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[5]; quint8 sz = MsgPackPrivate::pack_float(f, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -445,7 +445,7 @@ MsgPackStream &MsgPackStream::operator<<(double d) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[9]; quint8 sz = MsgPackPrivate::pack_double(d, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -457,7 +457,7 @@ MsgPackStream &MsgPackStream::operator<<(QString str) quint32 sz = MsgPackPrivate::pack_string(str, p, false) - p; quint8 *data = new quint8[sz]; MsgPackPrivate::pack_string(str, data, true); - if (dev->write((char *)data, sz) != sz) + if (writeBytes((char *)data, sz) != sz) setStatus(WriteFailed); delete[] data; return *this; @@ -471,7 +471,7 @@ MsgPackStream &MsgPackStream::operator<<(const char *str) quint32 sz = MsgPackPrivate::pack_string_raw(str, str_len, p, false) - p; quint8 *data = new quint8[sz]; MsgPackPrivate::pack_string_raw(str, str_len, data, true); - if (dev->write((char *)data, sz) != sz) + if (writeBytes((char *)data, sz) != sz) setStatus(WriteFailed); delete[] data; return *this; @@ -483,11 +483,11 @@ MsgPackStream &MsgPackStream::operator<<(QByteArray array) quint8 p[5]; quint32 len = array.length(); quint8 header_len = MsgPackPrivate::pack_bin_header(len, p, true) - p; - if (dev->write((char *)p, header_len) != header_len) { + if (writeBytes((char *)p, header_len) != header_len) { setStatus(WriteFailed); return *this; } - if (dev->write(array.data(), len) != len) + if (writeBytes(array.data(), len) != len) setStatus(WriteFailed); return *this; } @@ -495,9 +495,18 @@ MsgPackStream &MsgPackStream::operator<<(QByteArray array) bool MsgPackStream::writeBytes(const char *data, uint len) { CHECK_STREAM_WRITE_PRECOND(false); - if (dev->write(data, len) != len) { - setStatus(WriteFailed); - return false; + uint written = 0; + uint thisWrite; + while (written < len) { + thisWrite = dev->write(data, len - written); + if (thisWrite < 0) { + setStatus(WriteFailed); + return false; + } + + /* Increment the write pointer and the total byte count. */ + data += thisWrite; + written += thisWrite; } return true; } @@ -534,7 +543,7 @@ bool MsgPackStream::writeExtHeader(quint32 len, qint8 msgpackType) d[5] = msgpackType; sz = 6; } - if (dev->write((const char *)d, sz) != sz) { + if (writeBytes((const char *)d, sz) != sz) { setStatus(WriteFailed); return false; } From ca56d9e3626fb87a786ebdebc52e63210a9a2522 Mon Sep 17 00:00:00 2001 From: Matthew Stickney Date: Mon, 27 Feb 2017 17:32:49 -0500 Subject: [PATCH 3/3] Add an option to flush writes. There is a bug with QLocalSockets on Windows where data will be silently dropped if two consecutive writes are performed without flushing and the remote end hasn't read all the data between the two (see https://bugreports.qt.io/browse/QTBUG-18385). Since qmsgpack performs several write operations as part of a single stream output, it must allow callers to specify whether to flush writes in order to work around this bug. --- src/msgpackstream.cpp | 28 ++++++++++++++++++++++++---- src/msgpackstream.h | 3 +++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/msgpackstream.cpp b/src/msgpackstream.cpp index bbbcc37..d4d28b3 100644 --- a/src/msgpackstream.cpp +++ b/src/msgpackstream.cpp @@ -24,15 +24,15 @@ return retVal; MsgPackStream::MsgPackStream() : - dev(0), owndev(false), q_status(Ok) + dev(0), owndev(false), q_status(Ok), flushWrites(false) { } MsgPackStream::MsgPackStream(QIODevice *d) : - dev(d), owndev(false), q_status(Ok) + dev(d), owndev(false), q_status(Ok), flushWrites(false) { } MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) : - owndev(true), q_status(Ok) + owndev(true), q_status(Ok), flushWrites(false) { QBuffer *buf = new QBuffer(a); buf->open(mode); @@ -40,7 +40,7 @@ MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) : } MsgPackStream::MsgPackStream(const QByteArray &a) : - owndev(true), q_status(Ok) + owndev(true), q_status(Ok), flushWrites(false) { QBuffer *buf = new QBuffer(); buf->setData(a); @@ -87,6 +87,16 @@ void MsgPackStream::setStatus(Status status) q_status = status; } +void MsgPackStream::setFlushWrites(bool flush) +{ + flushWrites = flush; +} + +bool MsgPackStream::willFlushWrites() +{ + return flushWrites; +} + MsgPackStream &MsgPackStream::operator>>(bool &b) { CHECK_STREAM_PRECOND(*this) @@ -503,6 +513,16 @@ bool MsgPackStream::writeBytes(const char *data, uint len) setStatus(WriteFailed); return false; } + /* Apparently on Windows, the buffer size for named pipes is 0, and + * any data that is written before the remote end reads it is + * dropped (!!) without error (see https://bugreports.qt.io/browse/QTBUG-18385). + * We must be very sure that the data has been written before we try + * another write. This degrades performance in other cases, so callers + * must enable this behavior explicitly. + */ + if (this->flushWrites) { + dev->waitForBytesWritten(-1); + } /* Increment the write pointer and the total byte count. */ data += thisWrite; diff --git a/src/msgpackstream.h b/src/msgpackstream.h index 37f7222..61cd440 100644 --- a/src/msgpackstream.h +++ b/src/msgpackstream.h @@ -26,6 +26,8 @@ public: Status status() const; void resetStatus(); void setStatus(Status status); + void setFlushWrites(bool flushWrites); + bool willFlushWrites(); MsgPackStream &operator>>(bool &b); MsgPackStream &operator>>(quint8 &u8); @@ -60,6 +62,7 @@ private: QIODevice *dev; bool owndev; Status q_status; + bool flushWrites; bool unpack_longlong(qint64 &i64); bool unpack_ulonglong(quint64 &u64);