diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 099d50b..b92974e 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -127,7 +127,6 @@ static bool _remove_events_with_arg(void * arg){ return false; } if((int)packet->arg == (int)arg){ - //ets_printf("X: 0x%08x\n", (uint32_t)packet->arg); free(packet); packet = NULL; } else if(xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS){ @@ -142,22 +141,18 @@ static void _handle_async_event(lwip_event_packet_t * e){ _remove_events_with_arg(e->arg); } else if(e->event == LWIP_TCP_RECV){ //ets_printf("-R: 0x%08x\n", e->recv.pcb); - //ets_printf("R: 0x%08x 0x%08x %d\n", e->arg, e->recv.pcb, e->recv.err); AsyncClient::_s_recv(e->arg, e->recv.pcb, e->recv.pb, e->recv.err); } else if(e->event == LWIP_TCP_FIN){ //ets_printf("-F: 0x%08x\n", e->fin.pcb); - //ets_printf("F: 0x%08x 0x%08x %d\n", e->arg, e->fin.pcb, e->fin.err); AsyncClient::_s_fin(e->arg, e->fin.pcb, e->fin.err); } else if(e->event == LWIP_TCP_SENT){ //ets_printf("-S: 0x%08x\n", e->sent.pcb); - //ets_printf("S: 0x%08x 0x%08x\n", e->arg, e->sent.pcb); AsyncClient::_s_sent(e->arg, e->sent.pcb, e->sent.len); } else if(e->event == LWIP_TCP_POLL){ //ets_printf("-P: 0x%08x\n", e->poll.pcb); - //ets_printf("P: 0x%08x 0x%08x\n", e->arg, e->poll.pcb); AsyncClient::_s_poll(e->arg, e->poll.pcb); } else if(e->event == LWIP_TCP_ERROR){ - //ets_printf("E: 0x%08x %d\n", e->arg, e->error.err); + //ets_printf("-E: 0x%08x %d\n", e->arg, e->error.err); AsyncClient::_s_error(e->arg, e->error.err); } else if(e->event == LWIP_TCP_CONNECTED){ //ets_printf("C: 0x%08x 0x%08x %d\n", e->arg, e->connected.pcb, e->connected.err); @@ -352,8 +347,8 @@ typedef struct { static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; - msg->err = 0; - if(msg->client && msg->client->pcb()){ + msg->err = ERR_CONN; + if(msg->client && msg->client->pcb() == msg->pcb){ msg->err = tcp_output(msg->pcb); } return msg->err; @@ -361,8 +356,7 @@ static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg){ static esp_err_t _tcp_output(tcp_pcb * pcb, AsyncClient * client) { if(!pcb){ - log_w("pcb is NULL"); - return ESP_FAIL; + return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; @@ -373,8 +367,8 @@ static esp_err_t _tcp_output(tcp_pcb * pcb, AsyncClient * client) { static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; - msg->err = 0; - if(msg->client && msg->client->pcb()){ + msg->err = ERR_CONN; + if(msg->client && msg->client->pcb() == msg->pcb){ msg->err = tcp_write(msg->pcb, msg->write.data, msg->write.size, msg->write.apiflags); } return msg->err; @@ -382,8 +376,7 @@ static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg){ static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_t apiflags, AsyncClient * client) { if(!pcb){ - log_w("pcb is NULL"); - return ESP_FAIL; + return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; @@ -397,8 +390,9 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_ static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; - msg->err = 0; - if(msg->client && msg->client->pcb()){ + msg->err = ERR_CONN; + if(msg->client && msg->client->pcb() == msg->pcb){ + msg->err = 0; tcp_recved(msg->pcb, msg->received); } return msg->err; @@ -406,8 +400,7 @@ static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){ static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len, AsyncClient * client) { if(!pcb){ - log_w("pcb is NULL"); - return ESP_FAIL; + return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; @@ -419,8 +412,8 @@ static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len, AsyncClient * client) { static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; - msg->err = 0; - if(!msg->client || msg->client->pcb()){ + msg->err = ERR_CONN; + if(!msg->client || msg->client->pcb() == msg->pcb){ msg->err = tcp_close(msg->pcb); } return msg->err; @@ -428,8 +421,7 @@ static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg){ static esp_err_t _tcp_close(tcp_pcb * pcb, AsyncClient * client) { if(!pcb){ - log_w("pcb is NULL"); - return ESP_FAIL; + return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; @@ -440,8 +432,8 @@ static esp_err_t _tcp_close(tcp_pcb * pcb, AsyncClient * client) { static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; - msg->err = 0; - if(!msg->client || msg->client->pcb()){ + msg->err = ERR_CONN; + if(!msg->client || msg->client->pcb() == msg->pcb){ tcp_abort(msg->pcb); } return msg->err; @@ -449,8 +441,7 @@ static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg){ static esp_err_t _tcp_abort(tcp_pcb * pcb, AsyncClient * client) { if(!pcb){ - log_w("pcb is NULL"); - return ESP_FAIL; + return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; @@ -467,7 +458,6 @@ static err_t _tcp_connect_api(struct tcpip_api_call_data *api_call_msg){ static esp_err_t _tcp_connect(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port, tcp_connected_fn cb) { if(!pcb){ - log_w("pcb is NULL"); return ESP_FAIL; } tcp_api_call_t msg; @@ -487,7 +477,6 @@ static err_t _tcp_bind_api(struct tcpip_api_call_data *api_call_msg){ static esp_err_t _tcp_bind(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port) { if(!pcb){ - log_w("pcb is NULL"); return ESP_FAIL; } tcp_api_call_t msg; @@ -507,7 +496,6 @@ static err_t _tcp_listen_api(struct tcpip_api_call_data *api_call_msg){ static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) { if(!pcb){ - log_w("pcb is NULL"); return NULL; } tcp_api_call_t msg; @@ -565,6 +553,10 @@ AsyncClient::~AsyncClient(){ } } +/* + * Operators + * */ + AsyncClient& AsyncClient::operator=(const AsyncClient& other){ if (_pcb) { _close(); @@ -582,7 +574,72 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other){ return *this; } -// Methods +bool AsyncClient::operator==(const AsyncClient &other) { + return _pcb == other._pcb; +} + +AsyncClient & AsyncClient::operator+=(const AsyncClient &other) { + if(next == NULL){ + next = (AsyncClient*)(&other); + next->prev = this; + } else { + AsyncClient *c = next; + while(c->next != NULL) { + c = c->next; + } + c->next =(AsyncClient*)(&other); + c->next->prev = c; + } + return *this; +} + +/* + * Callback Setters + * */ + +void AsyncClient::onConnect(AcConnectHandler cb, void* arg){ + _connect_cb = cb; + _connect_cb_arg = arg; +} + +void AsyncClient::onDisconnect(AcConnectHandler cb, void* arg){ + _discard_cb = cb; + _discard_cb_arg = arg; +} + +void AsyncClient::onAck(AcAckHandler cb, void* arg){ + _sent_cb = cb; + _sent_cb_arg = arg; +} + +void AsyncClient::onError(AcErrorHandler cb, void* arg){ + _error_cb = cb; + _error_cb_arg = arg; +} + +void AsyncClient::onData(AcDataHandler cb, void* arg){ + _recv_cb = cb; + _recv_cb_arg = arg; +} + +void AsyncClient::onPacket(AcPacketHandler cb, void* arg){ + _pb_cb = cb; + _pb_cb_arg = arg; +} + +void AsyncClient::onTimeout(AcTimeoutHandler cb, void* arg){ + _timeout_cb = cb; + _timeout_cb_arg = arg; +} + +void AsyncClient::onPoll(AcConnectHandler cb, void* arg){ + _poll_cb = cb; + _poll_cb_arg = arg; +} + +/* + * Main Public Methods + * */ bool AsyncClient::connect(IPAddress ip, uint16_t port){ if (_pcb){ @@ -614,6 +671,91 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){ return true; } +bool AsyncClient::connect(const char* host, uint16_t port){ + ip_addr_t addr; + err_t err = dns_gethostbyname(host, &addr, (dns_found_callback)&_tcp_dns_found, this); + if(err == ERR_OK) { + return connect(IPAddress(addr.u_addr.ip4.addr), port); + } else if(err == ERR_INPROGRESS) { + _connect_port = port; + return true; + } + log_e("error: %d", err); + return false; +} + +void AsyncClient::close(bool now){ + if(_pcb){ + _tcp_recved(_pcb, _rx_ack_len, this); + } + _close(); +} + +int8_t AsyncClient::abort(){ + if(_pcb) { + _tcp_abort(_pcb, this); + _pcb = NULL; + } + return ERR_ABRT; +} + +size_t AsyncClient::space(){ + if((_pcb != NULL) && (_pcb->state == 4)){ + return tcp_sndbuf(_pcb); + } + return 0; +} + +size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) { + if(!_pcb || size == 0 || data == NULL) { + return 0; + } + size_t room = space(); + if(!room) { + return 0; + } + size_t will_send = (room < size) ? room : size; + int8_t err = ERR_OK; + err = _tcp_write(_pcb, data, will_send, apiflags, this); + if(err != ERR_OK) { + return 0; + } + return will_send; +} + +bool AsyncClient::send(){ + int8_t err = ERR_OK; + err = _tcp_output(_pcb, this); + if(err == ERR_OK){ + _pcb_busy = true; + _pcb_sent_at = millis(); + return true; + } + return false; +} + +size_t AsyncClient::ack(size_t len){ + if(len > _rx_ack_len) + len = _rx_ack_len; + if(len){ + _tcp_recved(_pcb, len, this); + } + _rx_ack_len -= len; + return len; +} + +void AsyncClient::ackPacket(struct pbuf * pb){ + if(!pb){ + return; + } + _tcp_recved(_pcb, pb->len, this); + pbuf_free(pb); +} + +/* + * Main Private Methods + * */ + int8_t AsyncClient::_close(){ //ets_printf("X: 0x%08x\n", (uint32_t)this); int8_t err = ERR_OK; @@ -637,6 +779,10 @@ int8_t AsyncClient::_close(){ return err; } +/* + * Private Callbacks + * */ + int8_t AsyncClient::_connected(void* pcb, int8_t err){ _pcb = reinterpret_cast(pcb); if(_pcb){ @@ -669,16 +815,6 @@ void AsyncClient::_error(int8_t err) { } } -int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { - _rx_last_packet = millis(); - //log_i("%u", len); - _pcb_busy = false; - if(_sent_cb) { - _sent_cb(_sent_cb_arg, this, len, (millis() - _pcb_sent_at)); - } - return ERR_OK; -} - //In LwIP Thread int8_t AsyncClient::_lwip_fin(tcp_pcb* pcb, int8_t err) { if(!_pcb || pcb != _pcb){ @@ -706,6 +842,16 @@ int8_t AsyncClient::_fin(tcp_pcb* pcb, int8_t err) { return ERR_OK; } +int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { + _rx_last_packet = millis(); + //log_i("%u", len); + _pcb_busy = false; + if(_sent_cb) { + _sent_cb(_sent_cb_arg, this, len, (millis() - _pcb_sent_at)); + } + return ERR_OK; +} + int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { while(pb != NULL) { _rx_last_packet = millis(); @@ -777,33 +923,9 @@ void AsyncClient::_dns_found(struct ip_addr *ipaddr){ } } -bool AsyncClient::connect(const char* host, uint16_t port){ - ip_addr_t addr; - err_t err = dns_gethostbyname(host, &addr, (dns_found_callback)&_tcp_dns_found, this); - if(err == ERR_OK) { - return connect(IPAddress(addr.u_addr.ip4.addr), port); - } else if(err == ERR_INPROGRESS) { - _connect_port = port; - return true; - } - log_e("error: %d", err); - return false; -} - -int8_t AsyncClient::abort(){ - if(_pcb) { - _tcp_abort(_pcb, this); - _pcb = NULL; - } - return ERR_ABRT; -} - -void AsyncClient::close(bool now){ - if(_pcb){ - _tcp_recved(_pcb, _rx_ack_len, this); - } - _close(); -} +/* + * Public Helper Methods + * */ void AsyncClient::stop() { close(false); @@ -819,13 +941,6 @@ bool AsyncClient::free(){ return false; } -size_t AsyncClient::space(){ - if((_pcb != NULL) && (_pcb->state == 4)){ - return tcp_sndbuf(_pcb); - } - return 0; -} - size_t AsyncClient::write(const char* data) { if(data == NULL) { return 0; @@ -841,45 +956,6 @@ size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) { return will_send; } - -size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) { - if(!_pcb || size == 0 || data == NULL) { - return 0; - } - size_t room = space(); - if(!room) { - return 0; - } - size_t will_send = (room < size) ? room : size; - int8_t err = ERR_OK; - err = _tcp_write(_pcb, data, will_send, apiflags, this); - if(err != ERR_OK) { - return 0; - } - return will_send; -} - -bool AsyncClient::send(){ - int8_t err = ERR_OK; - err = _tcp_output(_pcb, this); - if(err == ERR_OK){ - _pcb_busy = true; - _pcb_sent_at = millis(); - return true; - } - return false; -} - -size_t AsyncClient::ack(size_t len){ - if(len > _rx_ack_len) - len = _rx_ack_len; - if(len){ - _tcp_recved(_pcb, len, this); - } - _rx_ack_len -= len; - return len; -} - void AsyncClient::setRxTimeout(uint32_t timeout){ _rx_since_timeout = timeout; } @@ -1011,111 +1087,6 @@ bool AsyncClient::canSend(){ return space() > 0; } -void AsyncClient::ackPacket(struct pbuf * pb){ - if(!pb){ - return; - } - _tcp_recved(_pcb, pb->len, this); - pbuf_free(pb); -} - - -// Operators - -bool AsyncClient::operator==(const AsyncClient &other) { - return _pcb == other._pcb; -} - -AsyncClient & AsyncClient::operator+=(const AsyncClient &other) { - if(next == NULL){ - next = (AsyncClient*)(&other); - next->prev = this; - } else { - AsyncClient *c = next; - while(c->next != NULL) { - c = c->next; - } - c->next =(AsyncClient*)(&other); - c->next->prev = c; - } - return *this; -} - -// Callback Setters - -void AsyncClient::onConnect(AcConnectHandler cb, void* arg){ - _connect_cb = cb; - _connect_cb_arg = arg; -} - -void AsyncClient::onDisconnect(AcConnectHandler cb, void* arg){ - _discard_cb = cb; - _discard_cb_arg = arg; -} - -void AsyncClient::onAck(AcAckHandler cb, void* arg){ - _sent_cb = cb; - _sent_cb_arg = arg; -} - -void AsyncClient::onError(AcErrorHandler cb, void* arg){ - _error_cb = cb; - _error_cb_arg = arg; -} - -void AsyncClient::onData(AcDataHandler cb, void* arg){ - _recv_cb = cb; - _recv_cb_arg = arg; -} - -void AsyncClient::onPacket(AcPacketHandler cb, void* arg){ - _pb_cb = cb; - _pb_cb_arg = arg; -} - -void AsyncClient::onTimeout(AcTimeoutHandler cb, void* arg){ - _timeout_cb = cb; - _timeout_cb_arg = arg; -} - -void AsyncClient::onPoll(AcConnectHandler cb, void* arg){ - _poll_cb = cb; - _poll_cb_arg = arg; -} - - -void AsyncClient::_s_dns_found(const char * name, struct ip_addr * ipaddr, void * arg){ - reinterpret_cast(arg)->_dns_found(ipaddr); -} - -int8_t AsyncClient::_s_poll(void * arg, struct tcp_pcb * pcb) { - return reinterpret_cast(arg)->_poll(pcb); -} - -int8_t AsyncClient::_s_recv(void * arg, struct tcp_pcb * pcb, struct pbuf *pb, int8_t err) { - return reinterpret_cast(arg)->_recv(pcb, pb, err); -} - -int8_t AsyncClient::_s_fin(void * arg, struct tcp_pcb * pcb, int8_t err) { - return reinterpret_cast(arg)->_fin(pcb, err); -} - -int8_t AsyncClient::_s_lwip_fin(void * arg, struct tcp_pcb * pcb, int8_t err) { - return reinterpret_cast(arg)->_lwip_fin(pcb, err); -} - -int8_t AsyncClient::_s_sent(void * arg, struct tcp_pcb * pcb, uint16_t len) { - return reinterpret_cast(arg)->_sent(pcb, len); -} - -void AsyncClient::_s_error(void * arg, int8_t err) { - reinterpret_cast(arg)->_error(err); -} - -int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){ - return reinterpret_cast(arg)->_connected(pcb, err); -} - const char * AsyncClient::errorToString(int8_t error){ switch(error){ case 0: return "OK"; @@ -1156,14 +1127,45 @@ const char * AsyncClient::stateToString(){ } } +/* + * Static Callbacks (LwIP C2C++ interconnect) + * */ + +void AsyncClient::_s_dns_found(const char * name, struct ip_addr * ipaddr, void * arg){ + reinterpret_cast(arg)->_dns_found(ipaddr); +} + +int8_t AsyncClient::_s_poll(void * arg, struct tcp_pcb * pcb) { + return reinterpret_cast(arg)->_poll(pcb); +} + +int8_t AsyncClient::_s_recv(void * arg, struct tcp_pcb * pcb, struct pbuf *pb, int8_t err) { + return reinterpret_cast(arg)->_recv(pcb, pb, err); +} + +int8_t AsyncClient::_s_fin(void * arg, struct tcp_pcb * pcb, int8_t err) { + return reinterpret_cast(arg)->_fin(pcb, err); +} + +int8_t AsyncClient::_s_lwip_fin(void * arg, struct tcp_pcb * pcb, int8_t err) { + return reinterpret_cast(arg)->_lwip_fin(pcb, err); +} + +int8_t AsyncClient::_s_sent(void * arg, struct tcp_pcb * pcb, uint16_t len) { + return reinterpret_cast(arg)->_sent(pcb, len); +} + +void AsyncClient::_s_error(void * arg, int8_t err) { + reinterpret_cast(arg)->_error(err); +} + +int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){ + return reinterpret_cast(arg)->_connected(pcb, err); +} + /* Async TCP Server */ -struct pending_pcb { - tcp_pcb* pcb; - pbuf *pb; - struct pending_pcb * next; -}; AsyncServer::AsyncServer(IPAddress addr, uint16_t port) : _port(port) @@ -1192,43 +1194,6 @@ void AsyncServer::onClient(AcConnectHandler cb, void* arg){ _connect_cb_arg = arg; } -int8_t AsyncServer::_s_accept(void * arg, tcp_pcb * pcb, int8_t err){ - return reinterpret_cast(arg)->_accept(pcb, err); -} - -int8_t AsyncServer::_s_accepted(void *arg, AsyncClient* client){ - return reinterpret_cast(arg)->_accepted(client); -} - -//runs on LwIP thread -int8_t AsyncServer::_accept(tcp_pcb* pcb, int8_t err){ - //ets_printf("+A: 0x%08x\n", pcb); - if(_connect_cb){ - if (_noDelay) { - tcp_nagle_disable(pcb); - } else { - tcp_nagle_enable(pcb); - } - - AsyncClient *c = new AsyncClient(pcb); - if(c){ - return _tcp_accept(this, c); - } - } - if(tcp_close(pcb) != ERR_OK){ - tcp_abort(pcb); - } - log_e("FAIL"); - return ERR_OK; -} - -int8_t AsyncServer::_accepted(AsyncClient* client){ - if(_connect_cb){ - _connect_cb(_connect_cb_arg, client); - } - return ERR_OK; -} - void AsyncServer::begin(){ if(_pcb) { return; @@ -1275,6 +1240,30 @@ void AsyncServer::end(){ } } +//runs on LwIP thread +int8_t AsyncServer::_accept(tcp_pcb* pcb, int8_t err){ + //ets_printf("+A: 0x%08x\n", pcb); + if(_connect_cb){ + AsyncClient *c = new AsyncClient(pcb); + if(c){ + c->setNoDelay(_noDelay); + return _tcp_accept(this, c); + } + } + if(tcp_close(pcb) != ERR_OK){ + tcp_abort(pcb); + } + log_e("FAIL"); + return ERR_OK; +} + +int8_t AsyncServer::_accepted(AsyncClient* client){ + if(_connect_cb){ + _connect_cb(_connect_cb_arg, client); + } + return ERR_OK; +} + void AsyncServer::setNoDelay(bool nodelay){ _noDelay = nodelay; } @@ -1289,3 +1278,11 @@ uint8_t AsyncServer::status(){ } return _pcb->state; } + +int8_t AsyncServer::_s_accept(void * arg, tcp_pcb * pcb, int8_t err){ + return reinterpret_cast(arg)->_accept(pcb, err); +} + +int8_t AsyncServer::_s_accepted(void *arg, AsyncClient* client){ + return reinterpret_cast(arg)->_accepted(client); +}