mirror of
https://github.com/me-no-dev/ESPAsyncWebServer.git
synced 2025-09-27 06:40:56 +02:00
Fix #884, protect list concurrent access with mutex
This commit is contained in:
@@ -173,7 +173,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
|
|||||||
}
|
}
|
||||||
|
|
||||||
AsyncEventSourceClient::~AsyncEventSourceClient(){
|
AsyncEventSourceClient::~AsyncEventSourceClient(){
|
||||||
_messageQueue.free();
|
_lockmq.lock();
|
||||||
|
_messageQueue.free();
|
||||||
|
_lockmq.unlock();
|
||||||
close();
|
close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,33 +186,41 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
|
|||||||
delete dataMessage;
|
delete dataMessage;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
//length() is not thread-safe, thus acquiring the lock before this call..
|
||||||
|
_lockmq.lock();
|
||||||
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
|
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
|
||||||
ets_printf("ERROR: Too many messages queued\n");
|
ets_printf("ERROR: Too many messages queued\n");
|
||||||
delete dataMessage;
|
delete dataMessage;
|
||||||
} else {
|
} else {
|
||||||
_messageQueue.add(dataMessage);
|
_messageQueue.add(dataMessage);
|
||||||
|
// runqueue trigger when new messages added
|
||||||
|
if(_client->canSend()) {
|
||||||
|
_runQueue();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if(_client->canSend())
|
_lockmq.unlock();
|
||||||
_runQueue();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
|
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
|
||||||
|
// Same here, acquiring the lock early
|
||||||
|
_lockmq.lock();
|
||||||
while(len && !_messageQueue.isEmpty()){
|
while(len && !_messageQueue.isEmpty()){
|
||||||
len = _messageQueue.front()->ack(len, time);
|
len = _messageQueue.front()->ack(len, time);
|
||||||
if(_messageQueue.front()->finished())
|
if(_messageQueue.front()->finished())
|
||||||
_messageQueue.remove(_messageQueue.front());
|
_messageQueue.remove(_messageQueue.front());
|
||||||
}
|
}
|
||||||
|
|
||||||
_runQueue();
|
_runQueue();
|
||||||
|
_lockmq.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncEventSourceClient::_onPoll(){
|
void AsyncEventSourceClient::_onPoll(){
|
||||||
|
_lockmq.lock();
|
||||||
if(!_messageQueue.isEmpty()){
|
if(!_messageQueue.isEmpty()){
|
||||||
_runQueue();
|
_runQueue();
|
||||||
}
|
}
|
||||||
|
_lockmq.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
|
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
|
||||||
_client->close(true);
|
_client->close(true);
|
||||||
}
|
}
|
||||||
@@ -225,7 +235,7 @@ void AsyncEventSourceClient::close(){
|
|||||||
_client->close();
|
_client->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncEventSourceClient::write(const char * message, size_t len){
|
void AsyncEventSourceClient::_write(const char * message, size_t len){
|
||||||
_queueMessage(new AsyncEventSourceMessage(message, len));
|
_queueMessage(new AsyncEventSourceMessage(message, len));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,15 +244,23 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
|
|||||||
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
|
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncEventSourceClient::_runQueue(){
|
size_t AsyncEventSourceClient::packetsWaiting() const {
|
||||||
while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
|
size_t len;
|
||||||
_messageQueue.remove(_messageQueue.front());
|
_lockmq.lock();
|
||||||
}
|
len = _messageQueue.length();
|
||||||
|
_lockmq.unlock();
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
|
void AsyncEventSourceClient::_runQueue() {
|
||||||
{
|
// Calls to this private method now already protected by _lockmq acquisition
|
||||||
if(!(*i)->sent())
|
// so no extra call of _lockmq.lock() here..
|
||||||
|
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
|
||||||
|
// If it crashes here, iterator (i) has been invalidated as _messageQueue
|
||||||
|
// has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) )
|
||||||
|
if (!(*i)->sent()) {
|
||||||
(*i)->send(_client);
|
(*i)->send(_client);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,56 +294,70 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
|
|||||||
client->write((const char *)temp, 2053);
|
client->write((const char *)temp, 2053);
|
||||||
free(temp);
|
free(temp);
|
||||||
}*/
|
}*/
|
||||||
|
_client_queue_lock.lock();
|
||||||
_clients.add(client);
|
_clients.add(client);
|
||||||
if(_connectcb)
|
if(_connectcb)
|
||||||
_connectcb(client);
|
_connectcb(client);
|
||||||
|
_client_queue_lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
|
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
|
||||||
|
_client_queue_lock.lock();
|
||||||
_clients.remove(client);
|
_clients.remove(client);
|
||||||
|
_client_queue_lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncEventSource::close(){
|
void AsyncEventSource::close(){
|
||||||
|
// While the whole loop is not done, the linked list is locked and so the
|
||||||
|
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
|
||||||
|
// is called very early
|
||||||
|
_client_queue_lock.lock();
|
||||||
for(const auto &c: _clients){
|
for(const auto &c: _clients){
|
||||||
if(c->connected())
|
if(c->connected())
|
||||||
c->close();
|
c->close();
|
||||||
}
|
}
|
||||||
|
_client_queue_lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
// pmb fix
|
// pmb fix
|
||||||
size_t AsyncEventSource::avgPacketsWaiting() const {
|
size_t AsyncEventSource::avgPacketsWaiting() const {
|
||||||
if(_clients.isEmpty())
|
size_t aql = 0;
|
||||||
|
uint32_t nConnectedClients = 0;
|
||||||
|
_client_queue_lock.lock();
|
||||||
|
if (_clients.isEmpty()) {
|
||||||
|
_client_queue_lock.unlock();
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
size_t aql=0;
|
|
||||||
uint32_t nConnectedClients=0;
|
|
||||||
|
|
||||||
for(const auto &c: _clients){
|
for(const auto &c: _clients){
|
||||||
if(c->connected()) {
|
if(c->connected()) {
|
||||||
aql+=c->packetsWaiting();
|
aql += c->packetsWaiting();
|
||||||
++nConnectedClients;
|
++nConnectedClients;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// return aql / nConnectedClients;
|
_client_queue_lock.unlock();
|
||||||
return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
|
return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
|
void AsyncEventSource::send(
|
||||||
|
const char *message, const char *event, uint32_t id, uint32_t reconnect){
|
||||||
|
|
||||||
String ev = generateEventMessage(message, event, id, reconnect);
|
String ev = generateEventMessage(message, event, id, reconnect);
|
||||||
|
_client_queue_lock.lock();
|
||||||
for(const auto &c: _clients){
|
for(const auto &c: _clients){
|
||||||
if(c->connected()) {
|
if(c->connected()) {
|
||||||
c->write(ev.c_str(), ev.length());
|
c->_write(ev.c_str(), ev.length());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_client_queue_lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t AsyncEventSource::count() const {
|
size_t AsyncEventSource::count() const {
|
||||||
return _clients.count_if([](AsyncEventSourceClient *c){
|
size_t n_clients;
|
||||||
return c->connected();
|
_client_queue_lock.lock();
|
||||||
});
|
n_clients = _clients.count_if([](AsyncEventSourceClient *c){
|
||||||
|
return c->connected();
|
||||||
|
});
|
||||||
|
_client_queue_lock.unlock();
|
||||||
|
return n_clients;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
|
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
|
||||||
|
@@ -72,6 +72,8 @@ class AsyncEventSourceClient {
|
|||||||
AsyncEventSource *_server;
|
AsyncEventSource *_server;
|
||||||
uint32_t _lastId;
|
uint32_t _lastId;
|
||||||
LinkedList<AsyncEventSourceMessage *> _messageQueue;
|
LinkedList<AsyncEventSourceMessage *> _messageQueue;
|
||||||
|
// ArFi 2020-08-27 for protecting/serializing _messageQueue
|
||||||
|
AsyncPlainLock _lockmq;
|
||||||
void _queueMessage(AsyncEventSourceMessage *dataMessage);
|
void _queueMessage(AsyncEventSourceMessage *dataMessage);
|
||||||
void _runQueue();
|
void _runQueue();
|
||||||
|
|
||||||
@@ -82,12 +84,12 @@ class AsyncEventSourceClient {
|
|||||||
|
|
||||||
AsyncClient* client(){ return _client; }
|
AsyncClient* client(){ return _client; }
|
||||||
void close();
|
void close();
|
||||||
void write(const char * message, size_t len);
|
|
||||||
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
|
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
|
||||||
bool connected() const { return (_client != NULL) && _client->connected(); }
|
bool connected() const { return (_client != NULL) && _client->connected(); }
|
||||||
uint32_t lastId() const { return _lastId; }
|
uint32_t lastId() const { return _lastId; }
|
||||||
size_t packetsWaiting() const { return _messageQueue.length(); }
|
size_t packetsWaiting() const;
|
||||||
|
|
||||||
|
void _write(const char * message, size_t len);
|
||||||
//system callbacks (do not call)
|
//system callbacks (do not call)
|
||||||
void _onAck(size_t len, uint32_t time);
|
void _onAck(size_t len, uint32_t time);
|
||||||
void _onPoll();
|
void _onPoll();
|
||||||
@@ -99,7 +101,11 @@ class AsyncEventSource: public AsyncWebHandler {
|
|||||||
private:
|
private:
|
||||||
String _url;
|
String _url;
|
||||||
LinkedList<AsyncEventSourceClient *> _clients;
|
LinkedList<AsyncEventSourceClient *> _clients;
|
||||||
|
// Same as for individual messages, protect mutations of _clients list
|
||||||
|
// since simultaneous access from different tasks is possible
|
||||||
|
AsyncPlainLock _client_queue_lock;
|
||||||
ArEventHandlerFunction _connectcb;
|
ArEventHandlerFunction _connectcb;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AsyncEventSource(const String& url);
|
AsyncEventSource(const String& url);
|
||||||
~AsyncEventSource();
|
~AsyncEventSource();
|
||||||
@@ -108,7 +114,7 @@ class AsyncEventSource: public AsyncWebHandler {
|
|||||||
void close();
|
void close();
|
||||||
void onConnect(ArEventHandlerFunction cb);
|
void onConnect(ArEventHandlerFunction cb);
|
||||||
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
|
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
|
||||||
size_t count() const; //number clinets connected
|
size_t count() const; //number clients connected
|
||||||
size_t avgPacketsWaiting() const;
|
size_t avgPacketsWaiting() const;
|
||||||
|
|
||||||
//system callbacks (do not call)
|
//system callbacks (do not call)
|
||||||
|
@@ -7,6 +7,36 @@
|
|||||||
|
|
||||||
#ifdef ESP32
|
#ifdef ESP32
|
||||||
|
|
||||||
|
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
|
||||||
|
// Modified 'AsyncWebLock' to just only use mutex since pxCurrentTCB is not
|
||||||
|
// always available. According to example by Arjan Filius, changed name,
|
||||||
|
// added unimplemented version for ESP8266
|
||||||
|
class AsyncPlainLock
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
SemaphoreHandle_t _lock;
|
||||||
|
mutable void *_lockedBy;
|
||||||
|
|
||||||
|
public:
|
||||||
|
AsyncPlainLock() {
|
||||||
|
_lock = xSemaphoreCreateBinary();
|
||||||
|
xSemaphoreGive(_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
~AsyncPlainLock() {
|
||||||
|
vSemaphoreDelete(_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool lock() const {
|
||||||
|
xSemaphoreTake(_lock, portMAX_DELAY);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void unlock() const {
|
||||||
|
xSemaphoreGive(_lock);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
|
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
|
||||||
class AsyncWebLock
|
class AsyncWebLock
|
||||||
{
|
{
|
||||||
@@ -61,6 +91,10 @@ public:
|
|||||||
void unlock() const {
|
void unlock() const {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above.
|
||||||
|
using AsyncPlainLock = AsyncWebLock;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
class AsyncWebLockGuard
|
class AsyncWebLockGuard
|
||||||
|
Reference in New Issue
Block a user