diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 18cd24a..ae79d4c 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -61,6 +61,12 @@ extern "C" { #define INVALID_CLOSED_SLOT -1 +/* + TCP poll interval is specified in terms of the TCP coarse timer interval, which is called twice a second + https://github.com/espressif/esp-lwip/blob/2acf959a2bb559313cd2bf9306c24612ba3d0e19/src/core/tcp.c#L1895 +*/ +#define CONFIG_ASYNC_TCP_POLL_TIMER 1 + /* * TCP/IP Event Task * */ @@ -148,7 +154,34 @@ static inline bool _prepend_async_event(lwip_event_packet_t** e) { } static inline bool _get_async_event(lwip_event_packet_t** e) { - return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS; + if (!_async_queue) { + return false; + } + + if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS) + return false; + + /* + Let's try to coalesce two (or more) consecutive poll events into one + this usually happens with poor implemented user-callbacks that are runs too long and makes poll events to stack in the queue + if consecutive user callback for a same connection runs longer that poll time then it will fill the queue with events until it deadlocks. + This is a workaround to mitigate such poor designs and won't let other events/connections to starve the task time. + It won't be effective if user would run multiple simultaneous long running callbacks due to message interleaving. + todo: implement some kind of fair dequeing or (better) simply punish user for a bad designed callbacks by resetting hog connections + */ + lwip_event_packet_t* next_pkt = NULL; + while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS){ + if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL && (*e)->event == LWIP_TCP_POLL){ + if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS){ + free(next_pkt); + next_pkt = NULL; + log_w("coalescing polls, async callback might be too slow!"); + } else + return true; + } else + return true; + } + return true; } static bool _remove_events_with_arg(void* arg) { @@ -311,6 +344,7 @@ static int8_t _tcp_connected(void* arg, tcp_pcb* pcb, int8_t err) { static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) { // throttle polling events queing when event queue is getting filled up, let it handle _onack's + //log_d("qs:%u", uxQueueMessagesWaiting(_async_queue)); if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) { log_d("throttling"); return ERR_OK; @@ -612,7 +646,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb) tcp_recv(_pcb, &_tcp_recv); tcp_sent(_pcb, &_tcp_sent); tcp_err(_pcb, &_tcp_error); - tcp_poll(_pcb, &_tcp_poll, 1); + tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); if (!_allocate_closed_slot()) { _close(); }