diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index 77589bb..90faa3f 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -183,30 +183,25 @@ AsyncEventSourceClient::~AsyncEventSourceClient(){ close(); } -void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){ - if(dataMessage == NULL) - return; - if(!connected()){ - delete dataMessage; - return; - } +void AsyncEventSourceClient::_queueMessage(const char * message, size_t len){ #ifdef ESP32 //length() is not thread-safe, thus acquiring the lock before this call.. std::lock_guard lock(_lockmq); #endif + if(_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES){ #ifdef ESP8266 ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); #else log_e("Too many messages queued: deleting message"); #endif - delete dataMessage; - } else { - _messageQueue.emplace_back(dataMessage); - // runqueue trigger when new messages added - if(_client->canSend()) { - _runQueue(); - } + return; + } + + _messageQueue.emplace_back(message, len); + // runqueue trigger when new messages added + if(_client->canSend()) { + _runQueue(); } } @@ -216,8 +211,8 @@ void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ std::lock_guard lock(_lockmq); #endif while(len && _messageQueue.size()){ - len = _messageQueue.front()->ack(len, time); - if(_messageQueue.front()->finished()) + len = _messageQueue.front().ack(len, time); + if(_messageQueue.front().finished()) _messageQueue.pop_front(); } _runQueue(); @@ -248,12 +243,14 @@ void AsyncEventSourceClient::close(){ } void AsyncEventSourceClient::write(const char * message, size_t len){ - _queueMessage(new AsyncEventSourceMessage(message, len)); + if(!connected()) return; + _queueMessage(message, len); } void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ + if(!connected()) return; String ev = generateEventMessage(message, event, id, reconnect); - _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length())); + _queueMessage(ev.c_str(), ev.length()); } size_t AsyncEventSourceClient::packetsWaiting() const { @@ -267,8 +264,8 @@ void AsyncEventSourceClient::_runQueue() { // Calls to this private method now already protected by _lockmq acquisition // so no extra call of _lockmq.lock() here.. for ( auto &i : _messageQueue){ - if (!i->sent()) - i->send(_client); + if (!i.sent()) + i.send(_client); } } diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index 351e92c..6ce3e57 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -79,12 +79,11 @@ class AsyncEventSourceClient { AsyncClient *_client; AsyncEventSource *_server; uint32_t _lastId; - std::list< std::unique_ptr > _messageQueue; - // ArFi 2020-08-27 for protecting/serializing _messageQueue + std::list< AsyncEventSourceMessage > _messageQueue; #ifdef ESP32 mutable std::mutex _lockmq; #endif - void _queueMessage(AsyncEventSourceMessage *dataMessage); + void _queueMessage(const char * message, size_t len); void _runQueue(); public: