Merge pull request #34 from mathieucarbou/fixups

fix: _remove_events_with_arg() might probably leak mem on queue overflow
This commit is contained in:
Mathieu Carbou
2024-12-16 21:37:07 +01:00
committed by GitHub
2 changed files with 34 additions and 22 deletions

View File

@@ -187,35 +187,42 @@ static inline bool _get_async_event(lwip_event_packet_t** e) {
log_d("coalescing polls, network congestion or async callbacks might be too slow!"); log_d("coalescing polls, network congestion or async callbacks might be too slow!");
continue; continue;
} }
} else {
/*
poor designed apps using asynctcp without proper dataflow control could flood the queue with interleaved pool/ack events.
We can try to mitigate it by discarding poll events when queue grows too much.
Let's discard poll events using linear probability curve starting from 3/4 of queue length
Poll events are periodic and connection could get another chance next time
*/
if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
free(next_pkt);
next_pkt = NULL;
log_d("discarding poll due to queue congestion");
// evict next event from a queue
return _get_async_event(e);
}
} }
return true;
// quit while loop if next event can't be discarded
break;
} }
/*
now we have to decide if to proceed with poll callback handler or discard it?
poor designed apps using asynctcp without proper dataflow control could flood the queue with interleaved pool/ack events.
I.e. on each poll app would try to generate more data to send, which in turn results in additional ack event triggering chain effect
for long connections. Or poll callback could take long time starving other connections. Anyway our goal is to keep the queue length
grows under control (if possible) and poll events are the safest to discard.
Let's discard poll events processing using linear-increasing probability curve when queue size grows over 3/4
Poll events are periodic and connection could get another chance next time
*/
if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
free(*e);
*e = NULL;
log_d("discarding poll due to queue congestion");
// evict next event from a queue
return _get_async_event(e);
}
// last resort return // last resort return
return true; return true;
} }
static bool _remove_events_with_arg(void* arg) { static bool _remove_events_with_arg(void* arg) {
lwip_event_packet_t* first_packet = NULL;
lwip_event_packet_t* packet = NULL;
if (!_async_queue) { if (!_async_queue) {
return false; return false;
} }
// figure out which is the first packet so we can keep the order
lwip_event_packet_t* first_packet = NULL;
lwip_event_packet_t* packet = NULL;
// figure out which is the first non-matching packet so we can keep the order
while (!first_packet) { while (!first_packet) {
if (xQueueReceive(_async_queue, &first_packet, 0) != pdPASS) { if (xQueueReceive(_async_queue, &first_packet, 0) != pdPASS) {
return false; return false;
@@ -224,11 +231,12 @@ static bool _remove_events_with_arg(void* arg) {
if ((int)first_packet->arg == (int)arg) { if ((int)first_packet->arg == (int)arg) {
free(first_packet); free(first_packet);
first_packet = NULL; first_packet = NULL;
// try to return first packet to the back of the queue
} else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) { } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) {
// try to return first packet to the back of the queue
// we can't wait here if queue is full, because this call has been done from the only consumer task of this queue // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue
// otherwise it would deadlock, we have to discard the event // otherwise it would deadlock, we have to discard the event
free(first_packet);
first_packet = NULL;
return false; return false;
} }
} }
@@ -238,11 +246,15 @@ static bool _remove_events_with_arg(void* arg) {
return false; return false;
} }
if ((int)packet->arg == (int)arg) { if ((int)packet->arg == (int)arg) {
// remove matching event
free(packet); free(packet);
packet = NULL; packet = NULL;
// otherwise try to requeue it
} else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) { } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) {
// we can't wait here if queue is full, because this call has been done from the only consumer task of this queue // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue
// otherwise it would deadlock, we have to discard the event // otherwise it would deadlock, we have to discard the event
free(packet);
packet = NULL;
return false; return false;
} }
} }

View File

@@ -241,7 +241,7 @@ class AsyncClient {
// will not ack the current packet. Call from onData // will not ack the current packet. Call from onData
void ackLater() { _ack_pcb = false; } void ackLater() { _ack_pcb = false; }
const char* errorToString(int8_t error); static const char* errorToString(int8_t error);
const char* stateToString(); const char* stateToString();
// internal callbacks - Do NOT call any of the functions below in user code! // internal callbacks - Do NOT call any of the functions below in user code!