forked from me-no-dev/ESPAsyncWebServer
Fixed mutex dead lock
This commit is contained in:
@ -225,6 +225,8 @@ AsyncWebSocketMessageBuffer::~AsyncWebSocketMessageBuffer()
|
|||||||
|
|
||||||
bool AsyncWebSocketMessageBuffer::reserve(size_t size)
|
bool AsyncWebSocketMessageBuffer::reserve(size_t size)
|
||||||
{
|
{
|
||||||
|
Serial.printf("AsyncWebSocketMessageBuffer::reserve() this=0x%llx\r\n", uint64_t(this));
|
||||||
|
|
||||||
_len = size;
|
_len = size;
|
||||||
|
|
||||||
if (_data) {
|
if (_data) {
|
||||||
@ -482,43 +484,49 @@ AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() {
|
|||||||
const size_t AWSC_PING_PAYLOAD_LEN = 22;
|
const size_t AWSC_PING_PAYLOAD_LEN = 22;
|
||||||
|
|
||||||
AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server)
|
AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server)
|
||||||
: _tempObject(NULL)
|
: _lock{"AsyncWebSocketClient"}
|
||||||
|
, _tempObject(NULL)
|
||||||
{
|
{
|
||||||
_client = request->client();
|
Serial.printf("AsyncWebSocketClient::AsyncWebSocketClient this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
_server = server;
|
|
||||||
_clientId = _server->_getNextId();
|
_client = request->client();
|
||||||
_status = WS_CONNECTED;
|
_server = server;
|
||||||
_pstate = 0;
|
_clientId = _server->_getNextId();
|
||||||
_lastMessageTime = millis();
|
_status = WS_CONNECTED;
|
||||||
_keepAlivePeriod = 0;
|
_pstate = 0;
|
||||||
_client->setRxTimeout(0);
|
_lastMessageTime = millis();
|
||||||
_client->onError([](void *r, AsyncClient* c, int8_t error){ (void)c; ((AsyncWebSocketClient*)(r))->_onError(error); }, this);
|
_keepAlivePeriod = 0;
|
||||||
_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ (void)c; ((AsyncWebSocketClient*)(r))->_onAck(len, time); }, this);
|
_client->setRxTimeout(0);
|
||||||
_client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncWebSocketClient*)(r))->_onDisconnect(); delete c; }, this);
|
_client->onError([](void *r, AsyncClient* c, int8_t error){ (void)c; ((AsyncWebSocketClient*)(r))->_onError(error); }, this);
|
||||||
_client->onTimeout([](void *r, AsyncClient* c, uint32_t time){ (void)c; ((AsyncWebSocketClient*)(r))->_onTimeout(time); }, this);
|
_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ (void)c; ((AsyncWebSocketClient*)(r))->_onAck(len, time); }, this);
|
||||||
_client->onData([](void *r, AsyncClient* c, void *buf, size_t len){ (void)c; ((AsyncWebSocketClient*)(r))->_onData(buf, len); }, this);
|
_client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncWebSocketClient*)(r))->_onDisconnect(); delete c; }, this);
|
||||||
_client->onPoll([](void *r, AsyncClient* c){ (void)c; ((AsyncWebSocketClient*)(r))->_onPoll(); }, this);
|
_client->onTimeout([](void *r, AsyncClient* c, uint32_t time){ (void)c; ((AsyncWebSocketClient*)(r))->_onTimeout(time); }, this);
|
||||||
//_server->_addClient(this);
|
_client->onData([](void *r, AsyncClient* c, void *buf, size_t len){ (void)c; ((AsyncWebSocketClient*)(r))->_onData(buf, len); }, this);
|
||||||
_server->_handleEvent(this, WS_EVT_CONNECT, request, NULL, 0);
|
_client->onPoll([](void *r, AsyncClient* c){ (void)c; ((AsyncWebSocketClient*)(r))->_onPoll(); }, this);
|
||||||
delete request;
|
//_server->_addClient(this);
|
||||||
|
_server->_handleEvent(this, WS_EVT_CONNECT, request, NULL, 0);
|
||||||
|
delete request;
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncWebSocketClient::~AsyncWebSocketClient(){
|
AsyncWebSocketClient::~AsyncWebSocketClient()
|
||||||
Serial.printf("AsyncWebSocketClient::~AsyncWebSocketClient task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
{
|
||||||
|
Serial.printf("AsyncWebSocketClient::~AsyncWebSocketClient this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
AsyncWebLockGuard l(_lock);
|
{
|
||||||
_messageQueue = {};
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::~AsyncWebSocketClient()");
|
||||||
_controlQueue = {};
|
_messageQueue = {};
|
||||||
_server->_handleEvent(this, WS_EVT_DISCONNECT, NULL, NULL, 0);
|
_controlQueue = {};
|
||||||
|
}
|
||||||
|
_server->_handleEvent(this, WS_EVT_DISCONNECT, NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){
|
void AsyncWebSocketClient::_onAck(size_t len, uint32_t time)
|
||||||
Serial.printf("AsyncWebSocketClient::_onAck task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
{
|
||||||
|
Serial.printf("AsyncWebSocketClient::_onAck this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
_lastMessageTime = millis();
|
_lastMessageTime = millis();
|
||||||
|
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::_onAck()");
|
||||||
|
|
||||||
if (!_controlQueue.empty()) {
|
if (!_controlQueue.empty()) {
|
||||||
auto &head = _controlQueue.front();
|
auto &head = _controlQueue.front();
|
||||||
@ -545,14 +553,14 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){
|
|||||||
|
|
||||||
void AsyncWebSocketClient::_onPoll()
|
void AsyncWebSocketClient::_onPoll()
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocketClient::_onPoll task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocketClient::_onPoll this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
AsyncWebLockGuard l(_lock);
|
if(_client->canSend() && [this](){ AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::_onPoll(1)"); return !_controlQueue.empty() || !_messageQueue.empty(); }())
|
||||||
if(_client->canSend() && (!_controlQueue.empty() || !_messageQueue.empty()))
|
|
||||||
{
|
{
|
||||||
_runQueue();
|
_runQueue();
|
||||||
}
|
}
|
||||||
else if(_keepAlivePeriod > 0 && _controlQueue.empty() && _messageQueue.empty() && (millis() - _lastMessageTime) >= _keepAlivePeriod)
|
else if(_keepAlivePeriod > 0 && (millis() - _lastMessageTime) >= _keepAlivePeriod &&
|
||||||
|
[this](){ AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::_onPoll(1)"); return _controlQueue.empty() && _messageQueue.empty(); }())
|
||||||
{
|
{
|
||||||
ping((uint8_t *)AWSC_PING_PAYLOAD, AWSC_PING_PAYLOAD_LEN);
|
ping((uint8_t *)AWSC_PING_PAYLOAD, AWSC_PING_PAYLOAD_LEN);
|
||||||
}
|
}
|
||||||
@ -560,32 +568,38 @@ void AsyncWebSocketClient::_onPoll()
|
|||||||
|
|
||||||
void AsyncWebSocketClient::_runQueue()
|
void AsyncWebSocketClient::_runQueue()
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocketClient::_runQueue task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocketClient::_runQueue this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
AsyncWebLockGuard l(_lock);
|
|
||||||
|
|
||||||
while (!_messageQueue.empty() && _messageQueue.front().get().finished())
|
|
||||||
{
|
{
|
||||||
_messageQueue.pop();
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::_runQueue()");
|
||||||
|
|
||||||
|
while (!_messageQueue.empty() && _messageQueue.front().get().finished())
|
||||||
|
{
|
||||||
|
_messageQueue.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().get().betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1))
|
||||||
|
{
|
||||||
|
l.unlock();
|
||||||
|
_controlQueue.front().send(_client);
|
||||||
|
}
|
||||||
|
else if (!_messageQueue.empty() && _messageQueue.front().get().betweenFrames() && webSocketSendFrameWindow(_client))
|
||||||
|
{
|
||||||
|
l.unlock();
|
||||||
|
_messageQueue.front().get().send(_client);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().get().betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1))
|
Serial.printf("AsyncWebSocketClient::_runQueue this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
{
|
|
||||||
_controlQueue.front().send(_client);
|
|
||||||
}
|
|
||||||
else if (!_messageQueue.empty() && _messageQueue.front().get().betweenFrames() && webSocketSendFrameWindow(_client))
|
|
||||||
{
|
|
||||||
_messageQueue.front().get().send(_client);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AsyncWebSocketClient::queueIsFull() const
|
bool AsyncWebSocketClient::queueIsFull() const
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocketClient::queueIsFull task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocketClient::queueIsFull this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
size_t size;
|
size_t size;
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::queueIsFull()");
|
||||||
size = _messageQueue.size();
|
size = _messageQueue.size();
|
||||||
}
|
}
|
||||||
return (size >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED);
|
return (size >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED);
|
||||||
@ -593,11 +607,11 @@ bool AsyncWebSocketClient::queueIsFull() const
|
|||||||
|
|
||||||
bool AsyncWebSocketClient::canSend() const
|
bool AsyncWebSocketClient::canSend() const
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocketClient::canSend task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocketClient::canSend this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
size_t size;
|
size_t size;
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::canSend()");
|
||||||
size = _messageQueue.size();
|
size = _messageQueue.size();
|
||||||
}
|
}
|
||||||
return size < WS_MAX_QUEUED_MESSAGES;
|
return size < WS_MAX_QUEUED_MESSAGES;
|
||||||
@ -605,10 +619,10 @@ bool AsyncWebSocketClient::canSend() const
|
|||||||
|
|
||||||
void AsyncWebSocketClient::_queueControl(uint8_t opcode, uint8_t *data, size_t len, bool mask)
|
void AsyncWebSocketClient::_queueControl(uint8_t opcode, uint8_t *data, size_t len, bool mask)
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocketClient::_queueControl task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocketClient::_queueControl this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::_queueControl");
|
||||||
_controlQueue.emplace(opcode, data, len, mask);
|
_controlQueue.emplace(opcode, data, len, mask);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -618,20 +632,17 @@ void AsyncWebSocketClient::_queueControl(uint8_t opcode, uint8_t *data, size_t l
|
|||||||
|
|
||||||
void AsyncWebSocketClient::_queueMessage(const char *data, size_t len, uint8_t opcode, bool mask)
|
void AsyncWebSocketClient::_queueMessage(const char *data, size_t len, uint8_t opcode, bool mask)
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocketClient::_queueMessage task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocketClient::_queueMessage this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
if(_status != WS_CONNECTED)
|
if(_status != WS_CONNECTED)
|
||||||
{
|
|
||||||
PolymorphMessageContainer{data, len, opcode, mask};
|
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::_queueMessage");
|
||||||
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES)
|
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES)
|
||||||
{
|
{
|
||||||
|
l.unlock();
|
||||||
ets_printf("ERROR: Too many messages queued\n");
|
ets_printf("ERROR: Too many messages queued\n");
|
||||||
PolymorphMessageContainer{data, len, opcode, mask};
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -645,20 +656,17 @@ void AsyncWebSocketClient::_queueMessage(const char *data, size_t len, uint8_t o
|
|||||||
|
|
||||||
void AsyncWebSocketClient::_queueMessage(AsyncWebSocketMessageBuffer *buffer, uint8_t opcode, bool mask)
|
void AsyncWebSocketClient::_queueMessage(AsyncWebSocketMessageBuffer *buffer, uint8_t opcode, bool mask)
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocketClient::_queueMessage task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocketClient::_queueMessage this=0x%llx task=0x%llx %s\r\n", uint64_t(this), uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
if(_status != WS_CONNECTED)
|
if(_status != WS_CONNECTED)
|
||||||
{
|
|
||||||
PolymorphMessageContainer{buffer, opcode, mask};
|
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocketClient::_queueMessage");
|
||||||
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES)
|
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES)
|
||||||
{
|
{
|
||||||
|
l.unlock();
|
||||||
ets_printf("ERROR: Too many messages queued\n");
|
ets_printf("ERROR: Too many messages queued\n");
|
||||||
PolymorphMessageContainer{buffer, opcode, mask};
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -956,6 +964,7 @@ AsyncWebSocket::AsyncWebSocket(const String& url)
|
|||||||
:_url(url)
|
:_url(url)
|
||||||
,_cNextId(1)
|
,_cNextId(1)
|
||||||
,_enabled(true)
|
,_enabled(true)
|
||||||
|
,_lock{"AsyncWebSocket"}
|
||||||
{
|
{
|
||||||
_eventHandler = NULL;
|
_eventHandler = NULL;
|
||||||
}
|
}
|
||||||
@ -1052,16 +1061,20 @@ void AsyncWebSocket::text(uint32_t id, const char * message, size_t len){
|
|||||||
c->text(message, len);
|
c->text(message, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer * buffer){
|
void AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer *buffer)
|
||||||
if (!buffer) return;
|
{
|
||||||
buffer->lock();
|
if (!buffer)
|
||||||
for(auto &c : _clients){
|
return;
|
||||||
if (c.status() == WS_CONNECTED){
|
|
||||||
c.text(buffer);
|
buffer->lock();
|
||||||
}
|
|
||||||
}
|
for(auto &c : _clients)
|
||||||
buffer->unlock();
|
if (c.status() == WS_CONNECTED)
|
||||||
_cleanBuffers();
|
c.text(buffer);
|
||||||
|
|
||||||
|
buffer->unlock();
|
||||||
|
|
||||||
|
_cleanBuffers();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1088,10 +1101,9 @@ void AsyncWebSocket::binaryAll(AsyncWebSocketMessageBuffer *buffer)
|
|||||||
|
|
||||||
buffer->lock();
|
buffer->lock();
|
||||||
|
|
||||||
for (auto &c : _clients) {
|
for (auto &c : _clients)
|
||||||
if (c.status() == WS_CONNECTED)
|
if (c.status() == WS_CONNECTED)
|
||||||
c.binary(buffer);
|
c.binary(buffer);
|
||||||
}
|
|
||||||
|
|
||||||
buffer->unlock();
|
buffer->unlock();
|
||||||
|
|
||||||
@ -1315,14 +1327,14 @@ void AsyncWebSocket::handleRequest(AsyncWebServerRequest *request){
|
|||||||
request->send(response);
|
request->send(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(size_t size)
|
AsyncWebSocketMessageBuffer *AsyncWebSocket::makeBuffer(size_t size)
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocket::makeBuffer task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocket::makeBuffer task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
AsyncWebSocketMessageBuffer *buffer{};
|
AsyncWebSocketMessageBuffer *buffer{};
|
||||||
|
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocket::makeBuffer");
|
||||||
_buffers.emplace_back(size);
|
_buffers.emplace_back(size);
|
||||||
buffer = &_buffers.back();
|
buffer = &_buffers.back();
|
||||||
}
|
}
|
||||||
@ -1330,14 +1342,14 @@ AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(size_t size)
|
|||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(uint8_t * data, size_t size)
|
AsyncWebSocketMessageBuffer *AsyncWebSocket::makeBuffer(uint8_t * data, size_t size)
|
||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocket::makeBuffer task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocket::makeBuffer task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
AsyncWebSocketMessageBuffer *buffer{};
|
AsyncWebSocketMessageBuffer *buffer{};
|
||||||
|
|
||||||
{
|
{
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocket::makeBuffer");
|
||||||
_buffers.emplace_back(data, size);
|
_buffers.emplace_back(data, size);
|
||||||
buffer = &_buffers.back();
|
buffer = &_buffers.back();
|
||||||
}
|
}
|
||||||
@ -1349,7 +1361,7 @@ void AsyncWebSocket::_cleanBuffers()
|
|||||||
{
|
{
|
||||||
Serial.printf("AsyncWebSocket::_cleanBuffers task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
Serial.printf("AsyncWebSocket::_cleanBuffers task=0x%llx %s\r\n", uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
|
|
||||||
AsyncWebLockGuard l(_lock);
|
AsyncWebLockGuard l(_lock, "AsyncWebSocket::_cleanBuffers()");
|
||||||
|
|
||||||
for (auto iter = std::begin(_buffers); iter != std::end(_buffers);)
|
for (auto iter = std::begin(_buffers); iter != std::end(_buffers);)
|
||||||
{
|
{
|
||||||
|
@ -88,7 +88,7 @@ class AsyncWebSocketMessageBuffer {
|
|||||||
uint8_t * _data;
|
uint8_t * _data;
|
||||||
size_t _len;
|
size_t _len;
|
||||||
bool _lock;
|
bool _lock;
|
||||||
uint32_t _count;
|
uint32_t _count;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AsyncWebSocketMessageBuffer();
|
AsyncWebSocketMessageBuffer();
|
||||||
@ -97,15 +97,15 @@ class AsyncWebSocketMessageBuffer {
|
|||||||
AsyncWebSocketMessageBuffer(const AsyncWebSocketMessageBuffer &);
|
AsyncWebSocketMessageBuffer(const AsyncWebSocketMessageBuffer &);
|
||||||
AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBuffer &&);
|
AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBuffer &&);
|
||||||
~AsyncWebSocketMessageBuffer();
|
~AsyncWebSocketMessageBuffer();
|
||||||
void operator ++(int i) { (void)i; _count++; }
|
void operator ++(int i) { Serial.printf("AsyncWebSocketMessageBuffer::operator++() this=0x%llx\r\n", uint64_t(this)); (void)i; _count++; }
|
||||||
void operator --(int i) { (void)i; if (_count > 0) { _count--; } ; }
|
void operator --(int i) { Serial.printf("AsyncWebSocketMessageBuffer::operator--() this=0x%llx\r\n", uint64_t(this)); (void)i; if (_count > 0) { _count--; } ; }
|
||||||
bool reserve(size_t size);
|
bool reserve(size_t size);
|
||||||
void lock() { _lock = true; }
|
void lock() { Serial.printf("AsyncWebSocketMessageBuffer::lock() this=0x%llx\r\n", uint64_t(this)); _lock = true; }
|
||||||
void unlock() { _lock = false; }
|
void unlock() { Serial.printf("AsyncWebSocketMessageBuffer::unlock() this=0x%llx\r\n", uint64_t(this)); _lock = false; }
|
||||||
uint8_t * get() { return _data; }
|
uint8_t *get() { Serial.printf("AsyncWebSocketMessageBuffer::get() this=0x%llx\r\n", uint64_t(this)); return _data; }
|
||||||
size_t length() { return _len; }
|
size_t length() { Serial.printf("AsyncWebSocketMessageBuffer::length() this=0x%llx\r\n", uint64_t(this)); return _len; }
|
||||||
uint32_t count() { return _count; }
|
uint32_t count() { Serial.printf("AsyncWebSocketMessageBuffer::count() this=0x%llx\r\n", uint64_t(this)); return _count; }
|
||||||
bool canDelete() { return (!_count && !_lock); }
|
bool canDelete() { Serial.printf("AsyncWebSocketMessageBuffer::canDelete() this=0x%llx\r\n", uint64_t(this)); return (!_count && !_lock); }
|
||||||
|
|
||||||
friend AsyncWebSocket;
|
friend AsyncWebSocket;
|
||||||
|
|
||||||
@ -148,7 +148,8 @@ class AsyncWebSocketMultiMessage: public AsyncWebSocketMessage {
|
|||||||
size_t _sent;
|
size_t _sent;
|
||||||
size_t _ack;
|
size_t _ack;
|
||||||
size_t _acked;
|
size_t _acked;
|
||||||
AsyncWebSocketMessageBuffer * _WSbuffer;
|
AsyncWebSocketMessageBuffer *_WSbuffer;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuffer * buffer, uint8_t opcode=WS_TEXT, bool mask=false);
|
AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuffer * buffer, uint8_t opcode=WS_TEXT, bool mask=false);
|
||||||
virtual ~AsyncWebSocketMultiMessage() override;
|
virtual ~AsyncWebSocketMultiMessage() override;
|
||||||
@ -348,7 +349,7 @@ class AsyncWebSocket: public AsyncWebHandler {
|
|||||||
void textAll(char * message);
|
void textAll(char * message);
|
||||||
void textAll(const String &message);
|
void textAll(const String &message);
|
||||||
void textAll(const __FlashStringHelper *message); // need to convert
|
void textAll(const __FlashStringHelper *message); // need to convert
|
||||||
void textAll(AsyncWebSocketMessageBuffer * buffer);
|
void textAll(AsyncWebSocketMessageBuffer *buffer);
|
||||||
|
|
||||||
void binary(uint32_t id, const char * message, size_t len);
|
void binary(uint32_t id, const char * message, size_t len);
|
||||||
void binary(uint32_t id, const char * message);
|
void binary(uint32_t id, const char * message);
|
||||||
@ -363,7 +364,7 @@ class AsyncWebSocket: public AsyncWebHandler {
|
|||||||
void binaryAll(char * message);
|
void binaryAll(char * message);
|
||||||
void binaryAll(const String &message);
|
void binaryAll(const String &message);
|
||||||
void binaryAll(const __FlashStringHelper *message, size_t len);
|
void binaryAll(const __FlashStringHelper *message, size_t len);
|
||||||
void binaryAll(AsyncWebSocketMessageBuffer * buffer);
|
void binaryAll(AsyncWebSocketMessageBuffer *buffer);
|
||||||
|
|
||||||
void message(uint32_t id, const char *data, size_t len, uint8_t opcode=WS_TEXT, bool mask=false);
|
void message(uint32_t id, const char *data, size_t len, uint8_t opcode=WS_TEXT, bool mask=false);
|
||||||
void message(uint32_t id, AsyncWebSocketMessageBuffer *buffer, uint8_t opcode=WS_TEXT, bool mask=false);
|
void message(uint32_t id, AsyncWebSocketMessageBuffer *buffer, uint8_t opcode=WS_TEXT, bool mask=false);
|
||||||
|
@ -12,10 +12,14 @@ class AsyncWebLock
|
|||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
SemaphoreHandle_t _lock;
|
SemaphoreHandle_t _lock;
|
||||||
mutable void *_lockedBy;
|
mutable TaskHandle_t _lockedBy{};
|
||||||
|
mutable const char *_lastLockerName;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AsyncWebLock() {
|
const char * const lockName;
|
||||||
|
AsyncWebLock(const char *_lockName) :
|
||||||
|
lockName{_lockName}
|
||||||
|
{
|
||||||
_lock = xSemaphoreCreateBinary();
|
_lock = xSemaphoreCreateBinary();
|
||||||
_lockedBy = NULL;
|
_lockedBy = NULL;
|
||||||
xSemaphoreGive(_lock);
|
xSemaphoreGive(_lock);
|
||||||
@ -25,18 +29,29 @@ public:
|
|||||||
vSemaphoreDelete(_lock);
|
vSemaphoreDelete(_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool lock() const {
|
bool lock(const char *lockerName) const {
|
||||||
extern void *pxCurrentTCB;
|
const auto currentTask = xTaskGetCurrentTaskHandle();
|
||||||
if (_lockedBy != pxCurrentTCB) {
|
if (_lockedBy != currentTask) {
|
||||||
xSemaphoreTake(_lock, portMAX_DELAY);
|
while (true)
|
||||||
_lockedBy = pxCurrentTCB;
|
{
|
||||||
|
Serial.printf("AsyncWebLock::lock this=0x%llx name=%s locker=%s task=0x%llx %s\r\n", uint64_t(this), lockName, lockerName, uint64_t(currentTask), pcTaskGetTaskName(currentTask));
|
||||||
|
|
||||||
|
if (xSemaphoreTake(_lock, 200 / portTICK_PERIOD_MS))
|
||||||
|
break;
|
||||||
|
else
|
||||||
|
Serial.printf("AsyncWebLock::lock FAILED this=0x%llx name=%s locker=%s task=0x%llx %s lastLockedBy=%s\r\n", uint64_t(this), lockName, lockerName, uint64_t(currentTask), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()), _lastLockerName);
|
||||||
|
}
|
||||||
|
_lockedBy = currentTask;
|
||||||
|
_lastLockerName = lockerName;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void unlock() const {
|
void unlock(const char *lockerName) const {
|
||||||
|
Serial.printf("AsyncWebLock::unlock this=0x%llx name=%s locker=%s task=0x%llx %s\r\n", uint64_t(this), lockName, lockerName, uint64_t(xTaskGetCurrentTaskHandle()), pcTaskGetTaskName(xTaskGetCurrentTaskHandle()));
|
||||||
_lockedBy = NULL;
|
_lockedBy = NULL;
|
||||||
|
_lastLockerName = NULL;
|
||||||
xSemaphoreGive(_lock);
|
xSemaphoreGive(_lock);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -69,8 +84,12 @@ private:
|
|||||||
const AsyncWebLock *_lock;
|
const AsyncWebLock *_lock;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AsyncWebLockGuard(const AsyncWebLock &l) {
|
const char * const lockerName;
|
||||||
if (l.lock()) {
|
|
||||||
|
AsyncWebLockGuard(const AsyncWebLock &l, const char *_lockerName) :
|
||||||
|
lockerName{_lockerName}
|
||||||
|
{
|
||||||
|
if (l.lock(lockerName)) {
|
||||||
_lock = &l;
|
_lock = &l;
|
||||||
} else {
|
} else {
|
||||||
_lock = NULL;
|
_lock = NULL;
|
||||||
@ -79,9 +98,16 @@ public:
|
|||||||
|
|
||||||
~AsyncWebLockGuard() {
|
~AsyncWebLockGuard() {
|
||||||
if (_lock) {
|
if (_lock) {
|
||||||
_lock->unlock();
|
_lock->unlock(lockerName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void unlock() {
|
||||||
|
if (_lock) {
|
||||||
|
_lock->unlock(lockerName);
|
||||||
|
_lock = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif // ASYNCWEBSYNCHRONIZATION_H_
|
#endif // ASYNCWEBSYNCHRONIZATION_H_
|
||||||
|
@ -171,7 +171,7 @@ class AsyncWebServerRequest {
|
|||||||
|
|
||||||
std::list<AsyncWebHeader> _headers;
|
std::list<AsyncWebHeader> _headers;
|
||||||
LinkedList<AsyncWebParameter *> _params;
|
LinkedList<AsyncWebParameter *> _params;
|
||||||
LinkedList<String *> _pathParams;
|
std::vector<String> _pathParams;
|
||||||
|
|
||||||
uint8_t _multiParseState;
|
uint8_t _multiParseState;
|
||||||
uint8_t _boundaryPosition;
|
uint8_t _boundaryPosition;
|
||||||
|
@ -54,7 +54,6 @@ AsyncWebServerRequest::AsyncWebServerRequest(AsyncWebServer* s, AsyncClient* c)
|
|||||||
, _contentLength(0)
|
, _contentLength(0)
|
||||||
, _parsedLength(0)
|
, _parsedLength(0)
|
||||||
, _params(LinkedList<AsyncWebParameter *>([](AsyncWebParameter *p){ delete p; }))
|
, _params(LinkedList<AsyncWebParameter *>([](AsyncWebParameter *p){ delete p; }))
|
||||||
, _pathParams(LinkedList<String *>([](String *p){ delete p; }))
|
|
||||||
, _multiParseState(0)
|
, _multiParseState(0)
|
||||||
, _boundaryPosition(0)
|
, _boundaryPosition(0)
|
||||||
, _itemStartIndex(0)
|
, _itemStartIndex(0)
|
||||||
@ -80,7 +79,7 @@ AsyncWebServerRequest::~AsyncWebServerRequest(){
|
|||||||
_headers.clear();
|
_headers.clear();
|
||||||
|
|
||||||
_params.free();
|
_params.free();
|
||||||
_pathParams.free();
|
_pathParams.clear();
|
||||||
|
|
||||||
_interestingHeaders.clear();
|
_interestingHeaders.clear();
|
||||||
|
|
||||||
@ -241,7 +240,7 @@ void AsyncWebServerRequest::_addParam(AsyncWebParameter *p){
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AsyncWebServerRequest::_addPathParam(const char *p){
|
void AsyncWebServerRequest::_addPathParam(const char *p){
|
||||||
_pathParams.add(new String(p));
|
_pathParams.emplace_back(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncWebServerRequest::_addGetParams(const String& params){
|
void AsyncWebServerRequest::_addGetParams(const String& params){
|
||||||
@ -957,9 +956,9 @@ const String& AsyncWebServerRequest::argName(size_t i) const {
|
|||||||
return getParam(i)->name();
|
return getParam(i)->name();
|
||||||
}
|
}
|
||||||
|
|
||||||
const String& AsyncWebServerRequest::pathArg(size_t i) const {
|
const String& AsyncWebServerRequest::pathArg(size_t i) const
|
||||||
auto param = _pathParams.nth(i);
|
{
|
||||||
return param ? **param : SharedEmptyString;
|
return i < _pathParams.size() ? _pathParams[i] : SharedEmptyString;
|
||||||
}
|
}
|
||||||
|
|
||||||
const String& AsyncWebServerRequest::header(const char* name) const {
|
const String& AsyncWebServerRequest::header(const char* name) const {
|
||||||
|
Reference in New Issue
Block a user