Coalesce poll events on queue eviction

Refer: https://github.com/mathieucarbou/ESPAsyncWebServer/discussions/165

Let's try to coalesce two (or more) consecutive poll events into one.
This usually happens with poorly implemented user callbacks that run too long and make poll events stack up in the queue:
if the user callback for a connection consistently runs longer than the poll interval, it fills the queue with events until it deadlocks.
This is a workaround to mitigate such poor designs; it keeps other events/connections from being starved of task time.
It is not effective when the user runs multiple simultaneous long-running callbacks, because their events interleave in the queue.
TODO: implement some kind of fair dequeuing, or (better) simply punish badly designed callbacks by resetting hog connections.
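
For illustration, a minimal sketch (not part of this commit) of the misuse being mitigated: an onPoll handler that blocks longer than the ~500 ms poll interval, so LWIP_TCP_POLL events for that connection are queued faster than they are consumed. The handler body and function name below are hypothetical.

#include <Arduino.h>
#include <AsyncTCP.h>

// Hypothetical example of a poorly designed callback: it blocks the async_tcp
// task for ~2 s while poll events for this connection keep arriving roughly
// every 500 ms, stacking up in the event queue.
void attachSlowPollHandler(AsyncClient* client) {
  client->onPoll([](void* arg, AsyncClient* c) {
    delay(2000);  // long blocking work inside the event task
  }, nullptr);
}
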
Author: Emil Muratov
Date:   2024-12-13 16:32:15 +09:00
Parent: 9355538a8f
Commit: afa162b73a


@@ -61,6 +61,12 @@ extern "C" {
 #define INVALID_CLOSED_SLOT -1
 
+/*
+  TCP poll interval is specified in terms of the TCP coarse timer interval, which is called twice a second
+  https://github.com/espressif/esp-lwip/blob/2acf959a2bb559313cd2bf9306c24612ba3d0e19/src/core/tcp.c#L1895
+*/
+#define CONFIG_ASYNC_TCP_POLL_TIMER 1
+
 /*
  * TCP/IP Event Task
  * */
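
A quick reading of the new define (a sketch, assuming lwIP's coarse TCP timer really fires every 500 ms, as the linked source indicates): tcp_poll() takes its interval in coarse-timer ticks, so a value of 1 means a poll callback roughly every 500 ms per connection. The helper below is illustrative only, not part of the diff.

#include <stdint.h>

#define TCP_COARSE_TIMER_MS 500  /* assumption: lwIP coarse-timer period */

/* Hypothetical helper: effective poll period implied by CONFIG_ASYNC_TCP_POLL_TIMER. */
static inline uint32_t async_tcp_poll_period_ms(void) {
  return (uint32_t)CONFIG_ASYNC_TCP_POLL_TIMER * TCP_COARSE_TIMER_MS;  /* 1 * 500 = 500 ms */
}
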
@@ -148,7 +154,34 @@ static inline bool _prepend_async_event(lwip_event_packet_t** e) {
 }
 
 static inline bool _get_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS;
+  if (!_async_queue) {
+    return false;
+  }
+
+  if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS)
+    return false;
+
+  /*
+    Try to coalesce two (or more) consecutive poll events into one.
+    This usually happens with poorly implemented user callbacks that run too long and make poll events stack up in the queue:
+    if the user callback for a connection consistently runs longer than the poll interval, it fills the queue with events until it deadlocks.
+    This is a workaround to mitigate such poor designs; it keeps other events/connections from being starved of task time.
+    It is not effective when the user runs multiple simultaneous long-running callbacks, because their events interleave in the queue.
+    TODO: implement some kind of fair dequeuing or (better) simply punish badly designed callbacks by resetting hog connections.
+  */
+  lwip_event_packet_t* next_pkt = NULL;
+  while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
+    if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL && (*e)->event == LWIP_TCP_POLL) {
+      if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
+        free(next_pkt);
+        next_pkt = NULL;
+        log_w("coalescing polls, async callback might be too slow!");
+      } else
+        return true;
+    } else
+      return true;
+  }
+  return true;
 }
 
 static bool _remove_events_with_arg(void* arg) {
@@ -311,6 +344,7 @@ static int8_t _tcp_connected(void* arg, tcp_pcb* pcb, int8_t err) {
 static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
   // throttle polling events queing when event queue is getting filled up, let it handle _onack's
+  //log_d("qs:%u", uxQueueMessagesWaiting(_async_queue));
   if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) {
     log_d("throttling");
     return ERR_OK;
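
A note on the throttle condition above (unchanged context in this commit): since % and / have equal precedence and associate left to right, the threshold evaluates as (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE) / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4, i.e. a randomized cutoff between roughly one quarter and three quarters of the queue size. A small restatement with a hypothetical helper name:

#include <stdlib.h>

/* Sketch only: the randomized cutoff used in _tcp_poll() to decide whether to
   skip queuing another poll event while the queue is filling up. */
static unsigned poll_throttle_threshold(unsigned queue_size) {
  return (unsigned)(rand() % queue_size) / 2 + queue_size / 4;
}
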
@@ -612,7 +646,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
     tcp_recv(_pcb, &_tcp_recv);
     tcp_sent(_pcb, &_tcp_sent);
     tcp_err(_pcb, &_tcp_error);
-    tcp_poll(_pcb, &_tcp_poll, 1);
+    tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
     if (!_allocate_closed_slot()) {
       _close();
     }