diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index 98f18de..69371c6 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -18,10 +18,10 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "Arduino.h" -#include "AsyncEventSource.h" #ifndef ESP8266 #include #endif +#include "AsyncEventSource.h" static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){ String ev; @@ -177,9 +177,10 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A } AsyncEventSourceClient::~AsyncEventSourceClient(){ - _lockmq.lock(); +#ifdef ESP32 + std::lock_guard lock(_lockmq); +#endif _messageQueue.free(); - _lockmq.unlock(); close(); } @@ -190,8 +191,10 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage) delete dataMessage; return; } +#ifdef ESP32 //length() is not thread-safe, thus acquiring the lock before this call.. - _lockmq.lock(); + std::lock_guard lock(_lockmq); +#endif if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){ #ifdef ESP8266 ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); @@ -206,27 +209,29 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage) _runQueue(); } } - _lockmq.unlock(); } void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ +#ifdef ESP32 // Same here, acquiring the lock early - _lockmq.lock(); + std::lock_guard lock(_lockmq); +#endif while(len && !_messageQueue.isEmpty()){ len = _messageQueue.front()->ack(len, time); if(_messageQueue.front()->finished()) _messageQueue.remove(_messageQueue.front()); } _runQueue(); - _lockmq.unlock(); } void AsyncEventSourceClient::_onPoll(){ - _lockmq.lock(); +#ifdef ESP32 + // Same here, acquiring the lock early + std::lock_guard lock(_lockmq); +#endif if(!_messageQueue.isEmpty()){ _runQueue(); } - _lockmq.unlock(); } void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){ @@ -253,10 +258,10 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32 } size_t AsyncEventSourceClient::packetsWaiting() const { - size_t len; - _lockmq.lock(); - len = _messageQueue.length(); - _lockmq.unlock(); +#ifdef ESP32 + std::lock_guard lock(_lockmq); +#endif + size_t len = _messageQueue.length(); return len; } @@ -306,14 +311,18 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){ client->write((const char *)temp, 2053); free(temp); }*/ - AsyncWebLockGuard l(_client_queue_lock); +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif _clients.add(client); if(_connectcb) _connectcb(client); } void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){ - AsyncWebLockGuard l(_client_queue_lock); +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif _clients.remove(client); } @@ -321,7 +330,9 @@ void AsyncEventSource::close(){ // While the whole loop is not done, the linked list is locked and so the // iterator should remain valid even when AsyncEventSource::_handleDisconnect() // is called very early - AsyncWebLockGuard l(_client_queue_lock); +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif for(const auto &c: _clients){ if(c->connected()) c->close(); @@ -332,7 +343,9 @@ void AsyncEventSource::close(){ size_t AsyncEventSource::avgPacketsWaiting() const { size_t aql = 0; uint32_t nConnectedClients = 0; - AsyncWebLockGuard l(_client_queue_lock); +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif if (_clients.isEmpty()) { return 0; } @@ -348,7 +361,9 @@ size_t AsyncEventSource::avgPacketsWaiting() const { void AsyncEventSource::send( const char *message, const char *event, uint32_t id, uint32_t reconnect){ String ev = generateEventMessage(message, event, id, reconnect); - AsyncWebLockGuard l(_client_queue_lock); +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif for(const auto &c: _clients){ if(c->connected()) { c->write(ev.c_str(), ev.length()); @@ -358,7 +373,9 @@ void AsyncEventSource::send( size_t AsyncEventSource::count() const { size_t n_clients; - AsyncWebLockGuard l(_client_queue_lock); +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif n_clients = _clients.count_if([](AsyncEventSourceClient *c){ return c->connected(); }); diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index 2efb0dd..6d76b2f 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -22,11 +22,12 @@ #include #ifdef ESP32 +#include #include #ifndef SSE_MAX_QUEUED_MESSAGES #define SSE_MAX_QUEUED_MESSAGES 32 #endif -#else +#else // esp8266 #include #ifndef SSE_MAX_QUEUED_MESSAGES #define SSE_MAX_QUEUED_MESSAGES 8 @@ -35,8 +36,6 @@ #include -#include "AsyncWebSynchronization.h" - #ifdef ESP8266 #include #ifdef CRYPTO_HASH_h // include Hash.h from espressif framework if the first include was from the crypto library @@ -81,7 +80,9 @@ class AsyncEventSourceClient { uint32_t _lastId; LinkedList _messageQueue; // ArFi 2020-08-27 for protecting/serializing _messageQueue - AsyncPlainLock _lockmq; +#ifdef ESP32 + mutable std::mutex _lockmq; +#endif void _queueMessage(AsyncEventSourceMessage *dataMessage); void _runQueue(); @@ -109,9 +110,11 @@ class AsyncEventSource: public AsyncWebHandler { private: String _url; LinkedList _clients; +#ifdef ESP32 // Same as for individual messages, protect mutations of _clients list // since simultaneous access from different tasks is possible - AsyncWebLock _client_queue_lock; + mutable std::mutex _client_queue_lock; +#endif ArEventHandlerFunction _connectcb; ArAuthorizeConnectHandler _authorizeConnectHandler; public: diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index df6821d..926d6a4 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -314,8 +314,9 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async AsyncWebSocketClient::~AsyncWebSocketClient() { { - AsyncWebLockGuard l(_lock); - + #ifdef ESP32 + std::lock_guard lock(_lock); + #endif _messageQueue.clear(); _controlQueue.clear(); } @@ -331,7 +332,9 @@ void AsyncWebSocketClient::_clearQueue() void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){ _lastMessageTime = millis(); - AsyncWebLockGuard l(_lock); + #ifdef ESP32 + std::lock_guard lock(_lock); + #endif if (!_controlQueue.empty()) { auto &head = _controlQueue.front(); @@ -340,7 +343,6 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){ if (_status == WS_DISCONNECTING && head.opcode() == WS_DISCONNECT){ _controlQueue.pop_front(); _status = WS_DISCONNECTED; - l.unlock(); if (_client) _client->close(true); return; } @@ -362,65 +364,64 @@ void AsyncWebSocketClient::_onPoll() if (!_client) return; - AsyncWebLockGuard l(_lock); + #ifdef ESP32 + std::unique_lock lock(_lock); + #endif if (_client->canSend() && (!_controlQueue.empty() || !_messageQueue.empty())) { - l.unlock(); _runQueue(); } else if (_keepAlivePeriod > 0 && (millis() - _lastMessageTime) >= _keepAlivePeriod && (_controlQueue.empty() && _messageQueue.empty())) { - l.unlock(); +#ifdef ESP32 + lock.unlock(); +#endif ping((uint8_t *)AWSC_PING_PAYLOAD, AWSC_PING_PAYLOAD_LEN); } } void AsyncWebSocketClient::_runQueue() { + // all calls to this method MUST be protected by a mutex lock! if (!_client) return; - AsyncWebLockGuard l(_lock); - _clearQueue(); if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)) { - //l.unlock(); _controlQueue.front().send(_client); } else if (!_messageQueue.empty() && _messageQueue.front().betweenFrames() && webSocketSendFrameWindow(_client)) { - //l.unlock(); _messageQueue.front().send(_client); } } bool AsyncWebSocketClient::queueIsFull() const { - size_t size; - { - AsyncWebLockGuard l(_lock); - size = _messageQueue.size(); - } + #ifdef ESP32 + std::lock_guard lock(_lock); + #endif + size_t size = _messageQueue.size();; return (size >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED); } size_t AsyncWebSocketClient::queueLen() const { - AsyncWebLockGuard l(_lock); + #ifdef ESP32 + std::lock_guard lock(_lock); + #endif return _messageQueue.size() + _controlQueue.size(); } bool AsyncWebSocketClient::canSend() const { - size_t size; - { - AsyncWebLockGuard l(_lock); - size = _messageQueue.size(); - } - return size < WS_MAX_QUEUED_MESSAGES; + #ifdef ESP32 + std::lock_guard lock(_lock); + #endif + return _messageQueue.size() < WS_MAX_QUEUED_MESSAGES; } void AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, size_t len, bool mask) @@ -429,7 +430,9 @@ void AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si return; { - AsyncWebLockGuard l(_lock); + #ifdef ESP32 + std::lock_guard lock(_lock); + #endif _controlQueue.emplace_back(opcode, data, len, mask); } @@ -439,42 +442,34 @@ void AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si void AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask) { - if(_status != WS_CONNECTED) - return; - - if (!_client) - return; - - if (buffer->size() == 0) + if(!_client || buffer->size() == 0 || _status != WS_CONNECTED) return; + #ifdef ESP32 + std::lock_guard lock(_lock); + #endif + if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) { - AsyncWebLockGuard l(_lock); - if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) + if(closeWhenFull) { - l.unlock(); - if(closeWhenFull) - { #ifdef ESP8266 - ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: closing connection\n"); + ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: closing connection\n"); #else - log_e("Too many messages queued: closing connection"); + log_e("Too many messages queued: closing connection"); #endif - _status = WS_DISCONNECTED; - if (_client) _client->close(true); - } else { + _status = WS_DISCONNECTED; + if (_client) _client->close(true); + } else { #ifdef ESP8266 - ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: discarding new message\n"); + ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: discarding new message\n"); #else - log_e("Too many messages queued: discarding new message"); + log_e("Too many messages queued: discarding new message"); #endif - } - return; - } - else - { - _messageQueue.emplace_back(buffer, opcode, mask); } + return; + } + else { + _messageQueue.emplace_back(buffer, opcode, mask); } if (_client && _client->canSend()) diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index cde0b43..a40fc85 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -23,6 +23,7 @@ #include #ifdef ESP32 +#include #include #ifndef WS_MAX_QUEUED_MESSAGES #define WS_MAX_QUEUED_MESSAGES 32 @@ -35,7 +36,6 @@ #endif #include -#include "AsyncWebSynchronization.h" #include #include @@ -136,9 +136,9 @@ class AsyncWebSocketClient { AsyncWebSocket *_server; uint32_t _clientId; AwsClientStatus _status; - - AsyncWebLock _lock; - +#ifdef ESP32 + mutable std::mutex _lock; +#endif std::deque _controlQueue; std::deque _messageQueue; bool closeWhenFull = true; @@ -260,7 +260,9 @@ class AsyncWebSocket: public AsyncWebHandler { AwsEventHandler _eventHandler{nullptr}; AwsHandshakeHandler _handshakeHandler; bool _enabled; - AsyncWebLock _lock; +#ifdef ESP32 + mutable std::mutex _lock; +#endif public: explicit AsyncWebSocket(const char* url) : _url(url) ,_cNextId(1), _enabled(true) {} diff --git a/src/AsyncWebSynchronization.h b/src/AsyncWebSynchronization.h deleted file mode 100644 index 0ff8ab6..0000000 --- a/src/AsyncWebSynchronization.h +++ /dev/null @@ -1,134 +0,0 @@ -#ifndef ASYNCWEBSYNCHRONIZATION_H_ -#define ASYNCWEBSYNCHRONIZATION_H_ - -// Synchronisation is only available on ESP32, as the ESP8266 isn't using FreeRTOS by default - -#include - -#ifdef ESP32 - -// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore -// Modified 'AsyncWebLock' to just only use mutex since pxCurrentTCB is not -// always available. According to example by Arjan Filius, changed name, -// added unimplemented version for ESP8266 -class AsyncPlainLock -{ -private: - SemaphoreHandle_t _lock; - -public: - AsyncPlainLock() { - _lock = xSemaphoreCreateBinary(); - // In this fails, the system is likely that much out of memory that - // we should abort anyways. If assertions are disabled, nothing is lost.. - assert(_lock); - xSemaphoreGive(_lock); - } - - ~AsyncPlainLock() { - vSemaphoreDelete(_lock); - } - - bool lock() const { - xSemaphoreTake(_lock, portMAX_DELAY); - return true; - } - - void unlock() const { - xSemaphoreGive(_lock); - } -}; - -// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore -class AsyncWebLock -{ -private: - SemaphoreHandle_t _lock; - mutable TaskHandle_t _lockedBy{}; - -public: - AsyncWebLock() - { - _lock = xSemaphoreCreateBinary(); - // In this fails, the system is likely that much out of memory that - // we should abort anyways. If assertions are disabled, nothing is lost.. - assert(_lock); - _lockedBy = NULL; - xSemaphoreGive(_lock); - } - - ~AsyncWebLock() { - vSemaphoreDelete(_lock); - } - - bool lock() const { - const auto currentTask = xTaskGetCurrentTaskHandle(); - if (_lockedBy != currentTask) { - xSemaphoreTake(_lock, portMAX_DELAY); - _lockedBy = currentTask; - return true; - } - return false; - } - - void unlock() const { - _lockedBy = NULL; - xSemaphoreGive(_lock); - } -}; - -#else - -// This is the 8266 version of the Sync Lock which is currently unimplemented -class AsyncWebLock -{ - -public: - AsyncWebLock() { - } - - ~AsyncWebLock() { - } - - bool lock() const { - return false; - } - - void unlock() const { - } -}; - -// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above. -using AsyncPlainLock = AsyncWebLock; - -#endif - -class AsyncWebLockGuard -{ -private: - const AsyncWebLock *_lock; - -public: - AsyncWebLockGuard(const AsyncWebLock &l) { - if (l.lock()) { - _lock = &l; - } else { - _lock = NULL; - } - } - - ~AsyncWebLockGuard() { - if (_lock) { - _lock->unlock(); - } - } - - void unlock() { - if (_lock) { - _lock->unlock(); - _lock = NULL; - } - } -}; - -#endif // ASYNCWEBSYNCHRONIZATION_H_