From 0d02922e52ee4701ff61bf0bc705c0759389c8f1 Mon Sep 17 00:00:00 2001 From: Me No Dev Date: Wed, 29 Jun 2016 19:29:39 +0300 Subject: [PATCH] Add onConnect callback to EventSources usage: AsyncEventSource events("/events"); events.onConnect([](AsyncEventSourceClient *client){ client->send("Hello!",NULL,0,1000); }); server.addHandler(&events); ... events.send("boot finished","system");//send "system" event --- src/AsyncEventSource.cpp | 230 +++++++++++++++++++++------------------ src/AsyncEventSource.h | 6 +- 2 files changed, 128 insertions(+), 108 deletions(-) diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index 4275e9f..6ca9bad 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -20,109 +20,7 @@ #include "Arduino.h" #include "AsyncEventSource.h" -// Client - -AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server){ - _client = request->client(); - _server = server; - next = NULL; - //_client->onError([](void *r, AsyncClient* c, int8_t error){ ((AsyncEventSourceClient*)(r))->_onError(error); }, this); - //_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this); - //_client->onPoll([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onPoll(); }, this); - _client->onError(NULL, NULL); - _client->onAck(NULL, NULL); - _client->onPoll(NULL, NULL); - _client->onData(NULL, NULL); - _client->onTimeout([](void *r, AsyncClient* c, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this); - _client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); }, this); - _server->_addClient(this); - delete request; -} - -AsyncEventSourceClient::~AsyncEventSourceClient(){ - close(); -} - -//void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){} -//void AsyncEventSourceClient::_onPoll(){} -//void AsyncEventSourceClient::_onError(int8_t){} -void AsyncEventSourceClient::_onTimeout(uint32_t time){ - _client->close(true); -} - -void AsyncEventSourceClient::_onDisconnect(){ - AsyncClient* cl = _client; - _client = NULL; - cl->free(); - delete cl; - _server->_handleDisconnect(this); -} - -void AsyncEventSourceClient::close(){ - if(_client != NULL) - _client->close(true); -} - -void AsyncEventSourceClient::send(const char * message, size_t len){ - if(!_client->canSend()){ - return; - } - if(_client->space() < len){ - return; - } - _client->write(message, len); -} - - -// Handler - -AsyncEventSource::AsyncEventSource(String url):_url(url){} - -AsyncEventSource::~AsyncEventSource(){ - close(); -} - -void AsyncEventSource::_addClient(AsyncEventSourceClient * client){ - if(_clients == NULL){ - _clients = client; - return; - } - AsyncEventSourceClient * c = _clients; - while(c->next != NULL) c = c->next; - c->next = client; -} - -void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){ - if(_clients == NULL){ - return; - } - if(_clients == client){ - _clients = client->next; - delete client; - return; - } - AsyncEventSourceClient * c = _clients; - while(c->next != NULL && c->next != client) c = c->next; - if(c->next == NULL){ - return; - } - c->next = client->next; - delete client; -} - -void AsyncEventSource::close(){ - AsyncEventSourceClient * c = _clients; - while(c != NULL){ - if(c->connected()) - c->close(); - c = c->next; - } -} - -void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ - if(_clients == NULL) - return; - +static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){ String ev = ""; if(reconnect){ @@ -151,7 +49,6 @@ void AsyncEventSource::send(const char *message, const char *event, uint32_t id, char * nextN = strchr(lineStart, '\n'); char * nextR = strchr(lineStart, '\r'); if(nextN == NULL && nextR == NULL){ - //last line size_t llen = ((char *)message + messageLen) - lineStart; char * ldata = (char *)malloc(llen+1); if(ldata != NULL){ @@ -204,15 +101,134 @@ void AsyncEventSource::send(const char *message, const char *event, uint32_t id, } while(lineStart < ((char *)message + messageLen)); } - //os_printf("EVENT_SOURCE:\n%s", ev.c_str()); + return ev; +} +// Client + +AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server){ + _client = request->client(); + _server = server; + next = NULL; + //_client->onError([](void *r, AsyncClient* c, int8_t error){ ((AsyncEventSourceClient*)(r))->_onError(error); }, this); + //_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this); + //_client->onPoll([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onPoll(); }, this); + _client->onError(NULL, NULL); + _client->onAck(NULL, NULL); + _client->onPoll(NULL, NULL); + _client->onData(NULL, NULL); + _client->onTimeout([](void *r, AsyncClient* c, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this); + _client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); }, this); + _server->_addClient(this); + delete request; +} + +AsyncEventSourceClient::~AsyncEventSourceClient(){ + close(); +} + +//void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){} +//void AsyncEventSourceClient::_onPoll(){} +//void AsyncEventSourceClient::_onError(int8_t){} +void AsyncEventSourceClient::_onTimeout(uint32_t time){ + _client->close(true); +} + +void AsyncEventSourceClient::_onDisconnect(){ + AsyncClient* cl = _client; + _client = NULL; + cl->free(); + delete cl; + _server->_handleDisconnect(this); +} + +void AsyncEventSourceClient::close(){ + if(_client != NULL) + _client->close(true); +} + +void AsyncEventSourceClient::write(const char * message, size_t len){ + if(!_client->canSend()){ + return; + } + if(_client->space() < len){ + return; + } + _client->write(message, len); +} + +void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ + String ev = generateEventMessage(message, event, id, reconnect); + write(ev.c_str(), ev.length()); +} + + +// Handler + +AsyncEventSource::AsyncEventSource(String url) + : _url(url) + , _clients(NULL) + , _connectcb(NULL) +{} + +AsyncEventSource::~AsyncEventSource(){ + close(); +} + +void AsyncEventSource::onConnect(ArEventHandlerFunction cb){ + _connectcb = cb; +} + +void AsyncEventSource::_addClient(AsyncEventSourceClient * client){ + if(_clients == NULL){ + _clients = client; + return; + } + AsyncEventSourceClient * c = _clients; + while(c->next != NULL) c = c->next; + c->next = client; + if(_connectcb) + _connectcb(client); +} + +void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){ + if(_clients == NULL){ + return; + } + if(_clients == client){ + _clients = client->next; + delete client; + return; + } + AsyncEventSourceClient * c = _clients; + while(c->next != NULL && c->next != client) c = c->next; + if(c->next == NULL){ + return; + } + c->next = client->next; + delete client; +} + +void AsyncEventSource::close(){ AsyncEventSourceClient * c = _clients; while(c != NULL){ if(c->connected()) - c->send(ev.c_str(), ev.length()); + c->close(); + c = c->next; + } +} + +void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ + if(_clients == NULL) + return; + + String ev = generateEventMessage(message, event, id, reconnect); + AsyncEventSourceClient * c = _clients; + while(c != NULL){ + if(c->connected()) + c->write(ev.c_str(), ev.length()); c = c->next; } - ev = String(); } bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){ diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index bb5cdcb..1464f31 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -27,6 +27,7 @@ class AsyncEventSource; class AsyncEventSourceResponse; class AsyncEventSourceClient; +typedef std::function ArEventHandlerFunction; class AsyncEventSourceClient { private: @@ -41,7 +42,8 @@ class AsyncEventSourceClient { AsyncClient* client(){ return _client; } void close(); - void send(const char * message, size_t len); + void write(const char * message, size_t len); + void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); bool connected(){ return (_client != NULL) && _client->connected(); } //system callbacks (do not call) @@ -56,6 +58,7 @@ class AsyncEventSource: public AsyncWebHandler { private: String _url; AsyncEventSourceClient * _clients; + ArEventHandlerFunction _connectcb; uint32_t _cNextId; public: AsyncEventSource(String url); @@ -63,6 +66,7 @@ class AsyncEventSource: public AsyncWebHandler { const char * url(){ return _url.c_str(); } void close(); + void onConnect(ArEventHandlerFunction cb); void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); //system callbacks (do not call)