Merge branch 'ul-gh-fix_884' into yuboxfixes

This commit is contained in:
Alex Villacís Lasso
2020-12-23 19:36:53 -05:00
3 changed files with 110 additions and 34 deletions

View File

@@ -173,7 +173,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
} }
AsyncEventSourceClient::~AsyncEventSourceClient(){ AsyncEventSourceClient::~AsyncEventSourceClient(){
_lockmq.lock();
_messageQueue.free(); _messageQueue.free();
_lockmq.unlock();
close(); close();
} }
@@ -184,33 +186,41 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
delete dataMessage; delete dataMessage;
return; return;
} }
//length() is not thread-safe, thus acquiring the lock before this call..
_lockmq.lock();
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){ if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
delete dataMessage; delete dataMessage;
} else { } else {
_messageQueue.add(dataMessage); _messageQueue.add(dataMessage);
} // runqueue trigger when new messages added
if(_client->canSend()) if(_client->canSend()) {
_runQueue(); _runQueue();
} }
}
_lockmq.unlock();
}
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
// Same here, acquiring the lock early
_lockmq.lock();
while(len && !_messageQueue.isEmpty()){ while(len && !_messageQueue.isEmpty()){
len = _messageQueue.front()->ack(len, time); len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished()) if(_messageQueue.front()->finished())
_messageQueue.remove(_messageQueue.front()); _messageQueue.remove(_messageQueue.front());
} }
_runQueue(); _runQueue();
_lockmq.unlock();
} }
void AsyncEventSourceClient::_onPoll(){ void AsyncEventSourceClient::_onPoll(){
_lockmq.lock();
if(!_messageQueue.isEmpty()){ if(!_messageQueue.isEmpty()){
_runQueue(); _runQueue();
} }
_lockmq.unlock();
} }
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){ void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
_client->close(true); _client->close(true);
} }
@@ -225,7 +235,7 @@ void AsyncEventSourceClient::close(){
_client->close(); _client->close();
} }
void AsyncEventSourceClient::write(const char * message, size_t len){ void AsyncEventSourceClient::_write(const char * message, size_t len){
_queueMessage(new AsyncEventSourceMessage(message, len)); _queueMessage(new AsyncEventSourceMessage(message, len));
} }
@@ -234,17 +244,25 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length())); _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
} }
void AsyncEventSourceClient::_runQueue(){ size_t AsyncEventSourceClient::packetsWaiting() const {
while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){ size_t len;
_messageQueue.remove(_messageQueue.front()); _lockmq.lock();
len = _messageQueue.length();
_lockmq.unlock();
return len;
} }
for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) void AsyncEventSourceClient::_runQueue() {
{ // Calls to this private method now already protected by _lockmq acquisition
if(!(*i)->sent()) // so no extra call of _lockmq.lock() here..
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
// If it crashes here, iterator (i) has been invalidated as _messageQueue
// has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) )
if (!(*i)->sent()) {
(*i)->send(_client); (*i)->send(_client);
} }
} }
}
// Handler // Handler
@@ -280,56 +298,70 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
client->write((const char *)temp, 2053); client->write((const char *)temp, 2053);
free(temp); free(temp);
}*/ }*/
_client_queue_lock.lock();
_clients.add(client); _clients.add(client);
if(_connectcb) if(_connectcb)
_connectcb(client); _connectcb(client);
_client_queue_lock.unlock();
} }
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
_client_queue_lock.lock();
_clients.remove(client); _clients.remove(client);
_client_queue_lock.unlock();
} }
void AsyncEventSource::close(){ void AsyncEventSource::close(){
// While the whole loop is not done, the linked list is locked and so the
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
// is called very early
_client_queue_lock.lock();
for(const auto &c: _clients){ for(const auto &c: _clients){
if(c->connected()) if(c->connected())
c->close(); c->close();
} }
_client_queue_lock.unlock();
} }
// pmb fix // pmb fix
size_t AsyncEventSource::avgPacketsWaiting() const { size_t AsyncEventSource::avgPacketsWaiting() const {
if(_clients.isEmpty())
return 0;
size_t aql = 0; size_t aql = 0;
uint32_t nConnectedClients = 0; uint32_t nConnectedClients = 0;
_client_queue_lock.lock();
if (_clients.isEmpty()) {
_client_queue_lock.unlock();
return 0;
}
for(const auto &c: _clients){ for(const auto &c: _clients){
if(c->connected()) { if(c->connected()) {
aql += c->packetsWaiting(); aql += c->packetsWaiting();
++nConnectedClients; ++nConnectedClients;
} }
} }
// return aql / nConnectedClients; _client_queue_lock.unlock();
return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up
} }
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ void AsyncEventSource::send(
const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = generateEventMessage(message, event, id, reconnect); String ev = generateEventMessage(message, event, id, reconnect);
_client_queue_lock.lock();
for(const auto &c: _clients){ for(const auto &c: _clients){
if(c->connected()) { if(c->connected()) {
c->write(ev.c_str(), ev.length()); c->_write(ev.c_str(), ev.length());
} }
} }
_client_queue_lock.unlock();
} }
size_t AsyncEventSource::count() const { size_t AsyncEventSource::count() const {
return _clients.count_if([](AsyncEventSourceClient *c){ size_t n_clients;
_client_queue_lock.lock();
n_clients = _clients.count_if([](AsyncEventSourceClient *c){
return c->connected(); return c->connected();
}); });
_client_queue_lock.unlock();
return n_clients;
} }
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){ bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){

View File

@@ -73,6 +73,8 @@ class AsyncEventSourceClient {
AsyncEventSource *_server; AsyncEventSource *_server;
uint32_t _lastId; uint32_t _lastId;
LinkedList<AsyncEventSourceMessage *> _messageQueue; LinkedList<AsyncEventSourceMessage *> _messageQueue;
// ArFi 2020-08-27 for protecting/serializing _messageQueue
AsyncPlainLock _lockmq;
void _queueMessage(AsyncEventSourceMessage *dataMessage); void _queueMessage(AsyncEventSourceMessage *dataMessage);
void _runQueue(); void _runQueue();
@@ -83,12 +85,12 @@ class AsyncEventSourceClient {
AsyncClient* client(){ return _client; } AsyncClient* client(){ return _client; }
void close(); void close();
void write(const char * message, size_t len);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
bool connected() const { return (_client != NULL) && _client->connected(); } bool connected() const { return (_client != NULL) && _client->connected(); }
uint32_t lastId() const { return _lastId; } uint32_t lastId() const { return _lastId; }
size_t packetsWaiting() const { return _messageQueue.length(); } size_t packetsWaiting() const;
void _write(const char * message, size_t len);
//system callbacks (do not call) //system callbacks (do not call)
void _onAck(size_t len, uint32_t time); void _onAck(size_t len, uint32_t time);
void _onPoll(); void _onPoll();
@@ -100,6 +102,9 @@ class AsyncEventSource: public AsyncWebHandler {
private: private:
String _url; String _url;
LinkedList<AsyncEventSourceClient *> _clients; LinkedList<AsyncEventSourceClient *> _clients;
// Same as for individual messages, protect mutations of _clients list
// since simultaneous access from different tasks is possible
AsyncPlainLock _client_queue_lock;
ArEventHandlerFunction _connectcb; ArEventHandlerFunction _connectcb;
ArAuthorizeConnectHandler _authorizeConnectHandler; ArAuthorizeConnectHandler _authorizeConnectHandler;
public: public:
@@ -111,7 +116,7 @@ class AsyncEventSource: public AsyncWebHandler {
void onConnect(ArEventHandlerFunction cb); void onConnect(ArEventHandlerFunction cb);
void authorizeConnect(ArAuthorizeConnectHandler cb); void authorizeConnect(ArAuthorizeConnectHandler cb);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
size_t count() const; //number clinets connected size_t count() const; //number clients connected
size_t avgPacketsWaiting() const; size_t avgPacketsWaiting() const;
//system callbacks (do not call) //system callbacks (do not call)

View File

@@ -7,6 +7,38 @@
#ifdef ESP32 #ifdef ESP32
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
// Modified 'AsyncWebLock' to just only use mutex since pxCurrentTCB is not
// always available. According to example by Arjan Filius, changed name,
// added unimplemented version for ESP8266
class AsyncPlainLock
{
private:
SemaphoreHandle_t _lock;
public:
AsyncPlainLock() {
_lock = xSemaphoreCreateBinary();
// In this fails, the system is likely that much out of memory that
// we should abort anyways. If assertions are disabled, nothing is lost..
assert(_lock);
xSemaphoreGive(_lock);
}
~AsyncPlainLock() {
vSemaphoreDelete(_lock);
}
bool lock() const {
xSemaphoreTake(_lock, portMAX_DELAY);
return true;
}
void unlock() const {
xSemaphoreGive(_lock);
}
};
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore // This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
class AsyncWebLock class AsyncWebLock
{ {
@@ -17,6 +49,9 @@ private:
public: public:
AsyncWebLock() { AsyncWebLock() {
_lock = xSemaphoreCreateBinary(); _lock = xSemaphoreCreateBinary();
// In this fails, the system is likely that much out of memory that
// we should abort anyways. If assertions are disabled, nothing is lost..
assert(_lock);
_lockedBy = NULL; _lockedBy = NULL;
xSemaphoreGive(_lock); xSemaphoreGive(_lock);
} }
@@ -61,6 +96,10 @@ public:
void unlock() const { void unlock() const {
} }
}; };
// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above.
using AsyncPlainLock = AsyncWebLock;
#endif #endif
class AsyncWebLockGuard class AsyncWebLockGuard