Allow for multiple close events.

This commit is contained in:
Matt
2019-09-23 19:41:29 +01:00
parent 911414ee98
commit 9a4a58a0db
2 changed files with 49 additions and 29 deletions

View File

@@ -78,7 +78,9 @@ typedef struct {
static xQueueHandle _async_queue; static xQueueHandle _async_queue;
static TaskHandle_t _async_service_task_handle = NULL; static TaskHandle_t _async_service_task_handle = NULL;
static tcp_pcb * pcb_recently_closed = NULL; const int _number_of_closed_slots = 16;
static int _closed_index = 1;
static int _closed_slots[_number_of_closed_slots] = { 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1 };
static inline bool _init_async_event_queue(){ static inline bool _init_async_event_queue(){
if(!_async_queue){ if(!_async_queue){
@@ -328,6 +330,7 @@ static int8_t _tcp_accept(void * arg, AsyncClient * client) {
typedef struct { typedef struct {
struct tcpip_api_call_data call; struct tcpip_api_call_data call;
tcp_pcb * pcb; tcp_pcb * pcb;
int8_t closed_slot;
int8_t err; int8_t err;
union { union {
struct { struct {
@@ -352,19 +355,19 @@ typedef struct {
static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg){ static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = ERR_CONN; msg->err = ERR_CONN;
if(msg->pcb != pcb_recently_closed) { if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) {
msg->err = tcp_output(msg->pcb); msg->err = tcp_output(msg->pcb);
} }
pcb_recently_closed = NULL;
return msg->err; return msg->err;
} }
static esp_err_t _tcp_output(tcp_pcb * pcb) { static esp_err_t _tcp_output(tcp_pcb * pcb, int8_t closed_slot) {
if(!pcb){ if(!pcb){
return ERR_CONN; return ERR_CONN;
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = closed_slot;
tcpip_api_call(_tcp_output_api, (struct tcpip_api_call_data*)&msg); tcpip_api_call(_tcp_output_api, (struct tcpip_api_call_data*)&msg);
return msg.err; return msg.err;
} }
@@ -372,19 +375,19 @@ static esp_err_t _tcp_output(tcp_pcb * pcb) {
static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg){ static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = ERR_CONN; msg->err = ERR_CONN;
if(msg->pcb != pcb_recently_closed) { if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) {
msg->err = tcp_write(msg->pcb, msg->write.data, msg->write.size, msg->write.apiflags); msg->err = tcp_write(msg->pcb, msg->write.data, msg->write.size, msg->write.apiflags);
} }
pcb_recently_closed = NULL;
return msg->err; return msg->err;
} }
static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_t apiflags) { static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data, size_t size, uint8_t apiflags) {
if(!pcb){ if(!pcb){
return ERR_CONN; return ERR_CONN;
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = closed_slot;
msg.write.data = data; msg.write.data = data;
msg.write.size = size; msg.write.size = size;
msg.write.apiflags = apiflags; msg.write.apiflags = apiflags;
@@ -395,20 +398,20 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_
static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){ static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = ERR_CONN; msg->err = ERR_CONN;
if(msg->pcb != pcb_recently_closed) { if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) {
msg->err = 0; msg->err = 0;
tcp_recved(msg->pcb, msg->received); tcp_recved(msg->pcb, msg->received);
} }
pcb_recently_closed = NULL;
return msg->err; return msg->err;
} }
static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len) { static esp_err_t _tcp_recved(tcp_pcb * pcb, int8_t closed_slot, size_t len) {
if(!pcb){ if(!pcb){
return ERR_CONN; return ERR_CONN;
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = closed_slot;
msg.received = len; msg.received = len;
tcpip_api_call(_tcp_recved_api, (struct tcpip_api_call_data*)&msg); tcpip_api_call(_tcp_recved_api, (struct tcpip_api_call_data*)&msg);
return msg.err; return msg.err;
@@ -417,19 +420,19 @@ static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len) {
static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg){ static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = ERR_CONN; msg->err = ERR_CONN;
if(msg->pcb != pcb_recently_closed) { if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) {
msg->err = tcp_close(msg->pcb); msg->err = tcp_close(msg->pcb);
} }
pcb_recently_closed = NULL;
return msg->err; return msg->err;
} }
static esp_err_t _tcp_close(tcp_pcb * pcb) { static esp_err_t _tcp_close(tcp_pcb * pcb, int8_t closed_slot) {
if(!pcb){ if(!pcb){
return ERR_CONN; return ERR_CONN;
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = closed_slot;
tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data*)&msg); tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data*)&msg);
return msg.err; return msg.err;
} }
@@ -437,19 +440,19 @@ static esp_err_t _tcp_close(tcp_pcb * pcb) {
static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg){ static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = ERR_CONN; msg->err = ERR_CONN;
if(msg->pcb != pcb_recently_closed) { if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) {
tcp_abort(msg->pcb); tcp_abort(msg->pcb);
} }
pcb_recently_closed = NULL;
return msg->err; return msg->err;
} }
static esp_err_t _tcp_abort(tcp_pcb * pcb) { static esp_err_t _tcp_abort(tcp_pcb * pcb, int8_t closed_slot) {
if(!pcb){ if(!pcb){
return ERR_CONN; return ERR_CONN;
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = closed_slot;
tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call_data*)&msg); tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call_data*)&msg);
return msg.err; return msg.err;
} }
@@ -460,12 +463,13 @@ static err_t _tcp_connect_api(struct tcpip_api_call_data *api_call_msg){
return msg->err; return msg->err;
} }
static esp_err_t _tcp_connect(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port, tcp_connected_fn cb) { static esp_err_t _tcp_connect(tcp_pcb * pcb, int8_t closed_slot, ip_addr_t * addr, uint16_t port, tcp_connected_fn cb) {
if(!pcb){ if(!pcb){
return ESP_FAIL; return ESP_FAIL;
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = closed_slot;
msg.connect.addr = addr; msg.connect.addr = addr;
msg.connect.port = port; msg.connect.port = port;
msg.connect.cb = cb; msg.connect.cb = cb;
@@ -485,6 +489,7 @@ static esp_err_t _tcp_bind(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port) {
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = -1;
msg.bind.addr = addr; msg.bind.addr = addr;
msg.bind.port = port; msg.bind.port = port;
tcpip_api_call(_tcp_bind_api, (struct tcpip_api_call_data*)&msg); tcpip_api_call(_tcp_bind_api, (struct tcpip_api_call_data*)&msg);
@@ -504,6 +509,7 @@ static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) {
} }
tcp_api_call_t msg; tcp_api_call_t msg;
msg.pcb = pcb; msg.pcb = pcb;
msg.closed_slot = -1;
msg.backlog = backlog?backlog:0xFF; msg.backlog = backlog?backlog:0xFF;
tcpip_api_call(_tcp_listen_api, (struct tcpip_api_call_data*)&msg); tcpip_api_call(_tcp_listen_api, (struct tcpip_api_call_data*)&msg);
return msg.pcb; return msg.pcb;
@@ -541,7 +547,18 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
, next(NULL) , next(NULL)
{ {
_pcb = pcb; _pcb = pcb;
_closed_slot = -1;
if(_pcb){ if(_pcb){
_closed_slot = 0;
int closed_slot_min_index = _closed_slots[0];
for (int i = 0; i < _number_of_closed_slots; ++ i) {
if (_closed_slots[i] <= closed_slot_min_index && _closed_slots[i] != 0) {
closed_slot_min_index = _closed_slots[i];
_closed_slot = i;
}
}
_closed_slots[_closed_slot] = 0;
_rx_last_packet = millis(); _rx_last_packet = millis();
tcp_arg(_pcb, this); tcp_arg(_pcb, this);
tcp_recv(_pcb, &_tcp_recv); tcp_recv(_pcb, &_tcp_recv);
@@ -567,6 +584,7 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other){
} }
_pcb = other._pcb; _pcb = other._pcb;
_closed_slot = other._closed_slot;
if (_pcb) { if (_pcb) {
_rx_last_packet = millis(); _rx_last_packet = millis();
tcp_arg(_pcb, this); tcp_arg(_pcb, this);
@@ -671,7 +689,7 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){
tcp_sent(pcb, &_tcp_sent); tcp_sent(pcb, &_tcp_sent);
tcp_poll(pcb, &_tcp_poll, 1); tcp_poll(pcb, &_tcp_poll, 1);
//_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected); //_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected);
_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_tcp_connected); _tcp_connect(pcb, _closed_slot, &addr, port,(tcp_connected_fn)&_tcp_connected);
return true; return true;
} }
@@ -697,14 +715,14 @@ bool AsyncClient::connect(const char* host, uint16_t port){
void AsyncClient::close(bool now){ void AsyncClient::close(bool now){
if(_pcb){ if(_pcb){
_tcp_recved(_pcb, _rx_ack_len); _tcp_recved(_pcb, _closed_slot, _rx_ack_len);
} }
_close(); _close();
} }
int8_t AsyncClient::abort(){ int8_t AsyncClient::abort(){
if(_pcb) { if(_pcb) {
_tcp_abort(_pcb); _tcp_abort(_pcb, _closed_slot );
_pcb = NULL; _pcb = NULL;
} }
return ERR_ABRT; return ERR_ABRT;
@@ -727,7 +745,7 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) {
} }
size_t will_send = (room < size) ? room : size; size_t will_send = (room < size) ? room : size;
int8_t err = ERR_OK; int8_t err = ERR_OK;
err = _tcp_write(_pcb, data, will_send, apiflags); err = _tcp_write(_pcb, _closed_slot, data, will_send, apiflags);
if(err != ERR_OK) { if(err != ERR_OK) {
return 0; return 0;
} }
@@ -736,7 +754,7 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) {
bool AsyncClient::send(){ bool AsyncClient::send(){
int8_t err = ERR_OK; int8_t err = ERR_OK;
err = _tcp_output(_pcb); err = _tcp_output(_pcb, _closed_slot);
if(err == ERR_OK){ if(err == ERR_OK){
_pcb_busy = true; _pcb_busy = true;
_pcb_sent_at = millis(); _pcb_sent_at = millis();
@@ -749,7 +767,7 @@ size_t AsyncClient::ack(size_t len){
if(len > _rx_ack_len) if(len > _rx_ack_len)
len = _rx_ack_len; len = _rx_ack_len;
if(len){ if(len){
_tcp_recved(_pcb, len); _tcp_recved(_pcb, _closed_slot, len);
} }
_rx_ack_len -= len; _rx_ack_len -= len;
return len; return len;
@@ -759,7 +777,7 @@ void AsyncClient::ackPacket(struct pbuf * pb){
if(!pb){ if(!pb){
return; return;
} }
_tcp_recved(_pcb, pb->len); _tcp_recved(_pcb, _closed_slot, pb->len);
pbuf_free(pb); pbuf_free(pb);
} }
@@ -778,7 +796,7 @@ int8_t AsyncClient::_close(){
tcp_err(_pcb, NULL); tcp_err(_pcb, NULL);
tcp_poll(_pcb, NULL, 0); tcp_poll(_pcb, NULL, 0);
_tcp_clear_events(this); _tcp_clear_events(this);
err = _tcp_close(_pcb); err = _tcp_close(_pcb, _closed_slot);
if(err != ERR_OK) { if(err != ERR_OK) {
err = abort(); err = abort();
} }
@@ -840,7 +858,8 @@ int8_t AsyncClient::_lwip_fin(tcp_pcb* pcb, int8_t err) {
if(tcp_close(_pcb) != ERR_OK) { if(tcp_close(_pcb) != ERR_OK) {
tcp_abort(_pcb); tcp_abort(_pcb);
} }
pcb_recently_closed = _pcb; _closed_slots[_closed_slot] = _closed_index;
++ _closed_index;
_pcb = NULL; _pcb = NULL;
return ERR_OK; return ERR_OK;
} }
@@ -881,7 +900,7 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
if(!_ack_pcb) { if(!_ack_pcb) {
_rx_ack_len += b->len; _rx_ack_len += b->len;
} else if(_pcb) { } else if(_pcb) {
_tcp_recved(_pcb, b->len); _tcp_recved(_pcb, _closed_slot, b->len);
} }
pbuf_free(b); pbuf_free(b);
} }
@@ -1228,7 +1247,7 @@ void AsyncServer::begin(){
err = _tcp_bind(_pcb, &local_addr, _port); err = _tcp_bind(_pcb, &local_addr, _port);
if (err != ERR_OK) { if (err != ERR_OK) {
_tcp_close(_pcb); _tcp_close(_pcb, -1);
log_e("bind error: %d", err); log_e("bind error: %d", err);
return; return;
} }
@@ -1247,7 +1266,7 @@ void AsyncServer::end(){
if(_pcb){ if(_pcb){
tcp_arg(_pcb, NULL); tcp_arg(_pcb, NULL);
tcp_accept(_pcb, NULL); tcp_accept(_pcb, NULL);
_tcp_abort(_pcb); _tcp_abort(_pcb, -1);
_pcb = NULL; _pcb = NULL;
} }
} }

View File

@@ -141,6 +141,7 @@ class AsyncClient {
protected: protected:
tcp_pcb* _pcb; tcp_pcb* _pcb;
int8_t _closed_slot;
AcConnectHandler _connect_cb; AcConnectHandler _connect_cb;
void* _connect_cb_arg; void* _connect_cb_arg;