AsyncEventSourceClient - replace _messageQueue with std::list

replace container with an std::list of unique pointers
This commit is contained in:
Emil Muratov
2024-06-26 17:55:58 +09:00
committed by Mathieu Carbou
parent d614ccde91
commit a1627af702
2 changed files with 14 additions and 18 deletions

View File

@@ -156,7 +156,6 @@ size_t AsyncEventSourceMessage::send(AsyncClient *client) {
// Client // Client
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server)
: _messageQueue(LinkedList<AsyncEventSourceMessage *>([](AsyncEventSourceMessage *m){ delete m; }))
{ {
_client = request->client(); _client = request->client();
_server = server; _server = server;
@@ -180,7 +179,7 @@ AsyncEventSourceClient::~AsyncEventSourceClient(){
#ifdef ESP32 #ifdef ESP32
std::lock_guard<std::mutex> lock(_lockmq); std::lock_guard<std::mutex> lock(_lockmq);
#endif #endif
_messageQueue.free(); _messageQueue.clear();
close(); close();
} }
@@ -195,7 +194,7 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
//length() is not thread-safe, thus acquiring the lock before this call.. //length() is not thread-safe, thus acquiring the lock before this call..
std::lock_guard<std::mutex> lock(_lockmq); std::lock_guard<std::mutex> lock(_lockmq);
#endif #endif
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){ if(_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES){
#ifdef ESP8266 #ifdef ESP8266
ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
#else #else
@@ -203,7 +202,7 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
#endif #endif
delete dataMessage; delete dataMessage;
} else { } else {
_messageQueue.add(dataMessage); _messageQueue.emplace_back(dataMessage);
// runqueue trigger when new messages added // runqueue trigger when new messages added
if(_client->canSend()) { if(_client->canSend()) {
_runQueue(); _runQueue();
@@ -216,10 +215,10 @@ void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
// Same here, acquiring the lock early // Same here, acquiring the lock early
std::lock_guard<std::mutex> lock(_lockmq); std::lock_guard<std::mutex> lock(_lockmq);
#endif #endif
while(len && !_messageQueue.isEmpty()){ while(len && _messageQueue.size()){
len = _messageQueue.front()->ack(len, time); len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished()) if(_messageQueue.front()->finished())
_messageQueue.remove(_messageQueue.front()); _messageQueue.pop_front();
} }
_runQueue(); _runQueue();
} }
@@ -229,7 +228,7 @@ void AsyncEventSourceClient::_onPoll(){
// Same here, acquiring the lock early // Same here, acquiring the lock early
std::lock_guard<std::mutex> lock(_lockmq); std::lock_guard<std::mutex> lock(_lockmq);
#endif #endif
if(!_messageQueue.isEmpty()){ if(_messageQueue.size()){
_runQueue(); _runQueue();
} }
} }
@@ -261,19 +260,15 @@ size_t AsyncEventSourceClient::packetsWaiting() const {
#ifdef ESP32 #ifdef ESP32
std::lock_guard<std::mutex> lock(_lockmq); std::lock_guard<std::mutex> lock(_lockmq);
#endif #endif
size_t len = _messageQueue.length(); return _messageQueue.size();
return len;
} }
void AsyncEventSourceClient::_runQueue() { void AsyncEventSourceClient::_runQueue() {
// Calls to this private method now already protected by _lockmq acquisition // Calls to this private method now already protected by _lockmq acquisition
// so no extra call of _lockmq.lock() here.. // so no extra call of _lockmq.lock() here..
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) { for ( auto &i : _messageQueue){
// If it crashes here, iterator (i) has been invalidated as _messageQueue if (!i->sent())
// has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) ) i->send(_client);
if (!(*i)->sent()) {
(*i)->send(_client);
}
} }
} }

View File

@@ -21,6 +21,7 @@
#define ASYNCEVENTSOURCE_H_ #define ASYNCEVENTSOURCE_H_
#include <Arduino.h> #include <Arduino.h>
#include <list>
#ifdef ESP32 #ifdef ESP32
#include <mutex> #include <mutex>
#include <AsyncTCP.h> #include <AsyncTCP.h>
@@ -54,8 +55,8 @@
class AsyncEventSource; class AsyncEventSource;
class AsyncEventSourceResponse; class AsyncEventSourceResponse;
class AsyncEventSourceClient; class AsyncEventSourceClient;
typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction; using ArEventHandlerFunction = std::function<void(AsyncEventSourceClient *client)>;
typedef std::function<bool(AsyncWebServerRequest *request)> ArAuthorizeConnectHandler; using ArAuthorizeConnectHandler = std::function<bool(AsyncWebServerRequest *request)>;
class AsyncEventSourceMessage { class AsyncEventSourceMessage {
private: private:
@@ -78,7 +79,7 @@ class AsyncEventSourceClient {
AsyncClient *_client; AsyncClient *_client;
AsyncEventSource *_server; AsyncEventSource *_server;
uint32_t _lastId; uint32_t _lastId;
LinkedList<AsyncEventSourceMessage *> _messageQueue; std::list< std::unique_ptr<AsyncEventSourceMessage> > _messageQueue;
// ArFi 2020-08-27 for protecting/serializing _messageQueue // ArFi 2020-08-27 for protecting/serializing _messageQueue
#ifdef ESP32 #ifdef ESP32
mutable std::mutex _lockmq; mutable std::mutex _lockmq;