diff --git a/src/msgpackstream.cpp b/src/msgpackstream.cpp index aee0f59..d4d28b3 100644 --- a/src/msgpackstream.cpp +++ b/src/msgpackstream.cpp @@ -24,15 +24,15 @@ return retVal; MsgPackStream::MsgPackStream() : - dev(0), owndev(false), q_status(Ok) + dev(0), owndev(false), q_status(Ok), flushWrites(false) { } MsgPackStream::MsgPackStream(QIODevice *d) : - dev(d), owndev(false) + dev(d), owndev(false), q_status(Ok), flushWrites(false) { } MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) : - owndev(true), q_status(Ok) + owndev(true), q_status(Ok), flushWrites(false) { QBuffer *buf = new QBuffer(a); buf->open(mode); @@ -40,7 +40,7 @@ MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) : } MsgPackStream::MsgPackStream(const QByteArray &a) : - owndev(true), q_status(Ok) + owndev(true), q_status(Ok), flushWrites(false) { QBuffer *buf = new QBuffer(); buf->setData(a); @@ -87,6 +87,16 @@ void MsgPackStream::setStatus(Status status) q_status = status; } +void MsgPackStream::setFlushWrites(bool flush) +{ + flushWrites = flush; +} + +bool MsgPackStream::willFlushWrites() +{ + return flushWrites; +} + MsgPackStream &MsgPackStream::operator>>(bool &b) { CHECK_STREAM_PRECOND(*this) @@ -385,7 +395,7 @@ MsgPackStream &MsgPackStream::operator<<(bool b) CHECK_STREAM_WRITE_PRECOND(*this); quint8 m = b == true ? MsgPack::FirstByte::MTRUE : MsgPack::FirstByte::MFALSE; - if (dev->write((char *)&m, 1) != 1) + if (writeBytes((char *)&m, 1) != 1) setStatus(WriteFailed); return *this; } @@ -395,7 +405,7 @@ MsgPackStream &MsgPackStream::operator<<(quint32 u32) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[5]; quint8 sz = MsgPackPrivate::pack_uint(u32, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -405,7 +415,7 @@ MsgPackStream &MsgPackStream::operator<<(quint64 u64) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[9]; quint8 sz = MsgPackPrivate::pack_ulonglong(u64, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -415,7 +425,7 @@ MsgPackStream &MsgPackStream::operator<<(qint32 i32) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[5]; quint8 sz = MsgPackPrivate::pack_int(i32, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -425,7 +435,7 @@ MsgPackStream &MsgPackStream::operator<<(qint64 i64) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[9]; quint8 sz = MsgPackPrivate::pack_longlong(i64, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -435,7 +445,7 @@ MsgPackStream &MsgPackStream::operator<<(float f) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[5]; quint8 sz = MsgPackPrivate::pack_float(f, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -445,7 +455,7 @@ MsgPackStream &MsgPackStream::operator<<(double d) CHECK_STREAM_WRITE_PRECOND(*this); quint8 p[9]; quint8 sz = MsgPackPrivate::pack_double(d, p, true) - p; - if (dev->write((char *)p, sz) != sz) + if (writeBytes((char *)p, sz) != sz) setStatus(WriteFailed); return *this; } @@ -457,7 +467,7 @@ MsgPackStream &MsgPackStream::operator<<(QString str) quint32 sz = MsgPackPrivate::pack_string(str, p, false) - p; quint8 *data = new quint8[sz]; MsgPackPrivate::pack_string(str, data, true); - if (dev->write((char *)data, sz) != sz) + if (writeBytes((char *)data, sz) != sz) setStatus(WriteFailed); delete[] data; return *this; @@ -471,7 +481,7 @@ MsgPackStream &MsgPackStream::operator<<(const char *str) quint32 sz = MsgPackPrivate::pack_string_raw(str, str_len, p, false) - p; quint8 *data = new quint8[sz]; MsgPackPrivate::pack_string_raw(str, str_len, data, true); - if (dev->write((char *)data, sz) != sz) + if (writeBytes((char *)data, sz) != sz) setStatus(WriteFailed); delete[] data; return *this; @@ -483,11 +493,11 @@ MsgPackStream &MsgPackStream::operator<<(QByteArray array) quint8 p[5]; quint32 len = array.length(); quint8 header_len = MsgPackPrivate::pack_bin_header(len, p, true) - p; - if (dev->write((char *)p, header_len) != header_len) { + if (writeBytes((char *)p, header_len) != header_len) { setStatus(WriteFailed); return *this; } - if (dev->write(array.data(), len) != len) + if (writeBytes(array.data(), len) != len) setStatus(WriteFailed); return *this; } @@ -495,9 +505,28 @@ MsgPackStream &MsgPackStream::operator<<(QByteArray array) bool MsgPackStream::writeBytes(const char *data, uint len) { CHECK_STREAM_WRITE_PRECOND(false); - if (dev->write(data, len) != len) { - setStatus(WriteFailed); - return false; + uint written = 0; + uint thisWrite; + while (written < len) { + thisWrite = dev->write(data, len - written); + if (thisWrite < 0) { + setStatus(WriteFailed); + return false; + } + /* Apparently on Windows, the buffer size for named pipes is 0, and + * any data that is written before the remote end reads it is + * dropped (!!) without error (see https://bugreports.qt.io/browse/QTBUG-18385). + * We must be very sure that the data has been written before we try + * another write. This degrades performance in other cases, so callers + * must enable this behavior explicitly. + */ + if (this->flushWrites) { + dev->waitForBytesWritten(-1); + } + + /* Increment the write pointer and the total byte count. */ + data += thisWrite; + written += thisWrite; } return true; } @@ -534,7 +563,7 @@ bool MsgPackStream::writeExtHeader(quint32 len, qint8 msgpackType) d[5] = msgpackType; sz = 6; } - if (dev->write((const char *)d, sz) != sz) { + if (writeBytes((const char *)d, sz) != sz) { setStatus(WriteFailed); return false; } diff --git a/src/msgpackstream.h b/src/msgpackstream.h index 37f7222..61cd440 100644 --- a/src/msgpackstream.h +++ b/src/msgpackstream.h @@ -26,6 +26,8 @@ public: Status status() const; void resetStatus(); void setStatus(Status status); + void setFlushWrites(bool flushWrites); + bool willFlushWrites(); MsgPackStream &operator>>(bool &b); MsgPackStream &operator>>(quint8 &u8); @@ -60,6 +62,7 @@ private: QIODevice *dev; bool owndev; Status q_status; + bool flushWrites; bool unpack_longlong(qint64 &i64); bool unpack_ulonglong(quint64 &u64);