Merge pull request #33 from mathieucarbou/coalescedq

Coalesce poll events on queue eviction
Mathieu Carbou
2024-12-15 18:15:00 +01:00
committed by GitHub
2 changed files with 178 additions and 58 deletions

View File

@@ -61,6 +61,12 @@ extern "C" {
 #define INVALID_CLOSED_SLOT -1
+/*
+  TCP poll interval is specified in terms of the TCP coarse timer interval, which is called twice a second
+  https://github.com/espressif/esp-lwip/blob/2acf959a2bb559313cd2bf9306c24612ba3d0e19/src/core/tcp.c#L1895
+*/
+#define CONFIG_ASYNC_TCP_POLL_TIMER 1
 /*
  * TCP/IP Event Task
  * */
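The new constant only names the value that was previously hard-coded as 1 in the tcp_poll() calls further down. Since lwip's coarse timer fires twice a second, an interval of 1 keeps the existing behaviour of roughly one poll event per connection every 500 ms; a small illustrative conversion (the helper and TCP_COARSE_TIMER_MS are assumptions, not library code):

// Illustrative only: poll period implied by a tcp_poll() interval, assuming lwip's
// TCP coarse timer runs every 500 ms (i.e. it is called twice a second).
#include <cstdint>

static constexpr uint32_t TCP_COARSE_TIMER_MS = 500;  // assumption, see esp-lwip tcp.c
static constexpr uint32_t poll_period_ms(uint8_t interval) {
  return interval * TCP_COARSE_TIMER_MS;  // CONFIG_ASYNC_TCP_POLL_TIMER = 1 -> ~500 ms
}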
@@ -139,16 +145,67 @@ static inline bool _init_async_event_queue() {
   return true;
 }
-static inline bool _send_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueSend(_async_queue, e, portMAX_DELAY) == pdPASS;
+static inline bool _send_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
+  return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS;
 }
-static inline bool _prepend_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueSendToFront(_async_queue, e, portMAX_DELAY) == pdPASS;
+static inline bool _prepend_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
+  return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS;
 }
 static inline bool _get_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS;
+  if (!_async_queue) {
+    return false;
+  }
+#if CONFIG_ASYNC_TCP_USE_WDT
+  // need to return periodically to feed the dog
+  if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS)
+    return false;
+#else
+  if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS)
+    return false;
+#endif
+  if ((*e)->event != LWIP_TCP_POLL)
+    return true;
+  /*
+    Try to coalesce two (or more) consecutive poll events into one.
+    This usually happens with poorly implemented user callbacks that run too long and make poll events pile up in the queue:
+    if consecutive user callbacks for the same connection run longer than the poll interval, the queue fills with events until it deadlocks.
+    This is a workaround to mitigate such designs and keeps other events/connections from being starved of task time.
+    It won't be effective if the user runs multiple simultaneous long-running callbacks, due to message interleaving.
+    todo: implement some kind of fair dequeuing or (better) simply punish the user for badly designed callbacks by resetting hog connections
+  */
+  lwip_event_packet_t* next_pkt = NULL;
+  while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
+    if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) {
+      if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
+        free(next_pkt);
+        next_pkt = NULL;
+        log_d("coalescing polls, network congestion or async callbacks might be too slow!");
+        continue;
+      }
+    } else {
+      /*
+        Poorly designed apps using AsyncTCP without proper data-flow control could flood the queue with interleaved poll/ack events.
+        We can try to mitigate this by discarding poll events when the queue grows too large.
+        Poll events are discarded using a linear probability curve starting from 3/4 of the queue length;
+        poll events are periodic, so the connection could get another chance next time.
+      */
+      if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
+        free(next_pkt);
+        next_pkt = NULL;
+        log_d("discarding poll due to queue congestion");
+        // evict the next event from the queue
+        return _get_async_event(e);
+      }
+    }
+    return true;
+  }
+  // last resort return
+  return true;
 }
 static bool _remove_events_with_arg(void* arg) {
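A worked sketch of the randomized eviction threshold used above, assuming a hypothetical CONFIG_ASYNC_TCP_QUEUE_SIZE of 64 (the helper name is illustrative, not part of the library). The enqueue-side throttle in _tcp_poll() further down applies the same idea but starts backing off earlier, at a quarter of the queue:

// Illustrative only: the same threshold arithmetic as in _get_async_event(),
// assuming CONFIG_ASYNC_TCP_QUEUE_SIZE = 64:
//   (rand() % 64) / 4  ->  0..15  (random part)
//   64 * 3 / 4         ->  48     (fixed floor)
// A poll event is dropped only when more than 48..63 events are already waiting,
// so the drop probability rises linearly from 0 at 3/4-full to ~1 at completely full.
#include <cstddef>
#include <cstdlib>

static bool should_discard_poll(size_t queued, size_t queue_size) {
  const size_t threshold = (std::rand() % queue_size) / 4 + queue_size * 3 / 4;
  return queued > threshold;
}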
@@ -167,8 +224,11 @@ static bool _remove_events_with_arg(void* arg) {
     if ((int)first_packet->arg == (int)arg) {
       free(first_packet);
       first_packet = NULL;
-      // return first packet to the back of the queue
-    } else if (xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS) {
+      // try to return first packet to the back of the queue
+    } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) {
+      // we can't wait here if the queue is full, because this call is made from the only consumer task of this queue;
+      // otherwise it would deadlock, so we have to discard the event
       return false;
     }
   }
@@ -180,7 +240,9 @@ static bool _remove_events_with_arg(void* arg) {
     if ((int)packet->arg == (int)arg) {
       free(packet);
       packet = NULL;
-    } else if (xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS) {
+    } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) {
+      // we can't wait here if the queue is full, because this call is made from the only consumer task of this queue;
+      // otherwise it would deadlock, so we have to discard the event
       return false;
     }
   }
@@ -222,22 +284,23 @@ static void _handle_async_event(lwip_event_packet_t* e) {
 }
 static void _async_service_task(void* pvParameters) {
-#if CONFIG_ASYNC_TCP_USE_WDT
-  if (esp_task_wdt_add(NULL) != ESP_OK) {
-    log_w("Failed to add async task to WDT");
-  }
-#endif
   lwip_event_packet_t* packet = NULL;
   for (;;) {
     if (_get_async_event(&packet)) {
+#if CONFIG_ASYNC_TCP_USE_WDT
+      if (esp_task_wdt_add(NULL) != ESP_OK) {
+        log_e("Failed to add async task to WDT");
+      }
+#endif
       _handle_async_event(packet);
+#if CONFIG_ASYNC_TCP_USE_WDT
+      if (esp_task_wdt_delete(NULL) != ESP_OK) {
+        log_e("Failed to remove loop task from WDT");
+      }
+#endif
     }
-#if CONFIG_ASYNC_TCP_USE_WDT
-    esp_task_wdt_reset();
-#endif
   }
-#if CONFIG_ASYNC_TCP_USE_WDT
-  esp_task_wdt_delete(NULL);
-#endif
   vTaskDelete(NULL);
   _async_service_task_handle = NULL;
 }
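The watchdog handling also changes shape: previously the service task registered with the task WDT once at startup and fed it after every loop iteration; now it registers only around each handled event, so time spent blocked on an empty queue is not counted against the watchdog. A minimal sketch of that pattern using the ESP-IDF task-WDT API (the helper name is illustrative, not from the PR):

// Sketch of the per-event watchdog pattern above.
#include "esp_task_wdt.h"

static void handle_one_event(lwip_event_packet_t* packet) {
  esp_task_wdt_add(NULL);       // watch this task only while a user callback can run
  _handle_async_event(packet);  // must finish within the configured WDT timeout
  esp_task_wdt_delete(NULL);    // stop watching before blocking on the queue again
}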
@@ -311,6 +374,7 @@ static int8_t _tcp_connected(void* arg, tcp_pcb* pcb, int8_t err) {
 static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
   // throttle polling events queing when event queue is getting filled up, let it handle _onack's
+  // log_d("qs:%u", uxQueueMessagesWaiting(_async_queue));
   if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) {
     log_d("throttling");
     return ERR_OK;
@@ -321,7 +385,8 @@ static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
   e->event = LWIP_TCP_POLL;
   e->arg = arg;
   e->poll.pcb = pcb;
-  if (!_send_async_event(&e)) {
+  // poll events are not critical since they are repetitive, so we don't wait on the queue here
+  if (!_send_async_event(&e, 0)) {
     free((void*)(e));
   }
   return ERR_OK;
@@ -612,7 +677,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
   tcp_recv(_pcb, &_tcp_recv);
   tcp_sent(_pcb, &_tcp_sent);
   tcp_err(_pcb, &_tcp_error);
-  tcp_poll(_pcb, &_tcp_poll, 1);
+  tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
   if (!_allocate_closed_slot()) {
     _close();
   }
@@ -643,7 +708,7 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other) {
     tcp_recv(_pcb, &_tcp_recv);
     tcp_sent(_pcb, &_tcp_sent);
     tcp_err(_pcb, &_tcp_error);
-    tcp_poll(_pcb, &_tcp_poll, 1);
+    tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
   }
   return *this;
 }
@@ -741,7 +806,7 @@ bool AsyncClient::_connect(ip_addr_t addr, uint16_t port) {
   tcp_err(pcb, &_tcp_error);
   tcp_recv(pcb, &_tcp_recv);
   tcp_sent(pcb, &_tcp_sent);
-  tcp_poll(pcb, &_tcp_poll, 1);
+  tcp_poll(pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
   TCP_MUTEX_UNLOCK();
   esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected);
@@ -1090,10 +1155,6 @@ void AsyncClient::_dns_found(struct ip_addr* ipaddr) {
  * Public Helper Methods
  * */
-void AsyncClient::stop() {
-  close(false);
-}
 bool AsyncClient::free() {
   if (!_pcb) {
     return true;
@@ -1104,13 +1165,6 @@ bool AsyncClient::free() {
   return false;
 }
-size_t AsyncClient::write(const char* data) {
-  if (data == NULL) {
-    return 0;
-  }
-  return write(data, strlen(data));
-}
 size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) {
   size_t will_send = add(data, size, apiflags);
   if (!will_send || !send()) {

View File

@@ -48,13 +48,16 @@ extern "C" {
 #include <semphr.h>
 }
 #define CONFIG_ASYNC_TCP_RUNNING_CORE -1 // any available core
+#define CONFIG_ASYNC_TCP_USE_WDT 0
 #endif
 // If core is not defined, then we are running in Arduino or PIO
 #ifndef CONFIG_ASYNC_TCP_RUNNING_CORE
 #define CONFIG_ASYNC_TCP_RUNNING_CORE -1 // any available core
-#define CONFIG_ASYNC_TCP_USE_WDT 1 // if enabled, adds between 33us and 200us per event
+#endif
+// guard AsyncTCP task with watchdog
+#ifndef CONFIG_ASYNC_TCP_USE_WDT
+#define CONFIG_ASYNC_TCP_USE_WDT 1
 #endif
 #ifndef CONFIG_ASYNC_TCP_STACK_SIZE
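Since CONFIG_ASYNC_TCP_USE_WDT is now guarded by #ifndef, a project can override it instead of always inheriting the Arduino/PIO default of 1, for example with a hypothetical project-wide compiler flag (exact syntax depends on the build system):

// Hypothetical consumer-side check, not part of this PR. Override the default by defining
// the macro globally, e.g. with a compiler flag: -D CONFIG_ASYNC_TCP_USE_WDT=0
#include <AsyncTCP.h>

#if CONFIG_ASYNC_TCP_USE_WDT
#pragma message("AsyncTCP service task is guarded by the task watchdog")
#else
#pragma message("AsyncTCP service task runs without the task watchdog")
#endif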
@@ -106,34 +109,86 @@ class AsyncClient {
   bool connect(const IPv6Address& ip, uint16_t port);
 #endif
   bool connect(const char* host, uint16_t port);
+  /**
+   * @brief close connection
+   *
+   * @param now - ignored
+   */
   void close(bool now = false);
-  void stop();
+  // same as close()
+  void stop() { close(false); };
   int8_t abort();
   bool free();
-  bool canSend(); // ack is not pending
-  size_t space(); // space available in the TCP window
-  size_t add(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); // add for sending
-  bool send(); // send all data added with the method above
-  // write equals add()+send()
-  size_t write(const char* data);
-  size_t write(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); // only when canSend() == true
+  // ack is not pending
+  bool canSend();
+  // TCP buffer space available
+  size_t space();
+  /**
+   * @brief add data to be sent (but do not send it yet)
+   * @note add() calls lwip's tcp_write().
+     By default apiflags=ASYNC_WRITE_FLAG_COPY.
+     You could try to use apiflags with this flag unset to pass data by reference and avoid a copy into the socket buffer,
+     but it looks like this does not work for Arduino's lwip on ESP32/IDF, at least:
+     copying is enforced in https://github.com/espressif/esp-lwip/blob/0606eed9d8b98a797514fdf6eabb4daf1c8c8cd9/src/core/tcp_out.c#L422C5-L422C30
+     when LWIP_NETIF_TX_SINGLE_PBUF is set, and it is indeed set in IDF:
+     https://github.com/espressif/esp-idf/blob/a0f798cfc4bbd624aab52b2c194d219e242d80c1/components/lwip/port/include/lwipopts.h#L744
+   *
+   * @param data
+   * @param size
+   * @param apiflags
+   * @return size_t amount of data that has been copied
+   */
+  size_t add(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY);
+  /**
+   * @brief send data previously add()'ed
+   *
+   * @return true on success
+   * @return false on error
+   */
+  bool send();
+  /**
+   * @brief add and enqueue data for sending
+   * @note it is the same as add() + send()
+   * @note only makes sense when canSend() == true
+   *
+   * @param data
+   * @param size
+   * @param apiflags
+   * @return size_t
+   */
+  size_t write(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY);
+  /**
+   * @brief add and enqueue data for sending
+   * @note treats data as a null-terminated string
+   *
+   * @param data
+   * @return size_t
+   */
+  size_t write(const char* data) { return data == NULL ? 0 : write(data, strlen(data)); };
   uint8_t state();
   bool connecting();
   bool connected();
   bool disconnecting();
   bool disconnected();
-  bool freeable(); // disconnected or disconnecting
+  // disconnected or disconnecting
+  bool freeable();
   uint16_t getMss();
   uint32_t getRxTimeout();
-  void setRxTimeout(uint32_t timeout); // no RX data timeout for the connection in seconds
+  // no RX data timeout for the connection in seconds
+  void setRxTimeout(uint32_t timeout);
   uint32_t getAckTimeout();
-  void setAckTimeout(uint32_t timeout); // no ACK timeout for the last sent packet in milliseconds
+  // no ACK timeout for the last sent packet in milliseconds
+  void setAckTimeout(uint32_t timeout);
   void setNoDelay(bool nodelay);
   bool getNoDelay();
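A short usage sketch of the two transmit paths documented above (hypothetical application code, not part of the PR): staging data with add() and flushing it with send(), versus the one-shot write():

// Hypothetical usage of the AsyncClient send API documented above.
void sendResponse(AsyncClient& client, const char* body, size_t bodyLen) {
  if (!client.canSend())
    return;  // previous data not fully acknowledged yet
  // staged path: queue several chunks with add(), then flush them with one send()
  client.add("HTTP/1.1 200 OK\r\n\r\n", 19);
  client.add(body, bodyLen);
  client.send();
}

void sendGreeting(AsyncClient& client) {
  // one-shot path: write() is add() + send(); this overload treats data as a C string
  client.write("hello\r\n");
}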
@@ -162,23 +217,34 @@ class AsyncClient {
   IPAddress localIP();
   uint16_t localPort();
-  void onConnect(AcConnectHandler cb, void* arg = 0); // on successful connect
-  void onDisconnect(AcConnectHandler cb, void* arg = 0); // disconnected
-  void onAck(AcAckHandler cb, void* arg = 0); // ack received
-  void onError(AcErrorHandler cb, void* arg = 0); // unsuccessful connect or error
-  void onData(AcDataHandler cb, void* arg = 0); // data received (called if onPacket is not used)
-  void onPacket(AcPacketHandler cb, void* arg = 0); // data received
-  void onTimeout(AcTimeoutHandler cb, void* arg = 0); // ack timeout
-  void onPoll(AcConnectHandler cb, void* arg = 0); // every 125ms when connected
+  // set callback - on successful connect
+  void onConnect(AcConnectHandler cb, void* arg = 0);
+  // set callback - disconnected
+  void onDisconnect(AcConnectHandler cb, void* arg = 0);
+  // set callback - ack received
+  void onAck(AcAckHandler cb, void* arg = 0);
+  // set callback - unsuccessful connect or error
+  void onError(AcErrorHandler cb, void* arg = 0);
+  // set callback - data received (called if onPacket is not used)
+  void onData(AcDataHandler cb, void* arg = 0);
+  // set callback - data received
+  void onPacket(AcPacketHandler cb, void* arg = 0);
+  // set callback - ack timeout
+  void onTimeout(AcTimeoutHandler cb, void* arg = 0);
+  // set callback - every 125ms when connected
+  void onPoll(AcConnectHandler cb, void* arg = 0);
-  void ackPacket(struct pbuf* pb); // ack pbuf from onPacket
-  size_t ack(size_t len); // ack data that you have not acked using the method below
-  void ackLater() { _ack_pcb = false; } // will not ack the current packet. Call from onData
+  // ack pbuf from onPacket
+  void ackPacket(struct pbuf* pb);
+  // ack data that you have not acked using the method below
+  size_t ack(size_t len);
+  // will not ack the current packet. Call from onData
+  void ackLater() { _ack_pcb = false; }
   const char* errorToString(int8_t error);
   const char* stateToString();
-  // Do not use any of the functions below!
+  // internal callbacks - Do NOT call any of the functions below in user code!
   static int8_t _s_poll(void* arg, struct tcp_pcb* tpcb);
   static int8_t _s_recv(void* arg, struct tcp_pcb* tpcb, struct pbuf* pb, int8_t err);
   static int8_t _s_fin(void* arg, struct tcp_pcb* tpcb, int8_t err);
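Finally, a hypothetical example of wiring up the callback setters listed above (not part of this PR). Keeping onPoll and onData handlers short matters even more with this change, since long-running callbacks are exactly what makes poll events pile up and get coalesced or discarded:

// Hypothetical usage of the callback setters above.
AsyncClient* client = new AsyncClient();
client->onConnect([](void*, AsyncClient* c) {
  c->write("GET / HTTP/1.0\r\nHost: example.com\r\n\r\n");
}, nullptr);
client->onData([](void*, AsyncClient*, void* data, size_t len) {
  // consume len bytes at data; return quickly so the event queue keeps draining
}, nullptr);
client->onPoll([](void*, AsyncClient*) {
  // fires periodically while connected; keep it short so poll events do not pile up
}, nullptr);
client->connect("example.com", 80);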