From 8feb6730a80ca313986d50a82f1c79a70f0230bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Villac=C3=ADs=20Lasso?= Date: Tue, 29 Dec 2020 18:49:23 -0500 Subject: [PATCH] _controlQueue is now a std::deque Based on commit f5e439c8f86609c3be855b76844352322e366b80 of dumbfixes branch of 0xFEEDC0DE64 fork of ESPAsyncWebServer. Unlike the original patch, clearing the std::queue using assignment of {} is not supported in C++11, and no other method of clearing the queue is available, so this forces use of std::deque instead. This in turn forces all methods of std::queue to be replaced with their underlying equivalents in std::deque. Control messages are constructed directly inside the container using .emplace_back() (.emplace() in the original patch) instead of building the message and passing its pointer as a parameter. --- src/AsyncWebSocket.cpp | 50 +++++++++++++++++++++--------------------- src/AsyncWebSocket.h | 7 +++--- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index c1c7900..35a4003 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -522,8 +522,7 @@ AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() { const size_t AWSC_PING_PAYLOAD_LEN = 22; AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server) - : _controlQueue(LinkedList([](AsyncWebSocketControl *c){ delete c; })) - , _messageQueue(LinkedList([](AsyncWebSocketMessage *m){ delete m; })) + : _messageQueue(LinkedList([](AsyncWebSocketMessage *m){ delete m; })) , _tempObject(NULL) { _client = request->client(); @@ -548,7 +547,7 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async AsyncWebSocketClient::~AsyncWebSocketClient(){ // Serial.printf("%u FREE Q\n", id()); _messageQueue.free(); - _controlQueue.free(); + _controlQueue.clear(); _server->_cleanBuffers(); _server->_handleEvent(this, WS_EVT_DISCONNECT, NULL, NULL, 0); } @@ -562,17 +561,17 @@ void AsyncWebSocketClient::_clearQueue(){ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){ // Serial.printf("%u onAck\n", id()); _lastMessageTime = millis(); - if(!_controlQueue.isEmpty()){ - auto head = _controlQueue.front(); - if(head->finished()){ - len -= head->len(); - if(_status == WS_DISCONNECTING && head->opcode() == WS_DISCONNECT){ - _controlQueue.remove(head); + if(!_controlQueue.empty()){ + auto &head = _controlQueue.front(); + if(head.finished()){ + len -= head.len(); + if(_status == WS_DISCONNECTING && head.opcode() == WS_DISCONNECT){ + _controlQueue.pop_front(); _status = WS_DISCONNECTED; _client->close(true); return; } - _controlQueue.remove(head); + _controlQueue.pop_front(); } } @@ -588,10 +587,9 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){ } void AsyncWebSocketClient::_onPoll(){ - if(_client->canSend() && (!_controlQueue.isEmpty() || !_messageQueue.isEmpty())){ - // Serial.println("RUN 2"); + if(_client->canSend() && (!_controlQueue.empty() || !_messageQueue.isEmpty())){ _runQueue(); - } else if(_keepAlivePeriod > 0 && _controlQueue.isEmpty() && _messageQueue.isEmpty() && (millis() - _lastMessageTime) >= _keepAlivePeriod){ + } else if(_keepAlivePeriod > 0 && _controlQueue.empty() && _messageQueue.isEmpty() && (millis() - _lastMessageTime) >= _keepAlivePeriod){ ping((uint8_t *)AWSC_PING_PAYLOAD, AWSC_PING_PAYLOAD_LEN); } } @@ -602,9 +600,9 @@ void AsyncWebSocketClient::_runQueue(){ //size_t m0 = _messageQueue.isEmpty()? 0 : _messageQueue.length(); //size_t m1 = _messageQueue.isEmpty()? 0 : _messageQueue.front()->betweenFrames(); // Serial.printf("%u R C = %u %u\n", _clientId, m0, m1); - if(!_controlQueue.isEmpty() && (_messageQueue.isEmpty() || _messageQueue.front()->betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front()->len() - 1)){ + if(!_controlQueue.empty() && (_messageQueue.isEmpty() || _messageQueue.front()->betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)){ // Serial.printf("%u R S C\n", _clientId); - _controlQueue.front()->send(_client); + _controlQueue.front().send(_client); } else if(!_messageQueue.isEmpty() && _messageQueue.front()->betweenFrames() && webSocketSendFrameWindow(_client)){ // Serial.printf("%u R S M = ", _clientId); _messageQueue.front()->send(_client); @@ -617,6 +615,10 @@ bool AsyncWebSocketClient::queueIsFull() const { return (_messageQueue.length() >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED); } +size_t AsyncWebSocketClient::queueLen() const { + return _messageQueue.length() + _controlQueue.size(); +} + void AsyncWebSocketClient::_queueMessage(AsyncWebSocketMessage *dataMessage){ if(dataMessage == NULL){ // Serial.printf("%u Q1\n", _clientId); @@ -642,11 +644,9 @@ void AsyncWebSocketClient::_queueMessage(AsyncWebSocketMessage *dataMessage){ } } -void AsyncWebSocketClient::_queueControl(AsyncWebSocketControl *controlMessage){ - if(controlMessage == NULL) - return; - _controlQueue.add(controlMessage); - if(_client->canSend()) { +void AsyncWebSocketClient::_queueControl(uint8_t opcode, uint8_t *data, size_t len, bool mask){ + _controlQueue.emplace_back(opcode, data, len, mask); + if (_client->canSend()) { // Serial.println("RUN 4"); _runQueue(); } @@ -669,17 +669,17 @@ void AsyncWebSocketClient::close(uint16_t code, const char * message){ if(message != NULL){ memcpy(buf+2, message, packetLen -2); } - _queueControl(new AsyncWebSocketControl(WS_DISCONNECT,(uint8_t*)buf,packetLen)); + _queueControl(WS_DISCONNECT, (uint8_t*)buf, packetLen); free(buf); return; } } - _queueControl(new AsyncWebSocketControl(WS_DISCONNECT)); + _queueControl(WS_DISCONNECT); } void AsyncWebSocketClient::ping(uint8_t *data, size_t len){ if(_status == WS_CONNECTED) - _queueControl(new AsyncWebSocketControl(WS_PING, data, len)); + _queueControl(WS_PING, data, len); } void AsyncWebSocketClient::_onError(int8_t){ @@ -765,10 +765,10 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen){ } else { _status = WS_DISCONNECTING; _client->ackLater(); - _queueControl(new AsyncWebSocketControl(WS_DISCONNECT, data, datalen)); + _queueControl(WS_DISCONNECT, data, datalen); } } else if(_pinfo.opcode == WS_PING){ - _queueControl(new AsyncWebSocketControl(WS_PONG, data, datalen)); + _queueControl(WS_PONG, data, datalen); } else if(_pinfo.opcode == WS_PONG){ if(datalen != AWSC_PING_PAYLOAD_LEN || memcmp(AWSC_PING_PAYLOAD, data, AWSC_PING_PAYLOAD_LEN) != 0) _server->_handleEvent(this, WS_EVT_PONG, NULL, data, datalen); diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index 414c5ff..d23b144 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -34,6 +34,7 @@ #include "AsyncWebSynchronization.h" #include +#include #ifdef ESP8266 #include @@ -163,7 +164,7 @@ class AsyncWebSocketClient { uint32_t _clientId; AwsClientStatus _status; - LinkedList _controlQueue; + std::deque _controlQueue; LinkedList _messageQueue; uint8_t _pstate; @@ -173,7 +174,7 @@ class AsyncWebSocketClient { uint32_t _keepAlivePeriod; void _queueMessage(AsyncWebSocketMessage *dataMessage); - void _queueControl(AsyncWebSocketControl *controlMessage); + void _queueControl(uint8_t opcode, uint8_t *data=NULL, size_t len=0, bool mask=false); void _runQueue(); void _clearQueue(); @@ -210,7 +211,7 @@ class AsyncWebSocketClient { //data packets void message(AsyncWebSocketMessage *message){ _queueMessage(message); } bool queueIsFull() const; - size_t queueLen() { return _messageQueue.length() + _controlQueue.length(); } + size_t queueLen() const; size_t printf(const char *format, ...) __attribute__ ((format (printf, 2, 3))); #ifndef ESP32