The previous event need not be ACKed to send the next one (it will be queued). (#149)

Signed-off-by: Alexandr Zarubkin <me21@yandex.ru>
This commit is contained in:
Alexandr Zarubkin
2017-04-06 11:11:48 +03:00
committed by Me No Dev
parent 120984e6d2
commit ab61227a8a
2 changed files with 33 additions and 31 deletions

View File

@@ -107,20 +107,15 @@ static String generateEventMessage(const char *message, const char *event, uint3
// Message // Message
AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len) AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len)
: _status(EVS_MSG_ERROR), _data(nullptr), _len(len), _sent(0) : _data(nullptr), _len(len), _sent(0), _acked(0)
{ {
_data = (uint8_t*)malloc(_len+1); _data = (uint8_t*)malloc(_len+1);
if(_data == NULL){ if(_data == nullptr){
_len = 0; _len = 0;
_status = EVS_MSG_ERROR;
} else { } else {
_status = EVS_MSG_SENDING;
memcpy(_data, data, len); memcpy(_data, data, len);
_data[_len] = 0; _data[_len] = 0;
String temp = data;
} }
} }
AsyncEventSourceMessage::~AsyncEventSourceMessage() { AsyncEventSourceMessage::~AsyncEventSourceMessage() {
@@ -128,21 +123,28 @@ AsyncEventSourceMessage::~AsyncEventSourceMessage() {
free(_data); free(_data);
} }
void AsyncEventSourceMessage::ack(size_t len, uint32_t time) { size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
if(_len == len && _sent == _len){ // If the whole message is now acked...
_status = EVS_MSG_SENT; if(_acked + len > _len){
// Return the number of extra bytes acked (they will be carried on to the next message)
const size_t extra = _acked + len - _len;
_acked = _len;
return extra;
} }
// Return that no extra bytes left.
_acked += len;
return 0;
} }
size_t AsyncEventSourceMessage::send(AsyncClient *client) { size_t AsyncEventSourceMessage::send(AsyncClient *client) {
if(!client->canSend()){ const size_t len = _len - _sent;
if(client->space() < len){
return 0; return 0;
} }
if(client->space() < _len){ size_t sent = client->add((const char *)_data, len);
return 0; if(client->canSend())
} client->send();
size_t sent = client->write((const char *)_data, _len); _sent += sent;
_sent = sent;
return sent; return sent;
} }
@@ -184,20 +186,21 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
_messageQueue.add(dataMessage); _messageQueue.add(dataMessage);
if(_client->canSend()) { _runQueue(); } _runQueue();
} }
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
while(len && !_messageQueue.isEmpty()){
if(len && !_messageQueue.isEmpty()){ len = _messageQueue.front()->ack(len, time);
_messageQueue.front()->ack(len, time); if(_messageQueue.front()->finished())
_messageQueue.remove(_messageQueue.front());
} }
_runQueue(); _runQueue();
} }
void AsyncEventSourceClient::_onPoll(){ void AsyncEventSourceClient::_onPoll(){
if(_client->canSend() && !_messageQueue.isEmpty()){ if(!_messageQueue.isEmpty()){
_runQueue(); _runQueue();
} }
} }
@@ -231,8 +234,10 @@ void AsyncEventSourceClient::_runQueue(){
_messageQueue.remove(_messageQueue.front()); _messageQueue.remove(_messageQueue.front());
} }
if(!_messageQueue.isEmpty()){ for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
_messageQueue.front()->send(_client); {
if(!(*i)->sent())
(*i)->send(_client);
} }
} }

View File

@@ -29,23 +29,20 @@ class AsyncEventSourceResponse;
class AsyncEventSourceClient; class AsyncEventSourceClient;
typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction; typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction;
typedef enum { EVS_MSG_SENDING, EVS_MSG_SENT, EVS_MSG_ERROR } EventSourceMessageStatus;
class AsyncEventSourceMessage { class AsyncEventSourceMessage {
private: private:
EventSourceMessageStatus _status;
uint8_t * _data; uint8_t * _data;
size_t _len; size_t _len;
size_t _sent; size_t _sent;
//size_t _ack; //size_t _ack;
//size_t _acked; size_t _acked;
public: public:
AsyncEventSourceMessage(const char * data, size_t len); AsyncEventSourceMessage(const char * data, size_t len);
~AsyncEventSourceMessage(); ~AsyncEventSourceMessage();
void ack(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))); size_t ack(size_t len, uint32_t time __attribute__((unused)));
size_t send(AsyncClient *client __attribute__((unused))); size_t send(AsyncClient *client);
bool finished(){ return _status != EVS_MSG_SENDING; } bool finished(){ return _acked == _len; }
bool sent() { return _sent == _len; }
}; };
class AsyncEventSourceClient { class AsyncEventSourceClient {