mirror of
https://github.com/romixlab/qmsgpack.git
synced 2025-07-30 18:37:14 +02:00
Merge pull request #27 from mtstickney/stream_partial_writes
Stream partial writes
This commit is contained in:
@ -24,15 +24,15 @@
|
|||||||
return retVal;
|
return retVal;
|
||||||
|
|
||||||
MsgPackStream::MsgPackStream() :
|
MsgPackStream::MsgPackStream() :
|
||||||
dev(0), owndev(false), q_status(Ok)
|
dev(0), owndev(false), q_status(Ok), flushWrites(false)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
MsgPackStream::MsgPackStream(QIODevice *d) :
|
MsgPackStream::MsgPackStream(QIODevice *d) :
|
||||||
dev(d), owndev(false)
|
dev(d), owndev(false), q_status(Ok), flushWrites(false)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) :
|
MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) :
|
||||||
owndev(true), q_status(Ok)
|
owndev(true), q_status(Ok), flushWrites(false)
|
||||||
{
|
{
|
||||||
QBuffer *buf = new QBuffer(a);
|
QBuffer *buf = new QBuffer(a);
|
||||||
buf->open(mode);
|
buf->open(mode);
|
||||||
@ -40,7 +40,7 @@ MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) :
|
|||||||
}
|
}
|
||||||
|
|
||||||
MsgPackStream::MsgPackStream(const QByteArray &a) :
|
MsgPackStream::MsgPackStream(const QByteArray &a) :
|
||||||
owndev(true), q_status(Ok)
|
owndev(true), q_status(Ok), flushWrites(false)
|
||||||
{
|
{
|
||||||
QBuffer *buf = new QBuffer();
|
QBuffer *buf = new QBuffer();
|
||||||
buf->setData(a);
|
buf->setData(a);
|
||||||
@ -87,6 +87,16 @@ void MsgPackStream::setStatus(Status status)
|
|||||||
q_status = status;
|
q_status = status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MsgPackStream::setFlushWrites(bool flush)
|
||||||
|
{
|
||||||
|
flushWrites = flush;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool MsgPackStream::willFlushWrites()
|
||||||
|
{
|
||||||
|
return flushWrites;
|
||||||
|
}
|
||||||
|
|
||||||
MsgPackStream &MsgPackStream::operator>>(bool &b)
|
MsgPackStream &MsgPackStream::operator>>(bool &b)
|
||||||
{
|
{
|
||||||
CHECK_STREAM_PRECOND(*this)
|
CHECK_STREAM_PRECOND(*this)
|
||||||
@ -385,7 +395,7 @@ MsgPackStream &MsgPackStream::operator<<(bool b)
|
|||||||
CHECK_STREAM_WRITE_PRECOND(*this);
|
CHECK_STREAM_WRITE_PRECOND(*this);
|
||||||
quint8 m = b == true ?
|
quint8 m = b == true ?
|
||||||
MsgPack::FirstByte::MTRUE : MsgPack::FirstByte::MFALSE;
|
MsgPack::FirstByte::MTRUE : MsgPack::FirstByte::MFALSE;
|
||||||
if (dev->write((char *)&m, 1) != 1)
|
if (writeBytes((char *)&m, 1) != 1)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -395,7 +405,7 @@ MsgPackStream &MsgPackStream::operator<<(quint32 u32)
|
|||||||
CHECK_STREAM_WRITE_PRECOND(*this);
|
CHECK_STREAM_WRITE_PRECOND(*this);
|
||||||
quint8 p[5];
|
quint8 p[5];
|
||||||
quint8 sz = MsgPackPrivate::pack_uint(u32, p, true) - p;
|
quint8 sz = MsgPackPrivate::pack_uint(u32, p, true) - p;
|
||||||
if (dev->write((char *)p, sz) != sz)
|
if (writeBytes((char *)p, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -405,7 +415,7 @@ MsgPackStream &MsgPackStream::operator<<(quint64 u64)
|
|||||||
CHECK_STREAM_WRITE_PRECOND(*this);
|
CHECK_STREAM_WRITE_PRECOND(*this);
|
||||||
quint8 p[9];
|
quint8 p[9];
|
||||||
quint8 sz = MsgPackPrivate::pack_ulonglong(u64, p, true) - p;
|
quint8 sz = MsgPackPrivate::pack_ulonglong(u64, p, true) - p;
|
||||||
if (dev->write((char *)p, sz) != sz)
|
if (writeBytes((char *)p, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -415,7 +425,7 @@ MsgPackStream &MsgPackStream::operator<<(qint32 i32)
|
|||||||
CHECK_STREAM_WRITE_PRECOND(*this);
|
CHECK_STREAM_WRITE_PRECOND(*this);
|
||||||
quint8 p[5];
|
quint8 p[5];
|
||||||
quint8 sz = MsgPackPrivate::pack_int(i32, p, true) - p;
|
quint8 sz = MsgPackPrivate::pack_int(i32, p, true) - p;
|
||||||
if (dev->write((char *)p, sz) != sz)
|
if (writeBytes((char *)p, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -425,7 +435,7 @@ MsgPackStream &MsgPackStream::operator<<(qint64 i64)
|
|||||||
CHECK_STREAM_WRITE_PRECOND(*this);
|
CHECK_STREAM_WRITE_PRECOND(*this);
|
||||||
quint8 p[9];
|
quint8 p[9];
|
||||||
quint8 sz = MsgPackPrivate::pack_longlong(i64, p, true) - p;
|
quint8 sz = MsgPackPrivate::pack_longlong(i64, p, true) - p;
|
||||||
if (dev->write((char *)p, sz) != sz)
|
if (writeBytes((char *)p, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -435,7 +445,7 @@ MsgPackStream &MsgPackStream::operator<<(float f)
|
|||||||
CHECK_STREAM_WRITE_PRECOND(*this);
|
CHECK_STREAM_WRITE_PRECOND(*this);
|
||||||
quint8 p[5];
|
quint8 p[5];
|
||||||
quint8 sz = MsgPackPrivate::pack_float(f, p, true) - p;
|
quint8 sz = MsgPackPrivate::pack_float(f, p, true) - p;
|
||||||
if (dev->write((char *)p, sz) != sz)
|
if (writeBytes((char *)p, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -445,7 +455,7 @@ MsgPackStream &MsgPackStream::operator<<(double d)
|
|||||||
CHECK_STREAM_WRITE_PRECOND(*this);
|
CHECK_STREAM_WRITE_PRECOND(*this);
|
||||||
quint8 p[9];
|
quint8 p[9];
|
||||||
quint8 sz = MsgPackPrivate::pack_double(d, p, true) - p;
|
quint8 sz = MsgPackPrivate::pack_double(d, p, true) - p;
|
||||||
if (dev->write((char *)p, sz) != sz)
|
if (writeBytes((char *)p, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -457,7 +467,7 @@ MsgPackStream &MsgPackStream::operator<<(QString str)
|
|||||||
quint32 sz = MsgPackPrivate::pack_string(str, p, false) - p;
|
quint32 sz = MsgPackPrivate::pack_string(str, p, false) - p;
|
||||||
quint8 *data = new quint8[sz];
|
quint8 *data = new quint8[sz];
|
||||||
MsgPackPrivate::pack_string(str, data, true);
|
MsgPackPrivate::pack_string(str, data, true);
|
||||||
if (dev->write((char *)data, sz) != sz)
|
if (writeBytes((char *)data, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
delete[] data;
|
delete[] data;
|
||||||
return *this;
|
return *this;
|
||||||
@ -471,7 +481,7 @@ MsgPackStream &MsgPackStream::operator<<(const char *str)
|
|||||||
quint32 sz = MsgPackPrivate::pack_string_raw(str, str_len, p, false) - p;
|
quint32 sz = MsgPackPrivate::pack_string_raw(str, str_len, p, false) - p;
|
||||||
quint8 *data = new quint8[sz];
|
quint8 *data = new quint8[sz];
|
||||||
MsgPackPrivate::pack_string_raw(str, str_len, data, true);
|
MsgPackPrivate::pack_string_raw(str, str_len, data, true);
|
||||||
if (dev->write((char *)data, sz) != sz)
|
if (writeBytes((char *)data, sz) != sz)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
delete[] data;
|
delete[] data;
|
||||||
return *this;
|
return *this;
|
||||||
@ -483,11 +493,11 @@ MsgPackStream &MsgPackStream::operator<<(QByteArray array)
|
|||||||
quint8 p[5];
|
quint8 p[5];
|
||||||
quint32 len = array.length();
|
quint32 len = array.length();
|
||||||
quint8 header_len = MsgPackPrivate::pack_bin_header(len, p, true) - p;
|
quint8 header_len = MsgPackPrivate::pack_bin_header(len, p, true) - p;
|
||||||
if (dev->write((char *)p, header_len) != header_len) {
|
if (writeBytes((char *)p, header_len) != header_len) {
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
if (dev->write(array.data(), len) != len)
|
if (writeBytes(array.data(), len) != len)
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
@ -495,9 +505,28 @@ MsgPackStream &MsgPackStream::operator<<(QByteArray array)
|
|||||||
bool MsgPackStream::writeBytes(const char *data, uint len)
|
bool MsgPackStream::writeBytes(const char *data, uint len)
|
||||||
{
|
{
|
||||||
CHECK_STREAM_WRITE_PRECOND(false);
|
CHECK_STREAM_WRITE_PRECOND(false);
|
||||||
if (dev->write(data, len) != len) {
|
uint written = 0;
|
||||||
setStatus(WriteFailed);
|
uint thisWrite;
|
||||||
return false;
|
while (written < len) {
|
||||||
|
thisWrite = dev->write(data, len - written);
|
||||||
|
if (thisWrite < 0) {
|
||||||
|
setStatus(WriteFailed);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
/* Apparently on Windows, the buffer size for named pipes is 0, and
|
||||||
|
* any data that is written before the remote end reads it is
|
||||||
|
* dropped (!!) without error (see https://bugreports.qt.io/browse/QTBUG-18385).
|
||||||
|
* We must be very sure that the data has been written before we try
|
||||||
|
* another write. This degrades performance in other cases, so callers
|
||||||
|
* must enable this behavior explicitly.
|
||||||
|
*/
|
||||||
|
if (this->flushWrites) {
|
||||||
|
dev->waitForBytesWritten(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Increment the write pointer and the total byte count. */
|
||||||
|
data += thisWrite;
|
||||||
|
written += thisWrite;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -534,7 +563,7 @@ bool MsgPackStream::writeExtHeader(quint32 len, qint8 msgpackType)
|
|||||||
d[5] = msgpackType;
|
d[5] = msgpackType;
|
||||||
sz = 6;
|
sz = 6;
|
||||||
}
|
}
|
||||||
if (dev->write((const char *)d, sz) != sz) {
|
if (writeBytes((const char *)d, sz) != sz) {
|
||||||
setStatus(WriteFailed);
|
setStatus(WriteFailed);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,8 @@ public:
|
|||||||
Status status() const;
|
Status status() const;
|
||||||
void resetStatus();
|
void resetStatus();
|
||||||
void setStatus(Status status);
|
void setStatus(Status status);
|
||||||
|
void setFlushWrites(bool flushWrites);
|
||||||
|
bool willFlushWrites();
|
||||||
|
|
||||||
MsgPackStream &operator>>(bool &b);
|
MsgPackStream &operator>>(bool &b);
|
||||||
MsgPackStream &operator>>(quint8 &u8);
|
MsgPackStream &operator>>(quint8 &u8);
|
||||||
@ -60,6 +62,7 @@ private:
|
|||||||
QIODevice *dev;
|
QIODevice *dev;
|
||||||
bool owndev;
|
bool owndev;
|
||||||
Status q_status;
|
Status q_status;
|
||||||
|
bool flushWrites;
|
||||||
|
|
||||||
bool unpack_longlong(qint64 &i64);
|
bool unpack_longlong(qint64 &i64);
|
||||||
bool unpack_ulonglong(quint64 &u64);
|
bool unpack_ulonglong(quint64 &u64);
|
||||||
|
Reference in New Issue
Block a user