From ab61227a8aac6891083453e8e919839d923a418e Mon Sep 17 00:00:00 2001 From: Alexandr Zarubkin Date: Thu, 6 Apr 2017 11:11:48 +0300 Subject: [PATCH] The previous event need not be ACKed to send the next one (it will be queued). (#149) Signed-off-by: Alexandr Zarubkin --- src/AsyncEventSource.cpp | 51 ++++++++++++++++++++++------------------ src/AsyncEventSource.h | 13 ++++------ 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index 596dc27..8ea5b1b 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -107,20 +107,15 @@ static String generateEventMessage(const char *message, const char *event, uint3 // Message AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len) -: _status(EVS_MSG_ERROR), _data(nullptr), _len(len), _sent(0) +: _data(nullptr), _len(len), _sent(0), _acked(0) { _data = (uint8_t*)malloc(_len+1); - if(_data == NULL){ + if(_data == nullptr){ _len = 0; - _status = EVS_MSG_ERROR; } else { - _status = EVS_MSG_SENDING; memcpy(_data, data, len); _data[_len] = 0; - String temp = data; } - - } AsyncEventSourceMessage::~AsyncEventSourceMessage() { @@ -128,21 +123,28 @@ AsyncEventSourceMessage::~AsyncEventSourceMessage() { free(_data); } -void AsyncEventSourceMessage::ack(size_t len, uint32_t time) { - if(_len == len && _sent == _len){ - _status = EVS_MSG_SENT; +size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) { + // If the whole message is now acked... + if(_acked + len > _len){ + // Return the number of extra bytes acked (they will be carried on to the next message) + const size_t extra = _acked + len - _len; + _acked = _len; + return extra; } + // Return that no extra bytes left. + _acked += len; + return 0; } size_t AsyncEventSourceMessage::send(AsyncClient *client) { - if(!client->canSend()){ + const size_t len = _len - _sent; + if(client->space() < len){ return 0; } - if(client->space() < _len){ - return 0; - } - size_t sent = client->write((const char *)_data, _len); - _sent = sent; + size_t sent = client->add((const char *)_data, len); + if(client->canSend()) + client->send(); + _sent += sent; return sent; } @@ -184,20 +186,21 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage) _messageQueue.add(dataMessage); - if(_client->canSend()) { _runQueue(); } + _runQueue(); } void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ - - if(len && !_messageQueue.isEmpty()){ - _messageQueue.front()->ack(len, time); + while(len && !_messageQueue.isEmpty()){ + len = _messageQueue.front()->ack(len, time); + if(_messageQueue.front()->finished()) + _messageQueue.remove(_messageQueue.front()); } _runQueue(); } void AsyncEventSourceClient::_onPoll(){ - if(_client->canSend() && !_messageQueue.isEmpty()){ + if(!_messageQueue.isEmpty()){ _runQueue(); } } @@ -231,8 +234,10 @@ void AsyncEventSourceClient::_runQueue(){ _messageQueue.remove(_messageQueue.front()); } - if(!_messageQueue.isEmpty()){ - _messageQueue.front()->send(_client); + for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) + { + if(!(*i)->sent()) + (*i)->send(_client); } } diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index ffa1a22..751d470 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -29,23 +29,20 @@ class AsyncEventSourceResponse; class AsyncEventSourceClient; typedef std::function ArEventHandlerFunction; -typedef enum { EVS_MSG_SENDING, EVS_MSG_SENT, EVS_MSG_ERROR } EventSourceMessageStatus; - - class AsyncEventSourceMessage { private: - EventSourceMessageStatus _status; uint8_t * _data; size_t _len; size_t _sent; //size_t _ack; - //size_t _acked; + size_t _acked; public: AsyncEventSourceMessage(const char * data, size_t len); ~AsyncEventSourceMessage(); - void ack(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))); - size_t send(AsyncClient *client __attribute__((unused))); - bool finished(){ return _status != EVS_MSG_SENDING; } + size_t ack(size_t len, uint32_t time __attribute__((unused))); + size_t send(AsyncClient *client); + bool finished(){ return _acked == _len; } + bool sent() { return _sent == _len; } }; class AsyncEventSourceClient {