replace self-implemented mutex structures with std:mutex

Use STL's platform implementation for mutex locks
This commit is contained in:
Emil Muratov
2024-06-26 16:50:45 +09:00
committed by Mathieu Carbou
parent 16fd203198
commit d614ccde91
5 changed files with 95 additions and 212 deletions

View File

@@ -18,10 +18,10 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "Arduino.h"
#include "AsyncEventSource.h"
#ifndef ESP8266
#include <rom/ets_sys.h>
#endif
#include "AsyncEventSource.h"
static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev;
@@ -177,9 +177,10 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
}
AsyncEventSourceClient::~AsyncEventSourceClient(){
_lockmq.lock();
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lockmq);
#endif
_messageQueue.free();
_lockmq.unlock();
close();
}
@@ -190,8 +191,10 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
delete dataMessage;
return;
}
#ifdef ESP32
//length() is not thread-safe, thus acquiring the lock before this call..
_lockmq.lock();
std::lock_guard<std::mutex> lock(_lockmq);
#endif
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
#ifdef ESP8266
ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
@@ -206,27 +209,29 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
_runQueue();
}
}
_lockmq.unlock();
}
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
#ifdef ESP32
// Same here, acquiring the lock early
_lockmq.lock();
std::lock_guard<std::mutex> lock(_lockmq);
#endif
while(len && !_messageQueue.isEmpty()){
len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished())
_messageQueue.remove(_messageQueue.front());
}
_runQueue();
_lockmq.unlock();
}
void AsyncEventSourceClient::_onPoll(){
_lockmq.lock();
#ifdef ESP32
// Same here, acquiring the lock early
std::lock_guard<std::mutex> lock(_lockmq);
#endif
if(!_messageQueue.isEmpty()){
_runQueue();
}
_lockmq.unlock();
}
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
@@ -253,10 +258,10 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
}
size_t AsyncEventSourceClient::packetsWaiting() const {
size_t len;
_lockmq.lock();
len = _messageQueue.length();
_lockmq.unlock();
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lockmq);
#endif
size_t len = _messageQueue.length();
return len;
}
@@ -306,14 +311,18 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
client->write((const char *)temp, 2053);
free(temp);
}*/
AsyncWebLockGuard l(_client_queue_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
_clients.add(client);
if(_connectcb)
_connectcb(client);
}
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
AsyncWebLockGuard l(_client_queue_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
_clients.remove(client);
}
@@ -321,7 +330,9 @@ void AsyncEventSource::close(){
// While the whole loop is not done, the linked list is locked and so the
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
// is called very early
AsyncWebLockGuard l(_client_queue_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
for(const auto &c: _clients){
if(c->connected())
c->close();
@@ -332,7 +343,9 @@ void AsyncEventSource::close(){
size_t AsyncEventSource::avgPacketsWaiting() const {
size_t aql = 0;
uint32_t nConnectedClients = 0;
AsyncWebLockGuard l(_client_queue_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
if (_clients.isEmpty()) {
return 0;
}
@@ -348,7 +361,9 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
void AsyncEventSource::send(
const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = generateEventMessage(message, event, id, reconnect);
AsyncWebLockGuard l(_client_queue_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
for(const auto &c: _clients){
if(c->connected()) {
c->write(ev.c_str(), ev.length());
@@ -358,7 +373,9 @@ void AsyncEventSource::send(
size_t AsyncEventSource::count() const {
size_t n_clients;
AsyncWebLockGuard l(_client_queue_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
n_clients = _clients.count_if([](AsyncEventSourceClient *c){
return c->connected();
});

View File

@@ -22,11 +22,12 @@
#include <Arduino.h>
#ifdef ESP32
#include <mutex>
#include <AsyncTCP.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 32
#endif
#else
#else // esp8266
#include <ESPAsyncTCP.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 8
@@ -35,8 +36,6 @@
#include <ESPAsyncWebServer.h>
#include "AsyncWebSynchronization.h"
#ifdef ESP8266
#include <Hash.h>
#ifdef CRYPTO_HASH_h // include Hash.h from espressif framework if the first include was from the crypto library
@@ -81,7 +80,9 @@ class AsyncEventSourceClient {
uint32_t _lastId;
LinkedList<AsyncEventSourceMessage *> _messageQueue;
// ArFi 2020-08-27 for protecting/serializing _messageQueue
AsyncPlainLock _lockmq;
#ifdef ESP32
mutable std::mutex _lockmq;
#endif
void _queueMessage(AsyncEventSourceMessage *dataMessage);
void _runQueue();
@@ -109,9 +110,11 @@ class AsyncEventSource: public AsyncWebHandler {
private:
String _url;
LinkedList<AsyncEventSourceClient *> _clients;
#ifdef ESP32
// Same as for individual messages, protect mutations of _clients list
// since simultaneous access from different tasks is possible
AsyncWebLock _client_queue_lock;
mutable std::mutex _client_queue_lock;
#endif
ArEventHandlerFunction _connectcb;
ArAuthorizeConnectHandler _authorizeConnectHandler;
public:

View File

@@ -314,8 +314,9 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async
AsyncWebSocketClient::~AsyncWebSocketClient()
{
{
AsyncWebLockGuard l(_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
#endif
_messageQueue.clear();
_controlQueue.clear();
}
@@ -331,7 +332,9 @@ void AsyncWebSocketClient::_clearQueue()
void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){
_lastMessageTime = millis();
AsyncWebLockGuard l(_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
#endif
if (!_controlQueue.empty()) {
auto &head = _controlQueue.front();
@@ -340,7 +343,6 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){
if (_status == WS_DISCONNECTING && head.opcode() == WS_DISCONNECT){
_controlQueue.pop_front();
_status = WS_DISCONNECTED;
l.unlock();
if (_client) _client->close(true);
return;
}
@@ -362,65 +364,64 @@ void AsyncWebSocketClient::_onPoll()
if (!_client)
return;
AsyncWebLockGuard l(_lock);
#ifdef ESP32
std::unique_lock<std::mutex> lock(_lock);
#endif
if (_client->canSend() && (!_controlQueue.empty() || !_messageQueue.empty()))
{
l.unlock();
_runQueue();
}
else if (_keepAlivePeriod > 0 && (millis() - _lastMessageTime) >= _keepAlivePeriod && (_controlQueue.empty() && _messageQueue.empty()))
{
l.unlock();
#ifdef ESP32
lock.unlock();
#endif
ping((uint8_t *)AWSC_PING_PAYLOAD, AWSC_PING_PAYLOAD_LEN);
}
}
void AsyncWebSocketClient::_runQueue()
{
// all calls to this method MUST be protected by a mutex lock!
if (!_client)
return;
AsyncWebLockGuard l(_lock);
_clearQueue();
if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1))
{
//l.unlock();
_controlQueue.front().send(_client);
}
else if (!_messageQueue.empty() && _messageQueue.front().betweenFrames() && webSocketSendFrameWindow(_client))
{
//l.unlock();
_messageQueue.front().send(_client);
}
}
bool AsyncWebSocketClient::queueIsFull() const
{
size_t size;
{
AsyncWebLockGuard l(_lock);
size = _messageQueue.size();
}
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
#endif
size_t size = _messageQueue.size();;
return (size >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED);
}
size_t AsyncWebSocketClient::queueLen() const
{
AsyncWebLockGuard l(_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
#endif
return _messageQueue.size() + _controlQueue.size();
}
bool AsyncWebSocketClient::canSend() const
{
size_t size;
{
AsyncWebLockGuard l(_lock);
size = _messageQueue.size();
}
return size < WS_MAX_QUEUED_MESSAGES;
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
#endif
return _messageQueue.size() < WS_MAX_QUEUED_MESSAGES;
}
void AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, size_t len, bool mask)
@@ -429,7 +430,9 @@ void AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si
return;
{
AsyncWebLockGuard l(_lock);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
#endif
_controlQueue.emplace_back(opcode, data, len, mask);
}
@@ -439,42 +442,34 @@ void AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si
void AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask)
{
if(_status != WS_CONNECTED)
return;
if (!_client)
return;
if (buffer->size() == 0)
if(!_client || buffer->size() == 0 || _status != WS_CONNECTED)
return;
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
#endif
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES)
{
AsyncWebLockGuard l(_lock);
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES)
if(closeWhenFull)
{
l.unlock();
if(closeWhenFull)
{
#ifdef ESP8266
ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: closing connection\n");
ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: closing connection\n");
#else
log_e("Too many messages queued: closing connection");
log_e("Too many messages queued: closing connection");
#endif
_status = WS_DISCONNECTED;
if (_client) _client->close(true);
} else {
_status = WS_DISCONNECTED;
if (_client) _client->close(true);
} else {
#ifdef ESP8266
ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: discarding new message\n");
ets_printf("AsyncWebSocketClient::_queueMessage: Too many messages queued: discarding new message\n");
#else
log_e("Too many messages queued: discarding new message");
log_e("Too many messages queued: discarding new message");
#endif
}
return;
}
else
{
_messageQueue.emplace_back(buffer, opcode, mask);
}
return;
}
else {
_messageQueue.emplace_back(buffer, opcode, mask);
}
if (_client && _client->canSend())

View File

@@ -23,6 +23,7 @@
#include <Arduino.h>
#ifdef ESP32
#include <mutex>
#include <AsyncTCP.h>
#ifndef WS_MAX_QUEUED_MESSAGES
#define WS_MAX_QUEUED_MESSAGES 32
@@ -35,7 +36,6 @@
#endif
#include <ESPAsyncWebServer.h>
#include "AsyncWebSynchronization.h"
#include <list>
#include <deque>
@@ -136,9 +136,9 @@ class AsyncWebSocketClient {
AsyncWebSocket *_server;
uint32_t _clientId;
AwsClientStatus _status;
AsyncWebLock _lock;
#ifdef ESP32
mutable std::mutex _lock;
#endif
std::deque<AsyncWebSocketControl> _controlQueue;
std::deque<AsyncWebSocketMessage> _messageQueue;
bool closeWhenFull = true;
@@ -260,7 +260,9 @@ class AsyncWebSocket: public AsyncWebHandler {
AwsEventHandler _eventHandler{nullptr};
AwsHandshakeHandler _handshakeHandler;
bool _enabled;
AsyncWebLock _lock;
#ifdef ESP32
mutable std::mutex _lock;
#endif
public:
explicit AsyncWebSocket(const char* url) : _url(url) ,_cNextId(1), _enabled(true) {}

View File

@@ -1,134 +0,0 @@
#ifndef ASYNCWEBSYNCHRONIZATION_H_
#define ASYNCWEBSYNCHRONIZATION_H_
// Synchronisation is only available on ESP32, as the ESP8266 isn't using FreeRTOS by default
#include <ESPAsyncWebServer.h>
#ifdef ESP32
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
// Modified 'AsyncWebLock' to just only use mutex since pxCurrentTCB is not
// always available. According to example by Arjan Filius, changed name,
// added unimplemented version for ESP8266
class AsyncPlainLock
{
private:
SemaphoreHandle_t _lock;
public:
AsyncPlainLock() {
_lock = xSemaphoreCreateBinary();
// In this fails, the system is likely that much out of memory that
// we should abort anyways. If assertions are disabled, nothing is lost..
assert(_lock);
xSemaphoreGive(_lock);
}
~AsyncPlainLock() {
vSemaphoreDelete(_lock);
}
bool lock() const {
xSemaphoreTake(_lock, portMAX_DELAY);
return true;
}
void unlock() const {
xSemaphoreGive(_lock);
}
};
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
class AsyncWebLock
{
private:
SemaphoreHandle_t _lock;
mutable TaskHandle_t _lockedBy{};
public:
AsyncWebLock()
{
_lock = xSemaphoreCreateBinary();
// In this fails, the system is likely that much out of memory that
// we should abort anyways. If assertions are disabled, nothing is lost..
assert(_lock);
_lockedBy = NULL;
xSemaphoreGive(_lock);
}
~AsyncWebLock() {
vSemaphoreDelete(_lock);
}
bool lock() const {
const auto currentTask = xTaskGetCurrentTaskHandle();
if (_lockedBy != currentTask) {
xSemaphoreTake(_lock, portMAX_DELAY);
_lockedBy = currentTask;
return true;
}
return false;
}
void unlock() const {
_lockedBy = NULL;
xSemaphoreGive(_lock);
}
};
#else
// This is the 8266 version of the Sync Lock which is currently unimplemented
class AsyncWebLock
{
public:
AsyncWebLock() {
}
~AsyncWebLock() {
}
bool lock() const {
return false;
}
void unlock() const {
}
};
// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above.
using AsyncPlainLock = AsyncWebLock;
#endif
class AsyncWebLockGuard
{
private:
const AsyncWebLock *_lock;
public:
AsyncWebLockGuard(const AsyncWebLock &l) {
if (l.lock()) {
_lock = &l;
} else {
_lock = NULL;
}
}
~AsyncWebLockGuard() {
if (_lock) {
_lock->unlock();
}
}
void unlock() {
if (_lock) {
_lock->unlock();
_lock = NULL;
}
}
};
#endif // ASYNCWEBSYNCHRONIZATION_H_