diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index e09c058..6a290e1 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -141,17 +141,19 @@ size_t AsyncEventSourceMessage::ack(size_t len, __attribute__((unused)) uint32_t return 0; } -// This could also return void as the return value is not used. -// Leaving as-is for compatibility... -size_t AsyncEventSourceMessage::send(AsyncClient* client) { - if (_sent >= _len) { +size_t AsyncEventSourceMessage::write(AsyncClient* client) { + if (_sent >= _len || !client->canSend()) { return 0; } - const size_t len_to_send = _len - _sent; - auto position = reinterpret_cast(_data + _sent); - const size_t sent_now = client->write(position, len_to_send); - _sent += sent_now; - return sent_now; + size_t len = min(_len - _sent, client->space()); + size_t sent = client->add((const char*)_data + _sent, len); + _sent += sent; + return sent; +} + +size_t AsyncEventSourceMessage::send(AsyncClient* client) { + size_t sent = write(client); + return sent && client->send() ? sent : 0; } // Client @@ -173,6 +175,8 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest* request, A _server->_addClient(this); delete request; + + _client->setNoDelay(true); } AsyncEventSourceClient::~AsyncEventSourceClient() { @@ -210,11 +214,6 @@ void AsyncEventSourceClient::_onAck(size_t len, uint32_t time) { // Same here, acquiring the lock early std::lock_guard lock(_lockmq); #endif - while (len && _messageQueue.size()) { - len = _messageQueue.front().ack(len, time); - if (_messageQueue.front().finished()) - _messageQueue.pop_front(); - } _runQueue(); } @@ -263,11 +262,24 @@ size_t AsyncEventSourceClient::packetsWaiting() const { } void AsyncEventSourceClient::_runQueue() { - // Calls to this private method now already protected by _lockmq acquisition - // so no extra call of _lockmq.lock() here.. - for (auto& i : _messageQueue) { - if (!i.sent()) - i.send(_client); + size_t total_bytes_written = 0; + for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) { + if (!i->sent()) { + const size_t bytes_written = i->write(_client); + total_bytes_written += bytes_written; + if (bytes_written == 0) + break; + } + } + if (total_bytes_written > 0) + _client->send(); + + size_t len = total_bytes_written; + while (len && _messageQueue.size()) { + len = _messageQueue.front().ack(len); + if (_messageQueue.front().finished()) { + _messageQueue.pop_front(); + } } } diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index 986f83a..2eae211 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -67,6 +67,7 @@ class AsyncEventSourceMessage { AsyncEventSourceMessage(const char* data, size_t len); ~AsyncEventSourceMessage(); size_t ack(size_t len, uint32_t time = 0); + size_t write(AsyncClient* client); size_t send(AsyncClient* client); bool finished() { return _acked == _len; } bool sent() { return _sent == _len; }