implement in-flight buffer credits and event moderation for large/chunked responses

Referer to https://github.com/mathieucarbou/ESPAsyncWebServer/discussions/165
Relates to #169

in-flight buffer credits are intended to moderate buffer fill callbacks in AsyncAbstractResponse
it could prevent bad designed slow user-callbacks to flood the queue in chunked responces.

for response data we need to control the queue and in-flight fragmentation. Sending small chunks could give low latency,
but flood asynctcp's queue and fragment socket buffer space for large responses.
Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space.
That way we could balance on having half the buffer in-flight while another half is filling up, while minimizing events in asynctcp q
This commit is contained in:
Emil Muratov
2024-12-13 16:23:09 +09:00
parent 359cc6871c
commit 3d3456e9e8
2 changed files with 35 additions and 0 deletions

View File

@ -47,6 +47,10 @@ class AsyncBasicResponse : public AsyncWebServerResponse {
class AsyncAbstractResponse : public AsyncWebServerResponse {
private:
// amount of responce data in-flight, i.e. sent, but not acked yet
size_t _in_flight{0};
// in-flight queue credits
size_t _in_flight_credit{2};
String _head;
// Data is inserted into cache at begin().
// This is inefficient with vector, but if we use some other container,

View File

@ -352,7 +352,21 @@ size_t AsyncAbstractResponse::_ack(AsyncWebServerRequest* request, size_t len, u
request->client()->close();
return 0;
}
// return a credit for each chunk of acked data (polls does not give any credits)
if (len)
++_in_flight_credit;
// for chunked responses ignore acks if there are no _in_flight_credits left
if (_chunked && !_in_flight_credit){
#ifdef ESP32
log_d("(chunk) out of in-flight credits");
#endif
return 0;
}
_ackedLength += len;
_in_flight -= (_in_flight > len) ? len : _in_flight;
// get the size of available sock space
size_t space = request->client()->space();
size_t headLen = _head.length();
@ -364,16 +378,31 @@ size_t AsyncAbstractResponse::_ack(AsyncWebServerRequest* request, size_t len, u
String out = _head.substring(0, space);
_head = _head.substring(space);
_writtenLength += request->client()->write(out.c_str(), out.length());
_in_flight += out.length();
--_in_flight_credit; // take a credit
return out.length();
}
}
if (_state == RESPONSE_CONTENT) {
// for response data we need to control the queue and in-flight fragmentation. Sending small chunks could give low latency,
// but flood asynctcp's queue and fragment socket buffer space for large responses.
// Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space.
// That way we could balance on having half the buffer in-flight while another half is filling up, while minimizing events in asynctcp q
if (_in_flight > space){
//log_d("defer user call %u/%u", _in_flight, space);
// take the credit back since we are ignoring this ack and rely on other inflight data
if (len)
--_in_flight_credit;
return 0;
}
size_t outLen;
if (_chunked) {
if (space <= 8) {
return 0;
}
outLen = space;
} else if (!_sendContentLength) {
outLen = space;
@ -422,6 +451,8 @@ size_t AsyncAbstractResponse::_ack(AsyncWebServerRequest* request, size_t len, u
if (outLen) {
_writtenLength += request->client()->write((const char*)buf, outLen);
_in_flight += outLen;
--_in_flight_credit; // take a credit
}
if (_chunked) {