Connection close stability improvements

Should help with most of the exceptions that have been plaguing the ESP32 port
This commit is contained in:
me-no-dev
2018-08-16 22:19:50 +02:00
parent 2f76a9f5b6
commit 34a0320834

View File

@@ -83,10 +83,13 @@ static TaskHandle_t _async_service_task_handle = NULL;
static void _handle_async_event(lwip_event_packet_t * e){
if(e->event == LWIP_TCP_RECV){
//ets_printf("%c: 0x%08x 0x%08x\n", e->recv.pb?'R':'D', e->arg, e->recv.pcb);
AsyncClient::_s_recv(e->arg, e->recv.pcb, e->recv.pb, e->recv.err);
} else if(e->event == LWIP_TCP_SENT){
//ets_printf("S: 0x%08x 0x%08x\n", e->arg, e->sent.pcb);
AsyncClient::_s_sent(e->arg, e->sent.pcb, e->sent.len);
} else if(e->event == LWIP_TCP_POLL){
//ets_printf("P: 0x%08x 0x%08x\n", e->arg, e->poll.pcb);
AsyncClient::_s_poll(e->arg, e->poll.pcb);
} else if(e->event == LWIP_TCP_ERROR){
AsyncClient::_s_error(e->arg, e->error.err);
@@ -94,11 +97,66 @@ static void _handle_async_event(lwip_event_packet_t * e){
free((void*)(e));
}
static inline bool _init_async_event_queue(){
if(!_async_queue){
_async_queue = xQueueCreate(32, sizeof(lwip_event_packet_t *));
if(!_async_queue){
return false;
}
}
return true;
}
static inline bool _send_async_event(lwip_event_packet_t ** e){
return _async_queue && xQueueSend(_async_queue, e, portMAX_DELAY) == pdPASS;
}
static inline bool _get_async_event(lwip_event_packet_t ** e){
return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS;
}
static bool _remove_events_with_arg(void * arg){
lwip_event_packet_t * first_packet = NULL;
lwip_event_packet_t * packet = NULL;
if(!_async_queue){
return false;
}
//figure out which is the first packet so we can keep the order
while(!first_packet){
if(xQueueReceive(_async_queue, &first_packet, 0) != pdPASS){
return false;
}
//discard packet if matching
if((int)first_packet->arg == (int)arg){
//ets_printf("X: 0x%08x\n", (uint32_t)first_packet->arg);
free(first_packet);
first_packet = NULL;
//return first packet to the back of the queue
} else if(xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS){
return false;
}
}
while(xQueuePeek(_async_queue, &packet, 0) == pdPASS && packet != first_packet){
if(xQueueReceive(_async_queue, &packet, 0) != pdPASS){
return false;
}
if((int)packet->arg == (int)arg){
//ets_printf("X: 0x%08x\n", (uint32_t)packet->arg);
free(packet);
packet = NULL;
} else if(xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS){
return false;
}
}
return true;
}
static void _async_service_task(void *pvParameters){
lwip_event_packet_t * packet = NULL;
for (;;) {
if(xQueueReceive(_async_queue, &packet, portMAX_DELAY) == pdTRUE){
//dispatch packet
if(_get_async_event(&packet)){
_handle_async_event(packet);
}
}
@@ -114,11 +172,8 @@ static void _stop_async_task(){
}
*/
static bool _start_async_task(){
if(!_async_queue){
_async_queue = xQueueCreate(32, sizeof(lwip_event_packet_t *));
if(!_async_queue){
return false;
}
if(!_init_async_event_queue()){
return false;
}
if(!_async_service_task_handle){
xTaskCreatePinnedToCore(_async_service_task, "async_tcp", 8192, NULL, 3, &_async_service_task_handle, ASYNCTCP_RUNNING_CORE);
@@ -134,59 +189,47 @@ static bool _start_async_task(){
* */
static int8_t _tcp_poll(void * arg, struct tcp_pcb * pcb) {
if(!_async_queue){
return ERR_OK;
}
lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t));
e->event = LWIP_TCP_POLL;
e->arg = arg;
e->poll.pcb = pcb;
if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) {
if (!_send_async_event(&e)) {
free((void*)(e));
}
return ERR_OK;
}
static int8_t _tcp_recv(void * arg, struct tcp_pcb * pcb, struct pbuf *pb, int8_t err) {
if(!_async_queue){
return ERR_OK;
}
lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t));
e->event = LWIP_TCP_RECV;
e->arg = arg;
e->recv.pcb = pcb;
e->recv.pb = pb;
e->recv.err = err;
if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) {
if (!_send_async_event(&e)) {
free((void*)(e));
}
return ERR_OK;
}
static int8_t _tcp_sent(void * arg, struct tcp_pcb * pcb, uint16_t len) {
if(!_async_queue){
return ERR_OK;
}
lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t));
e->event = LWIP_TCP_SENT;
e->arg = arg;
e->sent.pcb = pcb;
e->sent.len = len;
if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) {
if (!_send_async_event(&e)) {
free((void*)(e));
}
return ERR_OK;
}
static void _tcp_error(void * arg, int8_t err) {
if(!_async_queue){
return;
}
lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t));
e->event = LWIP_TCP_ERROR;
e->arg = arg;
e->error.err = err;
if (xQueueSend(_async_queue, &e, portMAX_DELAY) != pdPASS) {
if (!_send_async_event(&e)) {
free((void*)(e));
}
}
@@ -290,6 +333,7 @@ static err_t _tcp_close_api(struct tcpip_api_call *api_call_msg){
static esp_err_t _tcp_close(tcp_pcb * pcb) {
tcp_api_call_t msg;
msg.pcb = pcb;
//ets_printf("close 0x%08x\n", (uint32_t)pcb);
tcpip_api_call(_tcp_close_api, (struct tcpip_api_call*)&msg);
return msg.err;
}
@@ -304,6 +348,7 @@ static err_t _tcp_abort_api(struct tcpip_api_call *api_call_msg){
static esp_err_t _tcp_abort(tcp_pcb * pcb) {
tcp_api_call_t msg;
msg.pcb = pcb;
//ets_printf("abort 0x%08x\n", (uint32_t)pcb);
tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call*)&msg);
return msg.err;
}
@@ -380,6 +425,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
tcp_sent(_pcb, &_tcp_sent);
tcp_err(_pcb, &_tcp_error);
tcp_poll(_pcb, &_tcp_poll, 1);
//ets_printf("accept 0x%08x\n", (uint32_t)_pcb);
}
}
@@ -468,6 +514,7 @@ int8_t AsyncClient::_close(){
err = abort();
}
_pcb = NULL;
_remove_events_with_arg(this);
if(_discard_cb)
_discard_cb(_discard_cb_arg, this);
}
@@ -490,6 +537,7 @@ void AsyncClient::_error(int8_t err) {
}
int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) {
_in_lwip_thread = false;
_rx_last_packet = millis();
//log_i("%u", len);
_pcb_busy = false;
@@ -499,6 +547,7 @@ int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) {
}
int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
_in_lwip_thread = false;
if(pb == NULL){
return _close();
}
@@ -528,6 +577,7 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
}
int8_t AsyncClient::_poll(tcp_pcb* pcb){
_in_lwip_thread = false;
// Close requested
if(_close_pcb){
_close_pcb = false;