From 9a4a58a0db67dd232f78733ae08d39bac3d0980b Mon Sep 17 00:00:00 2001 From: Matt Date: Mon, 23 Sep 2019 19:41:29 +0100 Subject: [PATCH 1/3] Allow for multiple close events. --- src/AsyncTCP.cpp | 77 ++++++++++++++++++++++++++++++------------------ src/AsyncTCP.h | 1 + 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 353feee..9eeba1a 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -78,7 +78,9 @@ typedef struct { static xQueueHandle _async_queue; static TaskHandle_t _async_service_task_handle = NULL; -static tcp_pcb * pcb_recently_closed = NULL; +const int _number_of_closed_slots = 16; +static int _closed_index = 1; +static int _closed_slots[_number_of_closed_slots] = { 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1 }; static inline bool _init_async_event_queue(){ if(!_async_queue){ @@ -328,6 +330,7 @@ static int8_t _tcp_accept(void * arg, AsyncClient * client) { typedef struct { struct tcpip_api_call_data call; tcp_pcb * pcb; + int8_t closed_slot; int8_t err; union { struct { @@ -352,19 +355,19 @@ typedef struct { static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if(msg->pcb != pcb_recently_closed) { + if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) { msg->err = tcp_output(msg->pcb); } - pcb_recently_closed = NULL; return msg->err; } -static esp_err_t _tcp_output(tcp_pcb * pcb) { +static esp_err_t _tcp_output(tcp_pcb * pcb, int8_t closed_slot) { if(!pcb){ return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = closed_slot; tcpip_api_call(_tcp_output_api, (struct tcpip_api_call_data*)&msg); return msg.err; } @@ -372,19 +375,19 @@ static esp_err_t _tcp_output(tcp_pcb * pcb) { static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if(msg->pcb != pcb_recently_closed) { + if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) { msg->err = tcp_write(msg->pcb, msg->write.data, msg->write.size, msg->write.apiflags); } - pcb_recently_closed = NULL; return msg->err; } -static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_t apiflags) { +static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data, size_t size, uint8_t apiflags) { if(!pcb){ return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = closed_slot; msg.write.data = data; msg.write.size = size; msg.write.apiflags = apiflags; @@ -395,20 +398,20 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_ static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if(msg->pcb != pcb_recently_closed) { + if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) { msg->err = 0; tcp_recved(msg->pcb, msg->received); } - pcb_recently_closed = NULL; return msg->err; } -static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len) { +static esp_err_t _tcp_recved(tcp_pcb * pcb, int8_t closed_slot, size_t len) { if(!pcb){ return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = closed_slot; msg.received = len; tcpip_api_call(_tcp_recved_api, (struct tcpip_api_call_data*)&msg); return msg.err; @@ -417,19 +420,19 @@ static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len) { static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if(msg->pcb != pcb_recently_closed) { + if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) { msg->err = tcp_close(msg->pcb); } - pcb_recently_closed = NULL; return msg->err; } -static esp_err_t _tcp_close(tcp_pcb * pcb) { +static esp_err_t _tcp_close(tcp_pcb * pcb, int8_t closed_slot) { if(!pcb){ return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = closed_slot; tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data*)&msg); return msg.err; } @@ -437,19 +440,19 @@ static esp_err_t _tcp_close(tcp_pcb * pcb) { static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if(msg->pcb != pcb_recently_closed) { + if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) { tcp_abort(msg->pcb); } - pcb_recently_closed = NULL; return msg->err; } -static esp_err_t _tcp_abort(tcp_pcb * pcb) { +static esp_err_t _tcp_abort(tcp_pcb * pcb, int8_t closed_slot) { if(!pcb){ return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = closed_slot; tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call_data*)&msg); return msg.err; } @@ -460,12 +463,13 @@ static err_t _tcp_connect_api(struct tcpip_api_call_data *api_call_msg){ return msg->err; } -static esp_err_t _tcp_connect(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port, tcp_connected_fn cb) { +static esp_err_t _tcp_connect(tcp_pcb * pcb, int8_t closed_slot, ip_addr_t * addr, uint16_t port, tcp_connected_fn cb) { if(!pcb){ return ESP_FAIL; } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = closed_slot; msg.connect.addr = addr; msg.connect.port = port; msg.connect.cb = cb; @@ -485,6 +489,7 @@ static esp_err_t _tcp_bind(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port) { } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = -1; msg.bind.addr = addr; msg.bind.port = port; tcpip_api_call(_tcp_bind_api, (struct tcpip_api_call_data*)&msg); @@ -504,6 +509,7 @@ static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) { } tcp_api_call_t msg; msg.pcb = pcb; + msg.closed_slot = -1; msg.backlog = backlog?backlog:0xFF; tcpip_api_call(_tcp_listen_api, (struct tcpip_api_call_data*)&msg); return msg.pcb; @@ -541,7 +547,18 @@ AsyncClient::AsyncClient(tcp_pcb* pcb) , next(NULL) { _pcb = pcb; + _closed_slot = -1; if(_pcb){ + _closed_slot = 0; + int closed_slot_min_index = _closed_slots[0]; + for (int i = 0; i < _number_of_closed_slots; ++ i) { + if (_closed_slots[i] <= closed_slot_min_index && _closed_slots[i] != 0) { + closed_slot_min_index = _closed_slots[i]; + _closed_slot = i; + } + } + _closed_slots[_closed_slot] = 0; + _rx_last_packet = millis(); tcp_arg(_pcb, this); tcp_recv(_pcb, &_tcp_recv); @@ -567,6 +584,7 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other){ } _pcb = other._pcb; + _closed_slot = other._closed_slot; if (_pcb) { _rx_last_packet = millis(); tcp_arg(_pcb, this); @@ -671,7 +689,7 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){ tcp_sent(pcb, &_tcp_sent); tcp_poll(pcb, &_tcp_poll, 1); //_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected); - _tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_tcp_connected); + _tcp_connect(pcb, _closed_slot, &addr, port,(tcp_connected_fn)&_tcp_connected); return true; } @@ -697,14 +715,14 @@ bool AsyncClient::connect(const char* host, uint16_t port){ void AsyncClient::close(bool now){ if(_pcb){ - _tcp_recved(_pcb, _rx_ack_len); + _tcp_recved(_pcb, _closed_slot, _rx_ack_len); } _close(); } int8_t AsyncClient::abort(){ if(_pcb) { - _tcp_abort(_pcb); + _tcp_abort(_pcb, _closed_slot ); _pcb = NULL; } return ERR_ABRT; @@ -727,7 +745,7 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) { } size_t will_send = (room < size) ? room : size; int8_t err = ERR_OK; - err = _tcp_write(_pcb, data, will_send, apiflags); + err = _tcp_write(_pcb, _closed_slot, data, will_send, apiflags); if(err != ERR_OK) { return 0; } @@ -736,7 +754,7 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) { bool AsyncClient::send(){ int8_t err = ERR_OK; - err = _tcp_output(_pcb); + err = _tcp_output(_pcb, _closed_slot); if(err == ERR_OK){ _pcb_busy = true; _pcb_sent_at = millis(); @@ -749,7 +767,7 @@ size_t AsyncClient::ack(size_t len){ if(len > _rx_ack_len) len = _rx_ack_len; if(len){ - _tcp_recved(_pcb, len); + _tcp_recved(_pcb, _closed_slot, len); } _rx_ack_len -= len; return len; @@ -759,7 +777,7 @@ void AsyncClient::ackPacket(struct pbuf * pb){ if(!pb){ return; } - _tcp_recved(_pcb, pb->len); + _tcp_recved(_pcb, _closed_slot, pb->len); pbuf_free(pb); } @@ -778,7 +796,7 @@ int8_t AsyncClient::_close(){ tcp_err(_pcb, NULL); tcp_poll(_pcb, NULL, 0); _tcp_clear_events(this); - err = _tcp_close(_pcb); + err = _tcp_close(_pcb, _closed_slot); if(err != ERR_OK) { err = abort(); } @@ -840,7 +858,8 @@ int8_t AsyncClient::_lwip_fin(tcp_pcb* pcb, int8_t err) { if(tcp_close(_pcb) != ERR_OK) { tcp_abort(_pcb); } - pcb_recently_closed = _pcb; + _closed_slots[_closed_slot] = _closed_index; + ++ _closed_index; _pcb = NULL; return ERR_OK; } @@ -881,7 +900,7 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { if(!_ack_pcb) { _rx_ack_len += b->len; } else if(_pcb) { - _tcp_recved(_pcb, b->len); + _tcp_recved(_pcb, _closed_slot, b->len); } pbuf_free(b); } @@ -1228,7 +1247,7 @@ void AsyncServer::begin(){ err = _tcp_bind(_pcb, &local_addr, _port); if (err != ERR_OK) { - _tcp_close(_pcb); + _tcp_close(_pcb, -1); log_e("bind error: %d", err); return; } @@ -1247,7 +1266,7 @@ void AsyncServer::end(){ if(_pcb){ tcp_arg(_pcb, NULL); tcp_accept(_pcb, NULL); - _tcp_abort(_pcb); + _tcp_abort(_pcb, -1); _pcb = NULL; } } diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index 05650fc..fef890b 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -141,6 +141,7 @@ class AsyncClient { protected: tcp_pcb* _pcb; + int8_t _closed_slot; AcConnectHandler _connect_cb; void* _connect_cb_arg; From 15356c330089869a05a3453cf46a8b42cd840f3d Mon Sep 17 00:00:00 2001 From: Matt Date: Mon, 23 Sep 2019 20:08:33 +0100 Subject: [PATCH 2/3] Fix closed_slots initialization. --- src/AsyncTCP.cpp | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 6c4f955..2d0917b 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -79,8 +79,8 @@ typedef struct { static xQueueHandle _async_queue; static TaskHandle_t _async_service_task_handle = NULL; const int _number_of_closed_slots = 16; -static int _closed_index = 1; -static int _closed_slots[_number_of_closed_slots] = { 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1 }; +static int _closed_index = 0; +static int _closed_slots[_number_of_closed_slots]; static inline bool _init_async_event_queue(){ if(!_async_queue){ @@ -555,11 +555,18 @@ AsyncClient::AsyncClient(tcp_pcb* pcb) _closed_slot = -1; if(_pcb){ _closed_slot = 0; - int closed_slot_min_index = _closed_slots[0]; - for (int i = 0; i < _number_of_closed_slots; ++ i) { - if (_closed_slots[i] <= closed_slot_min_index && _closed_slots[i] != 0) { - closed_slot_min_index = _closed_slots[i]; - _closed_slot = i; + if (_closed_index == 0) { + _closed_index = 1; + for (int i = 0; i < _number_of_closed_slots; ++ i) { + _closed_slots[i] = 1; + } + } else { + int closed_slot_min_index = _closed_slots[0]; + for (int i = 0; i < _number_of_closed_slots; ++ i) { + if (_closed_slots[i] <= closed_slot_min_index && _closed_slots[i] != 0) { + closed_slot_min_index = _closed_slots[i]; + _closed_slot = i; + } } } _closed_slots[_closed_slot] = 0; From 13361c8da67ad737393fd32bea1e02fe0b32b281 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 24 Sep 2019 17:26:59 +0100 Subject: [PATCH 3/3] Use the system defined parameter to set the size of the closed_slots buffer. --- src/AsyncTCP.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index e11d9aa..5259400 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -78,7 +78,7 @@ typedef struct { static xQueueHandle _async_queue; static TaskHandle_t _async_service_task_handle = NULL; -const int _number_of_closed_slots = 16; +const int _number_of_closed_slots = CONFIG_LWIP_MAX_ACTIVE_TCP; static int _closed_index = 0; static int _closed_slots[_number_of_closed_slots];