Code cleanup and restructure

This commit is contained in:
me-no-dev
2019-06-24 13:20:23 +02:00
parent 4b20e84027
commit 1ad1280623

View File

@@ -127,7 +127,6 @@ static bool _remove_events_with_arg(void * arg){
return false;
}
if((int)packet->arg == (int)arg){
//ets_printf("X: 0x%08x\n", (uint32_t)packet->arg);
free(packet);
packet = NULL;
} else if(xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS){
@@ -142,22 +141,18 @@ static void _handle_async_event(lwip_event_packet_t * e){
_remove_events_with_arg(e->arg);
} else if(e->event == LWIP_TCP_RECV){
//ets_printf("-R: 0x%08x\n", e->recv.pcb);
//ets_printf("R: 0x%08x 0x%08x %d\n", e->arg, e->recv.pcb, e->recv.err);
AsyncClient::_s_recv(e->arg, e->recv.pcb, e->recv.pb, e->recv.err);
} else if(e->event == LWIP_TCP_FIN){
//ets_printf("-F: 0x%08x\n", e->fin.pcb);
//ets_printf("F: 0x%08x 0x%08x %d\n", e->arg, e->fin.pcb, e->fin.err);
AsyncClient::_s_fin(e->arg, e->fin.pcb, e->fin.err);
} else if(e->event == LWIP_TCP_SENT){
//ets_printf("-S: 0x%08x\n", e->sent.pcb);
//ets_printf("S: 0x%08x 0x%08x\n", e->arg, e->sent.pcb);
AsyncClient::_s_sent(e->arg, e->sent.pcb, e->sent.len);
} else if(e->event == LWIP_TCP_POLL){
//ets_printf("-P: 0x%08x\n", e->poll.pcb);
//ets_printf("P: 0x%08x 0x%08x\n", e->arg, e->poll.pcb);
AsyncClient::_s_poll(e->arg, e->poll.pcb);
} else if(e->event == LWIP_TCP_ERROR){
//ets_printf("E: 0x%08x %d\n", e->arg, e->error.err);
//ets_printf("-E: 0x%08x %d\n", e->arg, e->error.err);
AsyncClient::_s_error(e->arg, e->error.err);
} else if(e->event == LWIP_TCP_CONNECTED){
//ets_printf("C: 0x%08x 0x%08x %d\n", e->arg, e->connected.pcb, e->connected.err);
@@ -352,8 +347,8 @@ typedef struct {
static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = 0;
if(msg->client && msg->client->pcb()){
msg->err = ERR_CONN;
if(msg->client && msg->client->pcb() == msg->pcb){
msg->err = tcp_output(msg->pcb);
}
return msg->err;
@@ -361,8 +356,7 @@ static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg){
static esp_err_t _tcp_output(tcp_pcb * pcb, AsyncClient * client) {
if(!pcb){
log_w("pcb is NULL");
return ESP_FAIL;
return ERR_CONN;
}
tcp_api_call_t msg;
msg.pcb = pcb;
@@ -373,8 +367,8 @@ static esp_err_t _tcp_output(tcp_pcb * pcb, AsyncClient * client) {
static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = 0;
if(msg->client && msg->client->pcb()){
msg->err = ERR_CONN;
if(msg->client && msg->client->pcb() == msg->pcb){
msg->err = tcp_write(msg->pcb, msg->write.data, msg->write.size, msg->write.apiflags);
}
return msg->err;
@@ -382,8 +376,7 @@ static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg){
static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_t apiflags, AsyncClient * client) {
if(!pcb){
log_w("pcb is NULL");
return ESP_FAIL;
return ERR_CONN;
}
tcp_api_call_t msg;
msg.pcb = pcb;
@@ -397,8 +390,9 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, const char* data, size_t size, uint8_
static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = 0;
if(msg->client && msg->client->pcb()){
msg->err = ERR_CONN;
if(msg->client && msg->client->pcb() == msg->pcb){
msg->err = 0;
tcp_recved(msg->pcb, msg->received);
}
return msg->err;
@@ -406,8 +400,7 @@ static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){
static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len, AsyncClient * client) {
if(!pcb){
log_w("pcb is NULL");
return ESP_FAIL;
return ERR_CONN;
}
tcp_api_call_t msg;
msg.pcb = pcb;
@@ -419,8 +412,8 @@ static esp_err_t _tcp_recved(tcp_pcb * pcb, size_t len, AsyncClient * client) {
static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = 0;
if(!msg->client || msg->client->pcb()){
msg->err = ERR_CONN;
if(!msg->client || msg->client->pcb() == msg->pcb){
msg->err = tcp_close(msg->pcb);
}
return msg->err;
@@ -428,8 +421,7 @@ static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg){
static esp_err_t _tcp_close(tcp_pcb * pcb, AsyncClient * client) {
if(!pcb){
log_w("pcb is NULL");
return ESP_FAIL;
return ERR_CONN;
}
tcp_api_call_t msg;
msg.pcb = pcb;
@@ -440,8 +432,8 @@ static esp_err_t _tcp_close(tcp_pcb * pcb, AsyncClient * client) {
static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = 0;
if(!msg->client || msg->client->pcb()){
msg->err = ERR_CONN;
if(!msg->client || msg->client->pcb() == msg->pcb){
tcp_abort(msg->pcb);
}
return msg->err;
@@ -449,8 +441,7 @@ static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg){
static esp_err_t _tcp_abort(tcp_pcb * pcb, AsyncClient * client) {
if(!pcb){
log_w("pcb is NULL");
return ESP_FAIL;
return ERR_CONN;
}
tcp_api_call_t msg;
msg.pcb = pcb;
@@ -467,7 +458,6 @@ static err_t _tcp_connect_api(struct tcpip_api_call_data *api_call_msg){
static esp_err_t _tcp_connect(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port, tcp_connected_fn cb) {
if(!pcb){
log_w("pcb is NULL");
return ESP_FAIL;
}
tcp_api_call_t msg;
@@ -487,7 +477,6 @@ static err_t _tcp_bind_api(struct tcpip_api_call_data *api_call_msg){
static esp_err_t _tcp_bind(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port) {
if(!pcb){
log_w("pcb is NULL");
return ESP_FAIL;
}
tcp_api_call_t msg;
@@ -507,7 +496,6 @@ static err_t _tcp_listen_api(struct tcpip_api_call_data *api_call_msg){
static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) {
if(!pcb){
log_w("pcb is NULL");
return NULL;
}
tcp_api_call_t msg;
@@ -565,6 +553,10 @@ AsyncClient::~AsyncClient(){
}
}
/*
* Operators
* */
AsyncClient& AsyncClient::operator=(const AsyncClient& other){
if (_pcb) {
_close();
@@ -582,7 +574,72 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other){
return *this;
}
// Methods
bool AsyncClient::operator==(const AsyncClient &other) {
return _pcb == other._pcb;
}
AsyncClient & AsyncClient::operator+=(const AsyncClient &other) {
if(next == NULL){
next = (AsyncClient*)(&other);
next->prev = this;
} else {
AsyncClient *c = next;
while(c->next != NULL) {
c = c->next;
}
c->next =(AsyncClient*)(&other);
c->next->prev = c;
}
return *this;
}
/*
* Callback Setters
* */
void AsyncClient::onConnect(AcConnectHandler cb, void* arg){
_connect_cb = cb;
_connect_cb_arg = arg;
}
void AsyncClient::onDisconnect(AcConnectHandler cb, void* arg){
_discard_cb = cb;
_discard_cb_arg = arg;
}
void AsyncClient::onAck(AcAckHandler cb, void* arg){
_sent_cb = cb;
_sent_cb_arg = arg;
}
void AsyncClient::onError(AcErrorHandler cb, void* arg){
_error_cb = cb;
_error_cb_arg = arg;
}
void AsyncClient::onData(AcDataHandler cb, void* arg){
_recv_cb = cb;
_recv_cb_arg = arg;
}
void AsyncClient::onPacket(AcPacketHandler cb, void* arg){
_pb_cb = cb;
_pb_cb_arg = arg;
}
void AsyncClient::onTimeout(AcTimeoutHandler cb, void* arg){
_timeout_cb = cb;
_timeout_cb_arg = arg;
}
void AsyncClient::onPoll(AcConnectHandler cb, void* arg){
_poll_cb = cb;
_poll_cb_arg = arg;
}
/*
* Main Public Methods
* */
bool AsyncClient::connect(IPAddress ip, uint16_t port){
if (_pcb){
@@ -614,6 +671,91 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){
return true;
}
bool AsyncClient::connect(const char* host, uint16_t port){
ip_addr_t addr;
err_t err = dns_gethostbyname(host, &addr, (dns_found_callback)&_tcp_dns_found, this);
if(err == ERR_OK) {
return connect(IPAddress(addr.u_addr.ip4.addr), port);
} else if(err == ERR_INPROGRESS) {
_connect_port = port;
return true;
}
log_e("error: %d", err);
return false;
}
void AsyncClient::close(bool now){
if(_pcb){
_tcp_recved(_pcb, _rx_ack_len, this);
}
_close();
}
int8_t AsyncClient::abort(){
if(_pcb) {
_tcp_abort(_pcb, this);
_pcb = NULL;
}
return ERR_ABRT;
}
size_t AsyncClient::space(){
if((_pcb != NULL) && (_pcb->state == 4)){
return tcp_sndbuf(_pcb);
}
return 0;
}
size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) {
if(!_pcb || size == 0 || data == NULL) {
return 0;
}
size_t room = space();
if(!room) {
return 0;
}
size_t will_send = (room < size) ? room : size;
int8_t err = ERR_OK;
err = _tcp_write(_pcb, data, will_send, apiflags, this);
if(err != ERR_OK) {
return 0;
}
return will_send;
}
bool AsyncClient::send(){
int8_t err = ERR_OK;
err = _tcp_output(_pcb, this);
if(err == ERR_OK){
_pcb_busy = true;
_pcb_sent_at = millis();
return true;
}
return false;
}
size_t AsyncClient::ack(size_t len){
if(len > _rx_ack_len)
len = _rx_ack_len;
if(len){
_tcp_recved(_pcb, len, this);
}
_rx_ack_len -= len;
return len;
}
void AsyncClient::ackPacket(struct pbuf * pb){
if(!pb){
return;
}
_tcp_recved(_pcb, pb->len, this);
pbuf_free(pb);
}
/*
* Main Private Methods
* */
int8_t AsyncClient::_close(){
//ets_printf("X: 0x%08x\n", (uint32_t)this);
int8_t err = ERR_OK;
@@ -637,6 +779,10 @@ int8_t AsyncClient::_close(){
return err;
}
/*
* Private Callbacks
* */
int8_t AsyncClient::_connected(void* pcb, int8_t err){
_pcb = reinterpret_cast<tcp_pcb*>(pcb);
if(_pcb){
@@ -669,16 +815,6 @@ void AsyncClient::_error(int8_t err) {
}
}
int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) {
_rx_last_packet = millis();
//log_i("%u", len);
_pcb_busy = false;
if(_sent_cb) {
_sent_cb(_sent_cb_arg, this, len, (millis() - _pcb_sent_at));
}
return ERR_OK;
}
//In LwIP Thread
int8_t AsyncClient::_lwip_fin(tcp_pcb* pcb, int8_t err) {
if(!_pcb || pcb != _pcb){
@@ -706,6 +842,16 @@ int8_t AsyncClient::_fin(tcp_pcb* pcb, int8_t err) {
return ERR_OK;
}
int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) {
_rx_last_packet = millis();
//log_i("%u", len);
_pcb_busy = false;
if(_sent_cb) {
_sent_cb(_sent_cb_arg, this, len, (millis() - _pcb_sent_at));
}
return ERR_OK;
}
int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
while(pb != NULL) {
_rx_last_packet = millis();
@@ -777,33 +923,9 @@ void AsyncClient::_dns_found(struct ip_addr *ipaddr){
}
}
bool AsyncClient::connect(const char* host, uint16_t port){
ip_addr_t addr;
err_t err = dns_gethostbyname(host, &addr, (dns_found_callback)&_tcp_dns_found, this);
if(err == ERR_OK) {
return connect(IPAddress(addr.u_addr.ip4.addr), port);
} else if(err == ERR_INPROGRESS) {
_connect_port = port;
return true;
}
log_e("error: %d", err);
return false;
}
int8_t AsyncClient::abort(){
if(_pcb) {
_tcp_abort(_pcb, this);
_pcb = NULL;
}
return ERR_ABRT;
}
void AsyncClient::close(bool now){
if(_pcb){
_tcp_recved(_pcb, _rx_ack_len, this);
}
_close();
}
/*
* Public Helper Methods
* */
void AsyncClient::stop() {
close(false);
@@ -819,13 +941,6 @@ bool AsyncClient::free(){
return false;
}
size_t AsyncClient::space(){
if((_pcb != NULL) && (_pcb->state == 4)){
return tcp_sndbuf(_pcb);
}
return 0;
}
size_t AsyncClient::write(const char* data) {
if(data == NULL) {
return 0;
@@ -841,45 +956,6 @@ size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) {
return will_send;
}
size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) {
if(!_pcb || size == 0 || data == NULL) {
return 0;
}
size_t room = space();
if(!room) {
return 0;
}
size_t will_send = (room < size) ? room : size;
int8_t err = ERR_OK;
err = _tcp_write(_pcb, data, will_send, apiflags, this);
if(err != ERR_OK) {
return 0;
}
return will_send;
}
bool AsyncClient::send(){
int8_t err = ERR_OK;
err = _tcp_output(_pcb, this);
if(err == ERR_OK){
_pcb_busy = true;
_pcb_sent_at = millis();
return true;
}
return false;
}
size_t AsyncClient::ack(size_t len){
if(len > _rx_ack_len)
len = _rx_ack_len;
if(len){
_tcp_recved(_pcb, len, this);
}
_rx_ack_len -= len;
return len;
}
void AsyncClient::setRxTimeout(uint32_t timeout){
_rx_since_timeout = timeout;
}
@@ -1011,111 +1087,6 @@ bool AsyncClient::canSend(){
return space() > 0;
}
void AsyncClient::ackPacket(struct pbuf * pb){
if(!pb){
return;
}
_tcp_recved(_pcb, pb->len, this);
pbuf_free(pb);
}
// Operators
bool AsyncClient::operator==(const AsyncClient &other) {
return _pcb == other._pcb;
}
AsyncClient & AsyncClient::operator+=(const AsyncClient &other) {
if(next == NULL){
next = (AsyncClient*)(&other);
next->prev = this;
} else {
AsyncClient *c = next;
while(c->next != NULL) {
c = c->next;
}
c->next =(AsyncClient*)(&other);
c->next->prev = c;
}
return *this;
}
// Callback Setters
void AsyncClient::onConnect(AcConnectHandler cb, void* arg){
_connect_cb = cb;
_connect_cb_arg = arg;
}
void AsyncClient::onDisconnect(AcConnectHandler cb, void* arg){
_discard_cb = cb;
_discard_cb_arg = arg;
}
void AsyncClient::onAck(AcAckHandler cb, void* arg){
_sent_cb = cb;
_sent_cb_arg = arg;
}
void AsyncClient::onError(AcErrorHandler cb, void* arg){
_error_cb = cb;
_error_cb_arg = arg;
}
void AsyncClient::onData(AcDataHandler cb, void* arg){
_recv_cb = cb;
_recv_cb_arg = arg;
}
void AsyncClient::onPacket(AcPacketHandler cb, void* arg){
_pb_cb = cb;
_pb_cb_arg = arg;
}
void AsyncClient::onTimeout(AcTimeoutHandler cb, void* arg){
_timeout_cb = cb;
_timeout_cb_arg = arg;
}
void AsyncClient::onPoll(AcConnectHandler cb, void* arg){
_poll_cb = cb;
_poll_cb_arg = arg;
}
void AsyncClient::_s_dns_found(const char * name, struct ip_addr * ipaddr, void * arg){
reinterpret_cast<AsyncClient*>(arg)->_dns_found(ipaddr);
}
int8_t AsyncClient::_s_poll(void * arg, struct tcp_pcb * pcb) {
return reinterpret_cast<AsyncClient*>(arg)->_poll(pcb);
}
int8_t AsyncClient::_s_recv(void * arg, struct tcp_pcb * pcb, struct pbuf *pb, int8_t err) {
return reinterpret_cast<AsyncClient*>(arg)->_recv(pcb, pb, err);
}
int8_t AsyncClient::_s_fin(void * arg, struct tcp_pcb * pcb, int8_t err) {
return reinterpret_cast<AsyncClient*>(arg)->_fin(pcb, err);
}
int8_t AsyncClient::_s_lwip_fin(void * arg, struct tcp_pcb * pcb, int8_t err) {
return reinterpret_cast<AsyncClient*>(arg)->_lwip_fin(pcb, err);
}
int8_t AsyncClient::_s_sent(void * arg, struct tcp_pcb * pcb, uint16_t len) {
return reinterpret_cast<AsyncClient*>(arg)->_sent(pcb, len);
}
void AsyncClient::_s_error(void * arg, int8_t err) {
reinterpret_cast<AsyncClient*>(arg)->_error(err);
}
int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){
return reinterpret_cast<AsyncClient*>(arg)->_connected(pcb, err);
}
const char * AsyncClient::errorToString(int8_t error){
switch(error){
case 0: return "OK";
@@ -1156,14 +1127,45 @@ const char * AsyncClient::stateToString(){
}
}
/*
* Static Callbacks (LwIP C2C++ interconnect)
* */
void AsyncClient::_s_dns_found(const char * name, struct ip_addr * ipaddr, void * arg){
reinterpret_cast<AsyncClient*>(arg)->_dns_found(ipaddr);
}
int8_t AsyncClient::_s_poll(void * arg, struct tcp_pcb * pcb) {
return reinterpret_cast<AsyncClient*>(arg)->_poll(pcb);
}
int8_t AsyncClient::_s_recv(void * arg, struct tcp_pcb * pcb, struct pbuf *pb, int8_t err) {
return reinterpret_cast<AsyncClient*>(arg)->_recv(pcb, pb, err);
}
int8_t AsyncClient::_s_fin(void * arg, struct tcp_pcb * pcb, int8_t err) {
return reinterpret_cast<AsyncClient*>(arg)->_fin(pcb, err);
}
int8_t AsyncClient::_s_lwip_fin(void * arg, struct tcp_pcb * pcb, int8_t err) {
return reinterpret_cast<AsyncClient*>(arg)->_lwip_fin(pcb, err);
}
int8_t AsyncClient::_s_sent(void * arg, struct tcp_pcb * pcb, uint16_t len) {
return reinterpret_cast<AsyncClient*>(arg)->_sent(pcb, len);
}
void AsyncClient::_s_error(void * arg, int8_t err) {
reinterpret_cast<AsyncClient*>(arg)->_error(err);
}
int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){
return reinterpret_cast<AsyncClient*>(arg)->_connected(pcb, err);
}
/*
Async TCP Server
*/
struct pending_pcb {
tcp_pcb* pcb;
pbuf *pb;
struct pending_pcb * next;
};
AsyncServer::AsyncServer(IPAddress addr, uint16_t port)
: _port(port)
@@ -1192,43 +1194,6 @@ void AsyncServer::onClient(AcConnectHandler cb, void* arg){
_connect_cb_arg = arg;
}
int8_t AsyncServer::_s_accept(void * arg, tcp_pcb * pcb, int8_t err){
return reinterpret_cast<AsyncServer*>(arg)->_accept(pcb, err);
}
int8_t AsyncServer::_s_accepted(void *arg, AsyncClient* client){
return reinterpret_cast<AsyncServer*>(arg)->_accepted(client);
}
//runs on LwIP thread
int8_t AsyncServer::_accept(tcp_pcb* pcb, int8_t err){
//ets_printf("+A: 0x%08x\n", pcb);
if(_connect_cb){
if (_noDelay) {
tcp_nagle_disable(pcb);
} else {
tcp_nagle_enable(pcb);
}
AsyncClient *c = new AsyncClient(pcb);
if(c){
return _tcp_accept(this, c);
}
}
if(tcp_close(pcb) != ERR_OK){
tcp_abort(pcb);
}
log_e("FAIL");
return ERR_OK;
}
int8_t AsyncServer::_accepted(AsyncClient* client){
if(_connect_cb){
_connect_cb(_connect_cb_arg, client);
}
return ERR_OK;
}
void AsyncServer::begin(){
if(_pcb) {
return;
@@ -1275,6 +1240,30 @@ void AsyncServer::end(){
}
}
//runs on LwIP thread
int8_t AsyncServer::_accept(tcp_pcb* pcb, int8_t err){
//ets_printf("+A: 0x%08x\n", pcb);
if(_connect_cb){
AsyncClient *c = new AsyncClient(pcb);
if(c){
c->setNoDelay(_noDelay);
return _tcp_accept(this, c);
}
}
if(tcp_close(pcb) != ERR_OK){
tcp_abort(pcb);
}
log_e("FAIL");
return ERR_OK;
}
int8_t AsyncServer::_accepted(AsyncClient* client){
if(_connect_cb){
_connect_cb(_connect_cb_arg, client);
}
return ERR_OK;
}
void AsyncServer::setNoDelay(bool nodelay){
_noDelay = nodelay;
}
@@ -1289,3 +1278,11 @@ uint8_t AsyncServer::status(){
}
return _pcb->state;
}
int8_t AsyncServer::_s_accept(void * arg, tcp_pcb * pcb, int8_t err){
return reinterpret_cast<AsyncServer*>(arg)->_accept(pcb, err);
}
int8_t AsyncServer::_s_accepted(void *arg, AsyncClient* client){
return reinterpret_cast<AsyncServer*>(arg)->_accepted(client);
}