diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp
index 18cd24a..12486a0 100644
--- a/src/AsyncTCP.cpp
+++ b/src/AsyncTCP.cpp
@@ -61,6 +61,12 @@ extern "C" {
 
 #define INVALID_CLOSED_SLOT -1
 
+/*
+  TCP poll interval is specified in terms of the TCP coarse timer interval, which is called twice a second
+  https://github.com/espressif/esp-lwip/blob/2acf959a2bb559313cd2bf9306c24612ba3d0e19/src/core/tcp.c#L1895
+*/
+#define CONFIG_ASYNC_TCP_POLL_TIMER 1
+
 /*
  * TCP/IP Event Task
  * */
@@ -139,16 +145,67 @@
   return true;
 }
 
-static inline bool _send_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueSend(_async_queue, e, portMAX_DELAY) == pdPASS;
+static inline bool _send_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
+  return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS;
 }
 
-static inline bool _prepend_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueSendToFront(_async_queue, e, portMAX_DELAY) == pdPASS;
+static inline bool _prepend_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
+  return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS;
 }
 
 static inline bool _get_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS;
+  if (!_async_queue) {
+    return false;
+  }
+
+#if CONFIG_ASYNC_TCP_USE_WDT
+  // need to return periodically to feed the watchdog
+  if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS)
+    return false;
+#else
+  if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS)
+    return false;
+#endif
+
+  if ((*e)->event != LWIP_TCP_POLL)
+    return true;
+
+  /*
+    Let's try to coalesce two (or more) consecutive poll events into one.
+    This usually happens with poorly implemented user callbacks that run too long and make poll events pile up in the queue:
+    if a user callback for the same connection repeatedly runs longer than the poll interval, it fills the queue with events until it deadlocks.
+    This is a workaround to mitigate such designs and keep other events/connections from being starved of task time.
+    It won't be effective if the user runs multiple simultaneous long-running callbacks, due to message interleaving.
+    todo: implement some kind of fair dequeuing or (better) simply punish the user for badly designed callbacks by resetting hog connections
+  */
+  lwip_event_packet_t* next_pkt = NULL;
+  while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
+    if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) {
+      if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
+        free(next_pkt);
+        next_pkt = NULL;
+        log_d("coalescing polls, network congestion or async callbacks might be too slow!");
+        continue;
+      }
+    } else {
+      /*
+        Poorly designed apps using AsyncTCP without proper dataflow control could flood the queue with interleaved poll/ack events.
+        We can try to mitigate this by discarding poll events when the queue grows too large.
+        Let's discard poll events using a linear probability curve starting from 3/4 of the queue length.
+        Poll events are periodic, so the connection will get another chance next time.
+      */
+      if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
+        free(*e);
+        *e = NULL;
+        log_d("discarding poll due to queue congestion");
+        // evict the next event from the queue
+        return _get_async_event(e);
+      }
+    }
+    return true;
+  }
+  // last resort return
+  return true;
 }
 
 static bool _remove_events_with_arg(void* arg) {
@@ -167,8 +224,11 @@
     if ((int)first_packet->arg == (int)arg) {
       free(first_packet);
       first_packet = NULL;
-      // return first packet to the back of the queue
-    } else if (xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS) {
+
+      // try to return first packet to the back of the queue
+    } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) {
+      // we can't wait while the queue is full, because this call is made from the queue's only consumer task
+      // and waiting would deadlock; we have to discard the event
       return false;
     }
   }
@@ -180,7 +240,9 @@
     if ((int)packet->arg == (int)arg) {
       free(packet);
       packet = NULL;
-    } else if (xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS) {
+    } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) {
+      // we can't wait while the queue is full, because this call is made from the queue's only consumer task
+      // and waiting would deadlock; we have to discard the event
       return false;
     }
   }
@@ -222,22 +284,23 @@ static void _handle_async_event(lwip_event_packet_t* e) {
 }
 
 static void _async_service_task(void* pvParameters) {
+#if CONFIG_ASYNC_TCP_USE_WDT
+  if (esp_task_wdt_add(NULL) != ESP_OK) {
+    log_w("Failed to add async task to WDT");
+  }
+#endif
   lwip_event_packet_t* packet = NULL;
   for (;;) {
     if (_get_async_event(&packet)) {
-#if CONFIG_ASYNC_TCP_USE_WDT
-      if (esp_task_wdt_add(NULL) != ESP_OK) {
-        log_e("Failed to add async task to WDT");
-      }
-#endif
      _handle_async_event(packet);
-#if CONFIG_ASYNC_TCP_USE_WDT
-      if (esp_task_wdt_delete(NULL) != ESP_OK) {
-        log_e("Failed to remove loop task from WDT");
-      }
-#endif
     }
+#if CONFIG_ASYNC_TCP_USE_WDT
+    esp_task_wdt_reset();
+#endif
   }
+#if CONFIG_ASYNC_TCP_USE_WDT
+  esp_task_wdt_delete(NULL);
+#endif
   vTaskDelete(NULL);
   _async_service_task_handle = NULL;
 }
@@ -311,6 +374,7 @@ static int8_t _tcp_connected(void* arg, tcp_pcb* pcb, int8_t err) {
 static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
   // throttle polling events queing when event queue is getting filled up, let it handle _onack's
+  // log_d("qs:%u", uxQueueMessagesWaiting(_async_queue));
   if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) {
     log_d("throttling");
     return ERR_OK;
   }
@@ -321,7 +385,8 @@ static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
   e->event = LWIP_TCP_POLL;
   e->arg = arg;
   e->poll.pcb = pcb;
-  if (!_send_async_event(&e)) {
+  // poll events are not critical since they are repetitive, so we never wait on a full queue
+  if (!_send_async_event(&e, 0)) {
     free((void*)(e));
   }
   return ERR_OK;
@@ -612,7 +677,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
     tcp_recv(_pcb, &_tcp_recv);
     tcp_sent(_pcb, &_tcp_sent);
     tcp_err(_pcb, &_tcp_error);
-    tcp_poll(_pcb, &_tcp_poll, 1);
+    tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
     if (!_allocate_closed_slot()) {
       _close();
     }
   }
 
@@ -643,7 +708,7 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other) {
     tcp_recv(_pcb, &_tcp_recv);
     tcp_sent(_pcb, &_tcp_sent);
     tcp_err(_pcb, &_tcp_error);
-    tcp_poll(_pcb, &_tcp_poll, 1);
+    tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
   }
   return *this;
 }
@@ -741,7 +806,7 @@ bool AsyncClient::_connect(ip_addr_t addr, uint16_t port) {
   tcp_err(pcb, &_tcp_error);
   tcp_recv(pcb, &_tcp_recv);
   tcp_sent(pcb, &_tcp_sent);
-  tcp_poll(pcb, &_tcp_poll, 1);
+  tcp_poll(pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
   TCP_MUTEX_UNLOCK();
 
   esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected);
@@ -1090,10 +1155,6 @@ void AsyncClient::_dns_found(struct ip_addr* ipaddr) {
  * Public Helper Methods
  * */
 
-void AsyncClient::stop() {
-  close(false);
-}
-
 bool AsyncClient::free() {
   if (!_pcb) {
     return true;
@@ -1104,13 +1165,6 @@ bool AsyncClient::free() {
   return false;
 }
 
-size_t AsyncClient::write(const char* data) {
-  if (data == NULL) {
-    return 0;
-  }
-  return write(data, strlen(data));
-}
-
 size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) {
   size_t will_send = add(data, size, apiflags);
   if (!will_send || !send()) {
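A quick gloss on the two randomized congestion gates in the AsyncTCP.cpp changes above, since the integer arithmetic is easy to misread: rand() % N / 2 parses as (rand() % N) / 2, so the enqueue-side gate in _tcp_poll() draws a threshold uniformly between 1/4 and 3/4 of the queue capacity, while the consumer-side gate in _get_async_event() draws one between 3/4 of the capacity and the full capacity. A standalone sketch of the arithmetic, assuming the library default CONFIG_ASYNC_TCP_QUEUE_SIZE of 64 (illustration only, not part of the patch):

    #include <cstdio>
    #include <cstdlib>

    #define CONFIG_ASYNC_TCP_QUEUE_SIZE 64

    int main() {
      // enqueue-side gate used by _tcp_poll(): uniform over [16, 47] for a 64-slot queue
      int throttle_at = rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4;
      // consumer-side gate used by _get_async_event(): uniform over [48, 63]
      int discard_at = rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4;
      printf("throttle new polls above %d queued events, discard polls above %d\n", throttle_at, discard_at);
      return 0;
    }

New poll events are therefore refused cheaply in the lwip callback well before the consumer starts discarding polls it has already dequeued. Meanwhile CONFIG_ASYNC_TCP_POLL_TIMER = 1 keeps lwip generating poll events only about every 500 ms, since the TCP coarse timer fires twice a second.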
copied + */ + size_t add(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); + + /** + * @brief send data previously add()'ed + * + * @return true on success + * @return false on error + */ + bool send(); + + /** + * @brief add and enqueue data for sending + * @note it is same as add() + send() + * @note only make sense when canSend() == true + * + * @param data + * @param size + * @param apiflags + * @return size_t + */ + size_t write(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); + + /** + * @brief add and enque data for sending + * @note treats data as null-terminated string + * + * @param data + * @return size_t + */ + size_t write(const char* data) { return data == NULL ? 0 : write(data, strlen(data)); }; uint8_t state(); bool connecting(); bool connected(); bool disconnecting(); bool disconnected(); - bool freeable(); // disconnected or disconnecting + + // disconnected or disconnecting + bool freeable(); uint16_t getMss(); uint32_t getRxTimeout(); - void setRxTimeout(uint32_t timeout); // no RX data timeout for the connection in seconds + // no RX data timeout for the connection in seconds + void setRxTimeout(uint32_t timeout); uint32_t getAckTimeout(); - void setAckTimeout(uint32_t timeout); // no ACK timeout for the last sent packet in milliseconds + // no ACK timeout for the last sent packet in milliseconds + void setAckTimeout(uint32_t timeout); void setNoDelay(bool nodelay); bool getNoDelay(); @@ -162,23 +217,34 @@ class AsyncClient { IPAddress localIP(); uint16_t localPort(); - void onConnect(AcConnectHandler cb, void* arg = 0); // on successful connect - void onDisconnect(AcConnectHandler cb, void* arg = 0); // disconnected - void onAck(AcAckHandler cb, void* arg = 0); // ack received - void onError(AcErrorHandler cb, void* arg = 0); // unsuccessful connect or error - void onData(AcDataHandler cb, void* arg = 0); // data received (called if onPacket is not used) - void onPacket(AcPacketHandler cb, void* arg = 0); // data received - void onTimeout(AcTimeoutHandler cb, void* arg = 0); // ack timeout - void onPoll(AcConnectHandler cb, void* arg = 0); // every 125ms when connected + // set callback - on successful connect + void onConnect(AcConnectHandler cb, void* arg = 0); + // set callback - disconnected + void onDisconnect(AcConnectHandler cb, void* arg = 0); + // set callback - ack received + void onAck(AcAckHandler cb, void* arg = 0); + // set callback - unsuccessful connect or error + void onError(AcErrorHandler cb, void* arg = 0); + // set callback - data received (called if onPacket is not used) + void onData(AcDataHandler cb, void* arg = 0); + // set callback - data received + void onPacket(AcPacketHandler cb, void* arg = 0); + // set callback - ack timeout + void onTimeout(AcTimeoutHandler cb, void* arg = 0); + // set callback - every 125ms when connected + void onPoll(AcConnectHandler cb, void* arg = 0); - void ackPacket(struct pbuf* pb); // ack pbuf from onPacket - size_t ack(size_t len); // ack data that you have not acked using the method below - void ackLater() { _ack_pcb = false; } // will not ack the current packet. Call from onData + // ack pbuf from onPacket + void ackPacket(struct pbuf* pb); + // ack data that you have not acked using the method below + size_t ack(size_t len); + // will not ack the current packet. Call from onData + void ackLater() { _ack_pcb = false; } const char* errorToString(int8_t error); const char* stateToString(); - // Do not use any of the functions below! 
+    // internal callbacks - Do NOT call any of the functions below in user code!
     static int8_t _s_poll(void* arg, struct tcp_pcb* tpcb);
     static int8_t _s_recv(void* arg, struct tcp_pcb* tpcb, struct pbuf* pb, int8_t err);
     static int8_t _s_fin(void* arg, struct tcp_pcb* tpcb, int8_t err);
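For reference, a minimal usage sketch of the reworked public API (Arduino-ESP32 assumed; host and port are hypothetical, error handling omitted; not part of the patch):

    #include <AsyncTCP.h>

    static AsyncClient client;

    void setup() {
      client.onConnect([](void*, AsyncClient* c) {
        // write() is add() + send(); only meaningful while canSend() is true
        if (c->canSend()) {
          c->write("GET / HTTP/1.0\r\n\r\n");
        }
      });
      client.onPoll([](void*, AsyncClient*) {
        // fires on every poll event; keep callbacks short, or the event
        // queue will start coalescing and discarding polls as described above
      });
      client.connect("example.com", 80);
    }

    void loop() {}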