diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index 51166b2..596dc27 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -104,9 +104,53 @@ static String generateEventMessage(const char *message, const char *event, uint3 return ev; } +// Message + +AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len) +: _status(EVS_MSG_ERROR), _data(nullptr), _len(len), _sent(0) +{ + _data = (uint8_t*)malloc(_len+1); + if(_data == NULL){ + _len = 0; + _status = EVS_MSG_ERROR; + } else { + _status = EVS_MSG_SENDING; + memcpy(_data, data, len); + _data[_len] = 0; + String temp = data; + } + + +} + +AsyncEventSourceMessage::~AsyncEventSourceMessage() { + if(_data != NULL) + free(_data); +} + +void AsyncEventSourceMessage::ack(size_t len, uint32_t time) { + if(_len == len && _sent == _len){ + _status = EVS_MSG_SENT; + } +} + +size_t AsyncEventSourceMessage::send(AsyncClient *client) { + if(!client->canSend()){ + return 0; + } + if(client->space() < _len){ + return 0; + } + size_t sent = client->write((const char *)_data, _len); + _sent = sent; + return sent; +} + // Client -AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server){ +AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) +: _messageQueue(LinkedList([](AsyncEventSourceMessage *m){ delete m; })) +{ _client = request->client(); _server = server; _lastId = 0; @@ -115,19 +159,50 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A _client->setRxTimeout(0); _client->onError(NULL, NULL); - _client->onAck(NULL, NULL); - _client->onPoll(NULL, NULL); + _client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this); + _client->onPoll([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onPoll(); }, this); _client->onData(NULL, NULL); - _client->onTimeout([](void *r, AsyncClient* c __attribute__((unused)), uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this); - _client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this); + _client->onTimeout([this](void *r, AsyncClient* c __attribute__((unused)), uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this); + _client->onDisconnect([this](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this); + _server->_addClient(this); delete request; } AsyncEventSourceClient::~AsyncEventSourceClient(){ + _messageQueue.free(); close(); } +void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){ + if(dataMessage == NULL) + return; + if(!connected()){ + delete dataMessage; + return; + } + + _messageQueue.add(dataMessage); + + if(_client->canSend()) { _runQueue(); } +} + +void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ + + if(len && !_messageQueue.isEmpty()){ + _messageQueue.front()->ack(len, time); + } + + _runQueue(); +} + +void AsyncEventSourceClient::_onPoll(){ + if(_client->canSend() && !_messageQueue.isEmpty()){ + _runQueue(); + } +} + + void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){ _client->close(true); } @@ -143,18 +218,22 @@ void AsyncEventSourceClient::close(){ } void AsyncEventSourceClient::write(const char * message, size_t len){ - if(!_client->canSend()){ - return; - } - if(_client->space() < len){ - return; - } - _client->write(message, len); + _queueMessage(new AsyncEventSourceMessage(message, len)); } void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ String ev = generateEventMessage(message, event, id, reconnect); - write(ev.c_str(), ev.length()); + _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length())); +} + +void AsyncEventSourceClient::_runQueue(){ + while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){ + _messageQueue.remove(_messageQueue.front()); + } + + if(!_messageQueue.isEmpty()){ + _messageQueue.front()->send(_client); + } } @@ -210,8 +289,9 @@ void AsyncEventSource::send(const char *message, const char *event, uint32_t id, String ev = generateEventMessage(message, event, id, reconnect); for(const auto &c: _clients){ - if(c->connected()) + if(c->connected()) { c->write(ev.c_str(), ev.length()); + } } } @@ -257,3 +337,4 @@ size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len } return 0; } + diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index a000de2..ffa1a22 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -29,11 +29,33 @@ class AsyncEventSourceResponse; class AsyncEventSourceClient; typedef std::function ArEventHandlerFunction; +typedef enum { EVS_MSG_SENDING, EVS_MSG_SENT, EVS_MSG_ERROR } EventSourceMessageStatus; + + +class AsyncEventSourceMessage { + private: + EventSourceMessageStatus _status; + uint8_t * _data; + size_t _len; + size_t _sent; + //size_t _ack; + //size_t _acked; + public: + AsyncEventSourceMessage(const char * data, size_t len); + ~AsyncEventSourceMessage(); + void ack(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))); + size_t send(AsyncClient *client __attribute__((unused))); + bool finished(){ return _status != EVS_MSG_SENDING; } +}; + class AsyncEventSourceClient { private: AsyncClient *_client; AsyncEventSource *_server; uint32_t _lastId; + LinkedList _messageQueue; + void _queueMessage(AsyncEventSourceMessage *dataMessage); + void _runQueue(); public: @@ -48,6 +70,8 @@ class AsyncEventSourceClient { uint32_t lastId() const { return _lastId; } //system callbacks (do not call) + void _onAck(size_t len, uint32_t time); + void _onPoll(); void _onTimeout(uint32_t time); void _onDisconnect(); };