diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index 6b2f9e9..9ebab12 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -52,11 +52,15 @@ size_t webSocketSendFrameWindow(AsyncClient *client){ } size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool mask, uint8_t *data, size_t len){ - if(!client->canSend()) + if(!client->canSend()) { + // Serial.println("SF 1"); return 0; + } size_t space = client->space(); - if(space < 2) + if(space < 2) { + // Serial.println("SF 2"); return 0; + } uint8_t mbuf[4] = {0,0,0,0}; uint8_t headLen = 2; if(len && mask){ @@ -68,8 +72,10 @@ size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool } if(len > 125) headLen += 2; - if(space < headLen) + if(space < headLen) { + // Serial.println("SF 2"); return 0; + } space -= headLen; if(len > space) len = space; @@ -77,6 +83,7 @@ size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool uint8_t *buf = (uint8_t*)malloc(headLen); if(buf == NULL){ //os_printf("could not malloc %u bytes for frame header\n", headLen); + // Serial.println("SF 3"); return 0; } @@ -97,6 +104,7 @@ size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool if(client->add((const char *)buf, headLen) != headLen){ //os_printf("error adding %lu header bytes\n", headLen); free(buf); + // Serial.println("SF 4"); return 0; } free(buf); @@ -109,13 +117,16 @@ size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool } if(client->add((const char *)data, len) != len){ //os_printf("error adding %lu data bytes\n", len); + // Serial.println("SF 5"); return 0; } } if(!client->send()){ //os_printf("error sending frame: %lu\n", headLen+len); + // Serial.println("SF 6"); return 0; } + // Serial.println("SF"); return len; } @@ -149,6 +160,7 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(uint8_t * data, size_t _data = new uint8_t[_len + 1]; if (_data) { + // Serial.println("BUFF alloc"); memcpy(_data, data, _len); _data[_len] = 0; } @@ -164,6 +176,7 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(size_t size) _data = new uint8_t[_len + 1]; if (_data) { + // Serial.println("BUFF alloc"); _data[_len] = 0; } @@ -185,6 +198,7 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(const AsyncWebSocketMes } if (_data) { + // Serial.println("BUFF alloc"); memcpy(_data, copy._data, _len); _data[_len] = 0; } @@ -202,6 +216,7 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBu _count = 0; if (copy._data) { + // Serial.println("BUFF alloc"); _data = copy._data; copy._data = nullptr; } @@ -211,6 +226,7 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBu AsyncWebSocketMessageBuffer::~AsyncWebSocketMessageBuffer() { if (_data) { + // Serial.println("BUFF free"); delete[] _data; } } @@ -293,6 +309,7 @@ AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(const char * data, size_t _opcode = opcode & 0x07; _mask = mask; _data = (uint8_t*)malloc(_len+1); + // Serial.println("MSG alloc"); if(_data == NULL){ _len = 0; _status = WS_MSG_ERROR; @@ -316,35 +333,43 @@ AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(uint8_t opcode, bool mask AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() { - if(_data != NULL) + if(_data != NULL) { + // Serial.println("MSG free"); free(_data); + } } void AsyncWebSocketBasicMessage::ack(size_t len, uint32_t time) { (void)time; _acked += len; + // Serial.printf("ACK %u = %u | %u = %u\n", _sent, _len, _acked, _ack); if(_sent == _len && _acked == _ack){ + // Serial.println("ACK end"); _status = WS_MSG_SENT; } } size_t AsyncWebSocketBasicMessage::send(AsyncClient *client) { - if(_status != WS_MSG_SENDING) + if(_status != WS_MSG_SENDING){ + // Serial.println("MS 1"); return 0; + } if(_acked < _ack){ + // Serial.println("MS 2"); return 0; } if(_sent == _len){ - if(_acked == _ack) - _status = WS_MSG_SENT; + // Serial.println("MS 3"); + _status = WS_MSG_SENT; return 0; } if(_sent > _len){ + // Serial.println("MS 4"); _status = WS_MSG_ERROR; return 0; } - size_t toSend = _len - _sent; size_t window = webSocketSendFrameWindow(client); + // Serial.printf("Send %u %u %u\n", _len, _sent, toSend); if(window < toSend) { toSend = window; @@ -360,8 +385,14 @@ AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() { size_t sent = webSocketSendFrame(client, final, opCode, _mask, dPtr, toSend); _status = WS_MSG_SENDING; if(toSend && sent != toSend){ - _sent -= (toSend - sent); - _ack -= (toSend - sent); + size_t delta = (toSend - sent); + // Serial.printf("\ns:%u a:%u d:%u\n", _sent, _ack, delta); + _sent -= delta; + _ack -= delta + ((delta < 126)?2:4) + (_mask * 4); + // Serial.printf("s:%u a:%u\n", _sent, _ack); + if (!sent) { + _status = WS_MSG_ERROR; + } } return sent; } @@ -399,11 +430,13 @@ AsyncWebSocketMultiMessage::AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuff if (buffer) { _WSbuffer = buffer; (*_WSbuffer)++; + // Serial.printf("INC WSbuffer == %u\n", _WSbuffer->count()); _data = buffer->get(); _len = buffer->length(); _status = WS_MSG_SENDING; //ets_printf("M: %u\n", _len); } else { + // Serial.println("BUFF ERROR"); _status = WS_MSG_ERROR; } @@ -413,40 +446,48 @@ AsyncWebSocketMultiMessage::AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuff AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() { if (_WSbuffer) { (*_WSbuffer)--; // decreases the counter. + // Serial.printf("DEC WSbuffer == %u\n", _WSbuffer->count()); } } void AsyncWebSocketMultiMessage::ack(size_t len, uint32_t time) { (void)time; _acked += len; + // Serial.printf("ACK %u = %u | %u = %u\n", _sent, _len, _acked, _ack); if(_sent >= _len && _acked >= _ack){ + // Serial.println("ACK end"); _status = WS_MSG_SENT; } //ets_printf("A: %u\n", len); } size_t AsyncWebSocketMultiMessage::send(AsyncClient *client) { - if(_status != WS_MSG_SENDING) + if(_status != WS_MSG_SENDING) { + // Serial.println("MS 1"); return 0; + } if(_acked < _ack){ + // Serial.println("MS 2"); return 0; } if(_sent == _len){ + // Serial.println("MS 3"); _status = WS_MSG_SENT; return 0; } if(_sent > _len){ + // Serial.println("MS 4"); _status = WS_MSG_ERROR; //ets_printf("E: %u > %u\n", _sent, _len); return 0; } - size_t toSend = _len - _sent; size_t window = webSocketSendFrameWindow(client); + // Serial.printf("Send %u %u %u\n", _len, _sent, toSend); if(window < toSend) { toSend = window; } - + // Serial.printf("s:%u a:%u t:%u\n", _sent, _ack, toSend); _sent += toSend; _ack += toSend + ((toSend < 126)?2:4) + (_mask * 4); @@ -460,8 +501,14 @@ AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() { _status = WS_MSG_SENDING; if(toSend && sent != toSend){ //ets_printf("E: %u != %u\n", toSend, sent); - _sent -= (toSend - sent); - _ack -= (toSend - sent); + size_t delta = (toSend - sent); + // Serial.printf("\ns:%u a:%u d:%u\n", _sent, _ack, delta); + _sent -= delta; + _ack -= delta + ((delta < 126)?2:4) + (_mask * 4); + // Serial.printf("s:%u a:%u\n", _sent, _ack); + if (!sent) { + _status = WS_MSG_ERROR; + } } //ets_printf("S: %u %u\n", _sent, sent); return sent; @@ -496,16 +543,25 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async _server->_addClient(this); _server->_handleEvent(this, WS_EVT_CONNECT, request, NULL, 0); delete request; - memset(&_pinfo,0,sizeof(_pinfo)); // + memset(&_pinfo,0,sizeof(_pinfo)); } AsyncWebSocketClient::~AsyncWebSocketClient(){ + // Serial.printf("%u FREE Q\n", id()); _messageQueue.free(); _controlQueue.free(); + _server->_cleanBuffers(); _server->_handleEvent(this, WS_EVT_DISCONNECT, NULL, NULL, 0); } +void AsyncWebSocketClient::_clearQueue(){ + while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){ + _messageQueue.remove(_messageQueue.front()); + } +} + void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){ + // Serial.printf("%u onAck\n", id()); _lastMessageTime = millis(); if(!_controlQueue.isEmpty()){ auto head = _controlQueue.front(); @@ -520,15 +576,21 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){ _controlQueue.remove(head); } } + if(len && !_messageQueue.isEmpty()){ _messageQueue.front()->ack(len, time); } + + _clearQueue(); + _server->_cleanBuffers(); + // Serial.println("RUN 1"); _runQueue(); } void AsyncWebSocketClient::_onPoll(){ if(_client->canSend() && (!_controlQueue.isEmpty() || !_messageQueue.isEmpty())){ + // Serial.println("RUN 2"); _runQueue(); } else if(_keepAlivePeriod > 0 && _controlQueue.isEmpty() && _messageQueue.isEmpty() && (millis() - _lastMessageTime) >= _keepAlivePeriod){ ping((uint8_t *)AWSC_PING_PAYLOAD, AWSC_PING_PAYLOAD_LEN); @@ -536,15 +598,20 @@ void AsyncWebSocketClient::_onPoll(){ } void AsyncWebSocketClient::_runQueue(){ - while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){ - _messageQueue.remove(_messageQueue.front()); - } + _clearQueue(); + //size_t m0 = _messageQueue.isEmpty()? 0 : _messageQueue.length(); + //size_t m1 = _messageQueue.isEmpty()? 0 : _messageQueue.front()->betweenFrames(); + // Serial.printf("%u R C = %u %u\n", _clientId, m0, m1); if(!_controlQueue.isEmpty() && (_messageQueue.isEmpty() || _messageQueue.front()->betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front()->len() - 1)){ + // Serial.printf("%u R S C\n", _clientId); _controlQueue.front()->send(_client); } else if(!_messageQueue.isEmpty() && _messageQueue.front()->betweenFrames() && webSocketSendFrameWindow(_client)){ + // Serial.printf("%u R S M = ", _clientId); _messageQueue.front()->send(_client); } + + _clearQueue(); } bool AsyncWebSocketClient::queueIsFull(){ @@ -553,28 +620,38 @@ bool AsyncWebSocketClient::queueIsFull(){ } void AsyncWebSocketClient::_queueMessage(AsyncWebSocketMessage *dataMessage){ - if(dataMessage == NULL) + if(dataMessage == NULL){ + // Serial.printf("%u Q1\n", _clientId); return; + } if(_status != WS_CONNECTED){ + // Serial.printf("%u Q2\n", _clientId); delete dataMessage; return; } if(_messageQueue.length() >= WS_MAX_QUEUED_MESSAGES){ ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); + // Serial.printf("%u Q3\n", _clientId); delete dataMessage; } else { _messageQueue.add(dataMessage); + // Serial.printf("%u Q A %u\n", _clientId, _messageQueue.length()); } - if(_client->canSend()) + if(_client->canSend()) { + // Serial.printf("%u Q S\n", _clientId); + // Serial.println("RUN 3"); _runQueue(); + } } void AsyncWebSocketClient::_queueControl(AsyncWebSocketControl *controlMessage){ if(controlMessage == NULL) return; _controlQueue.add(controlMessage); - if(_client->canSend()) + if(_client->canSend()) { + // Serial.println("RUN 4"); _runQueue(); + } } void AsyncWebSocketClient::close(uint16_t code, const char * message){ @@ -607,19 +684,24 @@ void AsyncWebSocketClient::ping(uint8_t *data, size_t len){ _queueControl(new AsyncWebSocketControl(WS_PING, data, len)); } -void AsyncWebSocketClient::_onError(int8_t){} +void AsyncWebSocketClient::_onError(int8_t){ + //Serial.println("onErr"); +} void AsyncWebSocketClient::_onTimeout(uint32_t time){ + // Serial.println("onTime"); (void)time; _client->close(true); } void AsyncWebSocketClient::_onDisconnect(){ + // Serial.println("onDis"); _client = NULL; _server->_handleDisconnect(this); } void AsyncWebSocketClient::_onData(void *pbuf, size_t plen){ + // Serial.println("onData"); _lastMessageTime = millis(); uint8_t *data = (uint8_t*)pbuf; while(plen > 0){ @@ -959,6 +1041,7 @@ void AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer * buffer){ void AsyncWebSocket::textAll(const char * message, size_t len){ + //if (_buffers.length()) return; AsyncWebSocketMessageBuffer * WSBuffer = makeBuffer((uint8_t *)message, len); textAll(WSBuffer); } @@ -1231,6 +1314,7 @@ AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(uint8_t * data, size_t if (buffer) { AsyncWebLockGuard l(_lock); + // Serial.printf("Add to global buffers = %u\n", _buffers.length() + 1); _buffers.add(buffer); } @@ -1240,9 +1324,9 @@ AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(uint8_t * data, size_t void AsyncWebSocket::_cleanBuffers() { AsyncWebLockGuard l(_lock); - for(AsyncWebSocketMessageBuffer * c: _buffers){ if(c && c->canDelete()){ + // Serial.printf("Remove from global buffers = %u\n", _buffers.length() - 1); _buffers.remove(c); } } diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index f06af2c..5ebf1cc 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -84,16 +84,16 @@ class AsyncWebSocketMessageBuffer { private: uint8_t * _data; size_t _len; - bool _lock; - uint32_t _count; + bool _lock; + uint32_t _count; public: AsyncWebSocketMessageBuffer(); AsyncWebSocketMessageBuffer(size_t size); - AsyncWebSocketMessageBuffer(uint8_t * data, size_t size); - AsyncWebSocketMessageBuffer(const AsyncWebSocketMessageBuffer &); - AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBuffer &&); - ~AsyncWebSocketMessageBuffer(); + AsyncWebSocketMessageBuffer(uint8_t * data, size_t size); + AsyncWebSocketMessageBuffer(const AsyncWebSocketMessageBuffer &); + AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBuffer &&); + ~AsyncWebSocketMessageBuffer(); void operator ++(int i) { (void)i; _count++; } void operator --(int i) { (void)i; if (_count > 0) { _count--; } ; } bool reserve(size_t size); @@ -102,9 +102,9 @@ class AsyncWebSocketMessageBuffer { uint8_t * get() { return _data; } size_t length() { return _len; } uint32_t count() { return _count; } - bool canDelete() { return (!_count && !_lock); } + bool canDelete() { return (!_count && !_lock); } - friend AsyncWebSocket; + friend AsyncWebSocket; }; @@ -145,9 +145,9 @@ class AsyncWebSocketMultiMessage: public AsyncWebSocketMessage { size_t _sent; size_t _ack; size_t _acked; - AsyncWebSocketMessageBuffer * _WSbuffer; + AsyncWebSocketMessageBuffer * _WSbuffer; public: - AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuffer * buffer, uint8_t opcode=WS_TEXT, bool mask=false); + AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuffer * buffer, uint8_t opcode=WS_TEXT, bool mask=false); virtual ~AsyncWebSocketMultiMessage() override; virtual bool betweenFrames() const override { return _acked == _ack; } virtual void ack(size_t len, uint32_t time) override ; @@ -173,6 +173,7 @@ class AsyncWebSocketClient { void _queueMessage(AsyncWebSocketMessage *dataMessage); void _queueControl(AsyncWebSocketControl *controlMessage); void _runQueue(); + void _clearQueue(); public: void *_tempObject; @@ -205,6 +206,7 @@ class AsyncWebSocketClient { //data packets void message(AsyncWebSocketMessage *message){ _queueMessage(message); } bool queueIsFull(); + size_t queueLen() { return _messageQueue.length() + _controlQueue.length(); } size_t printf(const char *format, ...) __attribute__ ((format (printf, 2, 3))); #ifndef ESP32 @@ -216,7 +218,7 @@ class AsyncWebSocketClient { void text(char * message); void text(const String &message); void text(const __FlashStringHelper *data); - void text(AsyncWebSocketMessageBuffer *buffer); + void text(AsyncWebSocketMessageBuffer *buffer); void binary(const char * message, size_t len); void binary(const char * message); @@ -224,7 +226,7 @@ class AsyncWebSocketClient { void binary(char * message); void binary(const String &message); void binary(const __FlashStringHelper *data, size_t len); - void binary(AsyncWebSocketMessageBuffer *buffer); + void binary(AsyncWebSocketMessageBuffer *buffer); bool canSend() { return _messageQueue.length() < WS_MAX_QUEUED_MESSAGES; } @@ -286,7 +288,7 @@ class AsyncWebSocket: public AsyncWebHandler { void textAll(char * message); void textAll(const String &message); void textAll(const __FlashStringHelper *message); // need to convert - void textAll(AsyncWebSocketMessageBuffer * buffer); + void textAll(AsyncWebSocketMessageBuffer * buffer); void binary(uint32_t id, const char * message, size_t len); void binary(uint32_t id, const char * message); @@ -301,7 +303,7 @@ class AsyncWebSocket: public AsyncWebHandler { void binaryAll(char * message); void binaryAll(const String &message); void binaryAll(const __FlashStringHelper *message, size_t len); - void binaryAll(AsyncWebSocketMessageBuffer * buffer); + void binaryAll(AsyncWebSocketMessageBuffer * buffer); void message(uint32_t id, AsyncWebSocketMessage *message); void messageAll(AsyncWebSocketMultiMessage *message); @@ -332,11 +334,11 @@ class AsyncWebSocket: public AsyncWebHandler { virtual void handleRequest(AsyncWebServerRequest *request) override final; - // messagebuffer functions/objects. - AsyncWebSocketMessageBuffer * makeBuffer(size_t size = 0); - AsyncWebSocketMessageBuffer * makeBuffer(uint8_t * data, size_t size); + // messagebuffer functions/objects. + AsyncWebSocketMessageBuffer * makeBuffer(size_t size = 0); + AsyncWebSocketMessageBuffer * makeBuffer(uint8_t * data, size_t size); LinkedList _buffers; - void _cleanBuffers(); + void _cleanBuffers(); AsyncWebSocketClientLinkedList getClients() const; };