From 34a03208344beeb90dd3a27d301bead1b988b94f Mon Sep 17 00:00:00 2001 From: me-no-dev Date: Thu, 16 Aug 2018 22:19:50 +0200 Subject: [PATCH] Connection close stability improvements Should help with most of the exceptions that have been plaguing the ESP32 port --- src/AsyncTCP.cpp | 96 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 23 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index ebeb875..9764ecb 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -83,10 +83,13 @@ static TaskHandle_t _async_service_task_handle = NULL; static void _handle_async_event(lwip_event_packet_t * e){ if(e->event == LWIP_TCP_RECV){ + //ets_printf("%c: 0x%08x 0x%08x\n", e->recv.pb?'R':'D', e->arg, e->recv.pcb); AsyncClient::_s_recv(e->arg, e->recv.pcb, e->recv.pb, e->recv.err); } else if(e->event == LWIP_TCP_SENT){ + //ets_printf("S: 0x%08x 0x%08x\n", e->arg, e->sent.pcb); AsyncClient::_s_sent(e->arg, e->sent.pcb, e->sent.len); } else if(e->event == LWIP_TCP_POLL){ + //ets_printf("P: 0x%08x 0x%08x\n", e->arg, e->poll.pcb); AsyncClient::_s_poll(e->arg, e->poll.pcb); } else if(e->event == LWIP_TCP_ERROR){ AsyncClient::_s_error(e->arg, e->error.err); @@ -94,11 +97,66 @@ static void _handle_async_event(lwip_event_packet_t * e){ free((void*)(e)); } +static inline bool _init_async_event_queue(){ + if(!_async_queue){ + _async_queue = xQueueCreate(32, sizeof(lwip_event_packet_t *)); + if(!_async_queue){ + return false; + } + } + return true; +} + +static inline bool _send_async_event(lwip_event_packet_t ** e){ + return _async_queue && xQueueSend(_async_queue, e, portMAX_DELAY) == pdPASS; +} + +static inline bool _get_async_event(lwip_event_packet_t ** e){ + return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS; +} + +static bool _remove_events_with_arg(void * arg){ + lwip_event_packet_t * first_packet = NULL; + lwip_event_packet_t * packet = NULL; + + if(!_async_queue){ + return false; + } + //figure out which is the first packet so we can keep the order + while(!first_packet){ + if(xQueueReceive(_async_queue, &first_packet, 0) != pdPASS){ + return false; + } + //discard packet if matching + if((int)first_packet->arg == (int)arg){ + //ets_printf("X: 0x%08x\n", (uint32_t)first_packet->arg); + free(first_packet); + first_packet = NULL; + //return first packet to the back of the queue + } else if(xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS){ + return false; + } + } + + while(xQueuePeek(_async_queue, &packet, 0) == pdPASS && packet != first_packet){ + if(xQueueReceive(_async_queue, &packet, 0) != pdPASS){ + return false; + } + if((int)packet->arg == (int)arg){ + //ets_printf("X: 0x%08x\n", (uint32_t)packet->arg); + free(packet); + packet = NULL; + } else if(xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS){ + return false; + } + } + return true; +} + static void _async_service_task(void *pvParameters){ lwip_event_packet_t * packet = NULL; for (;;) { - if(xQueueReceive(_async_queue, &packet, portMAX_DELAY) == pdTRUE){ - //dispatch packet + if(_get_async_event(&packet)){ _handle_async_event(packet); } } @@ -114,11 +172,8 @@ static void _stop_async_task(){ } */ static bool _start_async_task(){ - if(!_async_queue){ - _async_queue = xQueueCreate(32, sizeof(lwip_event_packet_t *)); - if(!_async_queue){ - return false; - } + if(!_init_async_event_queue()){ + return false; } if(!_async_service_task_handle){ xTaskCreatePinnedToCore(_async_service_task, "async_tcp", 8192, NULL, 3, &_async_service_task_handle, ASYNCTCP_RUNNING_CORE); @@ -134,59 +189,47 @@ static bool _start_async_task(){ * */ static int8_t _tcp_poll(void * arg, struct tcp_pcb * pcb) { - if(!_async_queue){ - return ERR_OK; - } lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t)); e->event = LWIP_TCP_POLL; e->arg = arg; e->poll.pcb = pcb; - if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) { + if (!_send_async_event(&e)) { free((void*)(e)); } return ERR_OK; } static int8_t _tcp_recv(void * arg, struct tcp_pcb * pcb, struct pbuf *pb, int8_t err) { - if(!_async_queue){ - return ERR_OK; - } lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t)); e->event = LWIP_TCP_RECV; e->arg = arg; e->recv.pcb = pcb; e->recv.pb = pb; e->recv.err = err; - if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) { + if (!_send_async_event(&e)) { free((void*)(e)); } return ERR_OK; } static int8_t _tcp_sent(void * arg, struct tcp_pcb * pcb, uint16_t len) { - if(!_async_queue){ - return ERR_OK; - } lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t)); e->event = LWIP_TCP_SENT; e->arg = arg; e->sent.pcb = pcb; e->sent.len = len; - if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) { + if (!_send_async_event(&e)) { free((void*)(e)); } return ERR_OK; } static void _tcp_error(void * arg, int8_t err) { - if(!_async_queue){ - return; - } lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t)); e->event = LWIP_TCP_ERROR; e->arg = arg; e->error.err = err; - if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) { + if (!_send_async_event(&e)) { free((void*)(e)); } } @@ -290,6 +333,7 @@ static err_t _tcp_close_api(struct tcpip_api_call *api_call_msg){ static esp_err_t _tcp_close(tcp_pcb * pcb) { tcp_api_call_t msg; msg.pcb = pcb; + //ets_printf("close 0x%08x\n", (uint32_t)pcb); tcpip_api_call(_tcp_close_api, (struct tcpip_api_call*)&msg); return msg.err; } @@ -304,6 +348,7 @@ static err_t _tcp_abort_api(struct tcpip_api_call *api_call_msg){ static esp_err_t _tcp_abort(tcp_pcb * pcb) { tcp_api_call_t msg; msg.pcb = pcb; + //ets_printf("abort 0x%08x\n", (uint32_t)pcb); tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call*)&msg); return msg.err; } @@ -380,6 +425,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb) tcp_sent(_pcb, &_tcp_sent); tcp_err(_pcb, &_tcp_error); tcp_poll(_pcb, &_tcp_poll, 1); + //ets_printf("accept 0x%08x\n", (uint32_t)_pcb); } } @@ -468,6 +514,7 @@ int8_t AsyncClient::_close(){ err = abort(); } _pcb = NULL; + _remove_events_with_arg(this); if(_discard_cb) _discard_cb(_discard_cb_arg, this); } @@ -490,6 +537,7 @@ void AsyncClient::_error(int8_t err) { } int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { + _in_lwip_thread = false; _rx_last_packet = millis(); //log_i("%u", len); _pcb_busy = false; @@ -499,6 +547,7 @@ int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { } int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { + _in_lwip_thread = false; if(pb == NULL){ return _close(); } @@ -528,6 +577,7 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { } int8_t AsyncClient::_poll(tcp_pcb* pcb){ + _in_lwip_thread = false; // Close requested if(_close_pcb){ _close_pcb = false;