rework SSE handling

implement AsyncEventSourceMessage using shared std::string container
one message instance can be shared across multiple connected clients

add zero-copy overloads for message queue in AsyncEventSourceClient
the client's message queue now holds possibly shared instances of AsyncEventSourceMessage;
adding a message to the queue no longer creates a deep copy of the data

AsyncEventSource creates a single instance of the data message and populates the connected clients' queues with a shared pointer to it
Emil Muratov
2024-11-09 00:31:01 +09:00
parent d86e0906bd
commit 43e0b5c5a6
4 changed files with 593 additions and 162 deletions
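
The core of the change as a minimal standalone sketch (AsyncEvent_SharedData_t is the alias added by this commit; the Message struct and queue names below are simplified stand-ins for AsyncEventSourceMessage and the per-client queues, for illustration only):

#include <list>
#include <memory>
#include <string>

// alias introduced by this commit: one reference-counted payload shared by all clients
using AsyncEvent_SharedData_t = std::shared_ptr<std::string>;

// simplified stand-in for AsyncEventSourceMessage: shared payload plus per-client progress
struct Message {
  AsyncEvent_SharedData_t data;
  size_t sent;   // bytes already written to this client's socket
  size_t acked;  // bytes acknowledged by this client
};

int main() {
  // the SSE event text is formatted exactly once...
  auto payload = std::make_shared<std::string>("event: heartbeat\ndata: ping\n\n");

  // ...and each connected client's queue stores only a shared pointer to it,
  // so N clients cost one string plus N small bookkeeping records instead of N deep copies
  std::list<Message> clientA_queue, clientB_queue;
  clientA_queue.push_back({payload, 0, 0});
  clientB_queue.push_back({payload, 0, 0});
  return 0;
}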


@ -0,0 +1,257 @@
//
// SSE server with a load generator
// it will auto-adjust the message push rate to minimize discards across all connected clients
// per-second stats are printed to the serial console and also published as an SSE ping message
// open the /sse URL to start the event generator
#include <Arduino.h>
#ifdef ESP32
#include <AsyncTCP.h>
#include <WiFi.h>
#elif defined(ESP8266)
#include <ESP8266WiFi.h>
#include <ESPAsyncTCP.h>
#elif defined(TARGET_RP2040)
#include <WebServer.h>
#include <WiFi.h>
#endif
#include <ESPAsyncWebServer.h>
#if __has_include("ArduinoJson.h")
#include <ArduinoJson.h>
#include <AsyncJson.h>
#include <AsyncMessagePack.h>
#endif
#include <LittleFS.h>
const char* htmlContent PROGMEM = R"(
<!DOCTYPE html>
<html>
<head>
<title>Sample HTML</title>
</head>
<body>
<h1>Hello, World!</h1>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin euismod, purus a euismod
rhoncus, urna ipsum cursus massa, eu dictum tellus justo ac justo. Quisque ullamcorper
arcu nec tortor ullamcorper, vel fermentum justo fermentum. Vivamus sed velit ut elit
accumsan congue ut ut enim. Ut eu justo eu lacus varius gravida ut a tellus. Nulla facilisi.
Integer auctor consectetur ultricies. Fusce feugiat, mi sit amet bibendum viverra, orci leo
dapibus elit, id varius sem dui id lacus.</p>
</body>
</html>
)";
const char* staticContent PROGMEM = R"(
<!DOCTYPE html>
<html>
<head>
<title>Sample HTML</title>
</head>
<body>
<h1>Hello, %IP%</h1>
</body>
</html>
)";
AsyncWebServer server(80);
AsyncEventSource events("/events");
/////////////////////////////////////////////////////////////////////////////////////////////////////
const char* PARAM_MESSAGE PROGMEM = "message";
const char* SSE_HTLM PROGMEM = R"(
<!DOCTYPE html>
<html>
<head>
<title>Server-Sent Events</title>
<script>
if (!!window.EventSource) {
var source = new EventSource('/events');
source.addEventListener('open', function(e) {
console.log("Events Connected");
}, false);
source.addEventListener('error', function(e) {
if (e.target.readyState != EventSource.OPEN) {
console.log("Events Disconnected");
}
}, false);
source.addEventListener('message', function(e) {
console.log("message", e);
}, false);
source.addEventListener('heartbeat', function(e) {
console.log("heartbeat", e.data);
}, false);
}
</script>
</head>
<body>
<h1>Open your browser console!</h1>
</body>
</html>
)";
static const char* SSE_MSG = R"(Alice felt that this could not be denied, so she tried another question. 'What sort of people live about here?' 'In THAT direction,' the Cat said, waving its right paw round, 'lives a Hatter: and in THAT direction,' waving the other paw, 'lives a March Hare. Visit either you like: they're both mad.'
'But I don't want to go among mad people,' Alice remarked. 'Oh, you can't help that,' said the Cat: 'we're all mad here. I'm mad. You're mad.' 'How do you know I'm mad?' said Alice.
'You must be,' said the Cat, `or you wouldn't have come here.' Alice didn't think that proved it at all; however, she went on 'And how do you know that you're mad?' 'To begin with,' said the Cat, 'a dog's not mad. You grant that?'
)";
void notFound(AsyncWebServerRequest* request) {
request->send(404, "text/plain", "Not found");
}
static const char characters[] = "0123456789ABCDEF";
static size_t charactersIndex = 0;
void setup() {
Serial.begin(115200);
#ifndef CONFIG_IDF_TARGET_ESP32H2
/*
WiFi.mode(WIFI_STA);
WiFi.begin("SSID", "passwd");
if (WiFi.waitForConnectResult() != WL_CONNECTED) {
Serial.printf("WiFi Failed!\n");
return;
}
Serial.print("IP Address: ");
Serial.println(WiFi.localIP());
*/
WiFi.mode(WIFI_AP);
WiFi.softAP("esp-captive");
#endif
server.on("/", HTTP_GET, [](AsyncWebServerRequest* request) {
request->send(200, "text/html", staticContent);
});
events.onConnect([](AsyncEventSourceClient* client) {
if (client->lastId()) {
Serial.printf("SSE Client reconnected! Last message ID that it gat is: %" PRIu32 "\n", client->lastId());
}
client->send("hello!", NULL, millis(), 1000);
});
server.on("/sse", HTTP_GET, [](AsyncWebServerRequest* request) {
request->send(200, "text/html", SSE_HTLM);
});
// go to http://192.168.4.1/sse
server.addHandler(&events);
server.onNotFound(notFound);
server.begin();
}
uint32_t lastSSE = 0;
uint32_t deltaSSE = 25;
uint32_t messagesSSE = 4; // how many messages to queue each time
uint32_t sse_disc{0}, sse_enq{0}, sse_penq{0}, sse_second{0};
AsyncEventSource::SendStatus enqueue() {
AsyncEventSource::SendStatus state = events.send(SSE_MSG, "message");
if (state == AsyncEventSource::SendStatus::DISCARDED)
++sse_disc;
else if (state == AsyncEventSource::SendStatus::ENQUEUED) {
++sse_enq;
} else
++sse_penq;
return state;
}
void loop() {
uint32_t now = millis();
if (now - lastSSE >= deltaSSE) {
// enqueue messages
for (uint32_t i = 0; i != messagesSSE; ++i) {
auto err = enqueue();
if (err == AsyncEventSource::SendStatus::DISCARDED || err == AsyncEventSource::SendStatus::PARTIALLY_ENQUEUED) {
// throttle messaging a bit
lastSSE = now + deltaSSE;
break;
}
}
lastSSE = millis();
}
if (now - sse_second > 1000) {
String s;
s.reserve(100);
s = "Ping:";
s += now / 1000;
s += " clients:";
s += events.count();
s += " disc:";
s += sse_disc;
s += " enq:";
s += sse_enq;
s += " partial:";
s += sse_penq;
s += " avg wait:";
s += events.avgPacketsWaiting();
s += " heap:";
s += ESP.getFreeHeap() / 1024;
events.send(s, "heartbeat", now);
Serial.println();
Serial.println(s);
// if we see discards or partial enqueues, decrease the message rate, otherwise increase it, so we converge on the max sustained message rate
if (sse_disc || sse_penq)
++deltaSSE;
else if (deltaSSE > 5)
--deltaSSE;
sse_disc = sse_enq = sse_penq = 0;
sse_second = now;
}
}


@ -25,114 +25,97 @@
using namespace asyncsrv;
static String generateEventMessage(const char* message, const char* event, uint32_t id, uint32_t reconnect) {
String ev;
static std::string generateEventMessage(const char* message, const char* event, uint32_t id, uint32_t reconnect) {
std::string str;
size_t len{0};
if (message)
len += strlen(message);
if (event)
len += strlen(event);
len += 42; // give it some overhead
str.reserve(len);
if (reconnect) {
ev += T_retry_;
ev += reconnect;
ev += T_rn;
str += T_retry_;
str += reconnect;
str += 0xa; // '\n'
}
if (id) {
ev += T_id__;
ev += id;
ev += T_rn;
str += T_id__;
char buff[16];
snprintf ( buff, 16, "%u", id );
str += buff;
str += 0xa; // '\n'
}
if (event != NULL) {
ev += T_event_;
ev += event;
ev += T_rn;
str += T_event_;
str += event;
str += 0xa; // '\n'
}
if (message != NULL) {
size_t messageLen = strlen(message);
char* lineStart = (char*)message;
char* lineEnd;
do {
char* nextN = strchr(lineStart, '\n');
char* nextR = strchr(lineStart, '\r');
if (nextN == NULL && nextR == NULL) {
size_t llen = ((char*)message + messageLen) - lineStart;
char* ldata = (char*)malloc(llen + 1);
if (ldata != NULL) {
memcpy(ldata, lineStart, llen);
ldata[llen] = 0;
ev += T_data_;
ev += ldata;
ev += T_rnrn;
free(ldata);
}
lineStart = (char*)message + messageLen;
if (!message)
return str;
size_t messageLen = strlen(message);
char* lineStart = (char*)message;
char* lineEnd;
do {
char* nextN = strchr(lineStart, '\n');
char* nextR = strchr(lineStart, '\r');
if (nextN == NULL && nextR == NULL) {
// no more line breaks - the rest of the message is a single line
str += T_data_;
str += lineStart;
str += T_nn;
return str;
}
// a message is a multi-line string
char* nextLine = NULL;
if (nextN != NULL && nextR != NULL) { // windows line-ending \r\n
if (nextR + 1 == nextN) {
// normal \r\n sequence
lineEnd = nextR;
nextLine = nextN + 1;
} else {
char* nextLine = NULL;
if (nextN != NULL && nextR != NULL) {
if (nextR < nextN) {
lineEnd = nextR;
if (nextN == (nextR + 1))
nextLine = nextN + 1;
else
nextLine = nextR + 1;
} else {
lineEnd = nextN;
if (nextR == (nextN + 1))
nextLine = nextR + 1;
else
nextLine = nextN + 1;
}
} else if (nextN != NULL) {
lineEnd = nextN;
nextLine = nextN + 1;
} else {
lineEnd = nextR;
nextLine = nextR + 1;
}
size_t llen = lineEnd - lineStart;
char* ldata = (char*)malloc(llen + 1);
if (ldata != NULL) {
memcpy(ldata, lineStart, llen);
ldata[llen] = 0;
ev += T_data_;
ev += ldata;
ev += T_rn;
free(ldata);
}
lineStart = nextLine;
if (lineStart == ((char*)message + messageLen))
ev += T_rn;
// some abnormal \n \r mixed sequence
lineEnd = std::min(nextR, nextN);
nextLine = lineEnd + 1;
}
} while (lineStart < ((char*)message + messageLen));
}
} else if (nextN != NULL) { // Unix/Mac OS X LF
lineEnd = nextN;
nextLine = nextN + 1;
} else { // some ancient garbage
lineEnd = nextR;
nextLine = nextR + 1;
}
return ev;
str += T_data_;
str.append(lineStart, lineEnd - lineStart);
str += 0xa; // \n
lineStart = nextLine;
} while (lineStart < ((char*)message + messageLen));
// append another \n to terminate message
str += 0xa; // '\n'
return str;
}
// Message
AsyncEventSourceMessage::AsyncEventSourceMessage(const char* data, size_t len)
: _data(nullptr), _len(len), _sent(0), _acked(0) {
_data = (uint8_t*)malloc(_len + 1);
if (_data == nullptr) {
_len = 0;
} else {
memcpy(_data, data, len);
_data[_len] = 0;
}
}
AsyncEventSourceMessage::~AsyncEventSourceMessage() {
if (_data != NULL)
free(_data);
}
size_t AsyncEventSourceMessage::ack(size_t len, __attribute__((unused)) uint32_t time) {
// If the whole message is now acked...
if (_acked + len > _len) {
if (_acked + len > _data->size()) {
// Return the number of extra bytes acked (they will be carried on to the next message)
const size_t extra = _acked + len - _len;
_acked = _len;
const size_t extra = _acked + len - _data->size();
_acked = _data->size();
return extra;
}
// Return that no extra bytes left.
@ -144,13 +127,25 @@ size_t AsyncEventSourceMessage::write(AsyncClient* client) {
if (!client)
return 0;
if (_sent >= _len || !client->canSend()) {
if (_sent >= _data->size() || !client->canSend()) {
return 0;
}
size_t len = min(_len - _sent, client->space());
size_t sent = client->add((const char*)_data + _sent, len);
_sent += sent;
return sent;
size_t len = std::min(_data->size() - _sent, client->space());
/*
add() calls lwip's tcp_write() under the AsyncTCP hood with an apiflags argument.
By default apiflags=ASYNC_WRITE_FLAG_COPY.
We could have cleared this flag to pass data by reference and avoid the copy into the socket buffer,
but this does not work for Arduino's lwip on ESP32/IDF:
the copy is enforced in https://github.com/espressif/esp-lwip/blob/0606eed9d8b98a797514fdf6eabb4daf1c8c8cd9/src/core/tcp_out.c#L422C5-L422C30
when LWIP_NETIF_TX_SINGLE_PBUF is set, and it is indeed set in IDF:
https://github.com/espressif/esp-idf/blob/a0f798cfc4bbd624aab52b2c194d219e242d80c1/components/lwip/port/include/lwipopts.h#L744
So keep ASYNC_WRITE_FLAG_COPY enforced and keep in mind that there is no zero-copy path into the socket buffer.
*/
size_t written = client->add(_data->data() + _sent, len, ASYNC_WRITE_FLAG_COPY); // ASYNC_WRITE_FLAG_MORE
_sent += written;
return written;
}
size_t AsyncEventSourceMessage::send(AsyncClient* client) {
@ -160,20 +155,19 @@ size_t AsyncEventSourceMessage::send(AsyncClient* client) {
// Client
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest* request, AsyncEventSource* server) {
_client = request->client();
_server = server;
_lastId = 0;
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest* request, AsyncEventSource* server)
: _client(request->client()), _server(server) {
if (request->hasHeader(T_Last_Event_ID))
_lastId = atoi(request->getHeader(T_Last_Event_ID)->value().c_str());
_client->setRxTimeout(0);
_client->onError(NULL, NULL);
_client->onAck([](void* r, AsyncClient* c, size_t len, uint32_t time) { (void)c; ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
_client->onPoll([](void* r, AsyncClient* c) { (void)c; ((AsyncEventSourceClient*)(r))->_onPoll(); }, this);
_client->onAck([](void* r, AsyncClient* c, size_t len, uint32_t time) { (void)c; static_cast<AsyncEventSourceClient*>(r)->_onAck(len, time); }, this);
_client->onPoll([](void* r, AsyncClient* c) { (void)c; static_cast<AsyncEventSourceClient*>(r)->_onPoll(); }, this);
_client->onData(NULL, NULL);
_client->onTimeout([this](void* r, AsyncClient* c __attribute__((unused)), uint32_t time) { ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
_client->onDisconnect([this](void* r, AsyncClient* c) { ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this);
_client->onTimeout([this](void* r, AsyncClient* c __attribute__((unused)), uint32_t time) { static_cast<AsyncEventSourceClient*>(r)->_onTimeout(time); }, this);
_client->onDisconnect([this](void* r, AsyncClient* c) { static_cast<AsyncEventSourceClient*>(r)->_onDisconnect(); delete c; }, this);
_server->_addClient(this);
delete request;
@ -190,29 +184,61 @@ AsyncEventSourceClient::~AsyncEventSourceClient() {
}
bool AsyncEventSourceClient::_queueMessage(const char* message, size_t len) {
if (!_client)
if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
#ifdef ESP8266
ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
#elif defined(ESP32)
log_e("Event message queue overflow: discard message");
#endif
return false;
}
#ifdef ESP32
// length() is not thread-safe, thus acquiring the lock before this call..
std::lock_guard<std::mutex> lock(_lockmq);
#endif
_messageQueue.emplace_back(message, len);
/*
throttle the queue run:
if the queue is more than 25% full, the network/CPU is congested (there is no zero-copy mode for the socket buffer),
so forcing a queue run would only eat more heap RAM and overflow the buffer; just keep the data in our own queue -
it will be processed at least on each onAck()/onPoll() call from AsyncTCP
*/
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
_runQueue();
}
return true;
}
bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t&& msg) {
if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
#ifdef ESP8266
ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
#elif defined(ESP32)
log_e("Too many messages queued: deleting message");
log_e("Event message queue overflow: discard message");
#endif
return false;
}
_messageQueue.emplace_back(message, len);
// runqueue trigger when new messages added
if (_client->canSend()) {
#ifdef ESP32
// length() is not thread-safe, thus acquiring the lock before this call..
std::lock_guard<std::mutex> lock(_lockmq);
#endif
_messageQueue.emplace_back(std::move(msg));
/*
throttle the queue run:
if the queue is more than 25% full, the network/CPU is congested (there is no zero-copy mode for the socket buffer),
so forcing a queue run would only eat more heap RAM and overflow the buffer; just keep the data in our own queue -
it will be processed at least on each onAck()/onPoll() call from AsyncTCP
*/
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
_runQueue();
}
return true;
}
@ -221,15 +247,33 @@ void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t
// Same here, acquiring the lock early
std::lock_guard<std::mutex> lock(_lockmq);
#endif
_runQueue();
// adjust in-flight len
if (len < _inflight)
_inflight -= len;
else
_inflight = 0;
// acknowledge as much message data as AsyncTCP has confirmed delivered
while (len && _messageQueue.size()) {
len = _messageQueue.front().ack(len);
if (_messageQueue.front().finished()) {
// fully acked messages can now be released; they are kept until AsyncTCP confirms delivery
_messageQueue.pop_front();
}
}
// try to send another batch of data
if (_messageQueue.size())
_runQueue();
}
void AsyncEventSourceClient::_onPoll() {
#ifdef ESP32
// Same here, acquiring the lock early
std::lock_guard<std::mutex> lock(_lockmq);
#endif
if (_messageQueue.size()) {
#ifdef ESP32
// Same here, acquiring the lock early
std::lock_guard<std::mutex> lock(_lockmq);
#endif
_runQueue();
}
}
@ -251,50 +295,42 @@ void AsyncEventSourceClient::close() {
_client->close();
}
bool AsyncEventSourceClient::write(const char* message, size_t len) {
return connected() && _queueMessage(message, len);
}
bool AsyncEventSourceClient::send(const char* message, const char* event, uint32_t id, uint32_t reconnect) {
if (!connected())
return false;
String ev = generateEventMessage(message, event, id, reconnect);
return _queueMessage(ev.c_str(), ev.length());
}
size_t AsyncEventSourceClient::packetsWaiting() const {
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lockmq);
#endif
return _messageQueue.size();
return _queueMessage(std::make_shared<std::string>(generateEventMessage(message, event, id, reconnect)));
}
void AsyncEventSourceClient::_runQueue() {
if (!_client)
return;
// no need to lock the mutex here, because all calls to this method must already hold the lock
size_t total_bytes_written = 0;
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
if (!i->sent()) {
const size_t bytes_written = i->write(_client);
total_bytes_written += bytes_written;
if (bytes_written == 0)
_inflight += bytes_written;
if (bytes_written == 0 || _inflight > _max_inflight) {
// Serial.print("_");
break;
}
}
}
if (total_bytes_written > 0)
// flush socket
if (total_bytes_written)
_client->send();
size_t len = total_bytes_written;
while (len && _messageQueue.size()) {
len = _messageQueue.front().ack(len);
if (_messageQueue.front().finished()) {
_messageQueue.pop_front();
}
}
}
void AsyncEventSourceClient::set_max_inflight_bytes(size_t value) {
if (value >= SSE_MIN_INFLIGH && value <= SSE_MAX_INFLIGH)
_max_inflight = value;
}
/* AsyncEventSource */
void AsyncEventSource::authorizeConnect(ArAuthorizeConnectHandler cb) {
AuthorizationMiddleware* m = new AuthorizationMiddleware(401, cb);
m->_freeOnRemoval = true;
@ -310,18 +346,21 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient* client) {
_clients.emplace_back(client);
if (_connectcb)
_connectcb(client);
_adjust_inflight_window();
}
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient* client) {
if (_disconnectcb)
_disconnectcb(client);
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
if (_disconnectcb)
_disconnectcb(client);
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
if (i->get() == client)
_clients.erase(i);
}
_adjust_inflight_window();
}
void AsyncEventSource::close() {
@ -358,14 +397,14 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
AsyncEventSource::SendStatus AsyncEventSource::send(
const char* message, const char* event, uint32_t id, uint32_t reconnect) {
String ev = generateEventMessage(message, event, id, reconnect);
AsyncEvent_SharedData_t shared_msg = std::make_shared<std::string>(generateEventMessage(message, event, id, reconnect));
#ifdef ESP32
std::lock_guard<std::mutex> lock(_client_queue_lock);
#endif
size_t hits = 0;
size_t miss = 0;
for (const auto& c : _clients) {
if (c->write(ev.c_str(), ev.length()))
if (c->write(shared_msg))
++hits;
else
++miss;
@ -393,7 +432,16 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest* request) {
request->send(new AsyncEventSourceResponse(this));
}
// Response
void AsyncEventSource::_adjust_inflight_window() {
if (_clients.size()) {
size_t inflight = SSE_MAX_INFLIGH / _clients.size();
for (const auto& c : _clients)
c->set_max_inflight_bytes(inflight);
// Serial.printf("adjusted inflight to: %u\n", inflight);
}
}
/* Response */
AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource* server) {
_server = server;


@ -21,22 +21,29 @@
#define ASYNCEVENTSOURCE_H_
#include <Arduino.h>
#include <string>
#ifdef ESP32
#include <AsyncTCP.h>
#include <mutex>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 32
#endif
#define SSE_MIN_INFLIGH 2 * 1460 // allow 2 MSS packets
#define SSE_MAX_INFLIGH 16 * 1024 // but no more than 16k, no need to blow it, since same data is kept in local Q
#elif defined(ESP8266)
#include <ESPAsyncTCP.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 8
#endif
#define SSE_MIN_INFLIGH 2 * 1460 // allow 2 MSS packets
#define SSE_MAX_INFLIGH 8 * 1024 // but no more than 8k, no need to blow it, since same data is kept in local Q
#elif defined(TARGET_RP2040)
#include <AsyncTCP_RP2040W.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 32
#endif
#define SSE_MIN_INFLIGH 2 * 1460 // allow 2 MSS packets
#define SSE_MAX_INFLIGH 16 * 1024 // but no more than 16k, no need to blow it, since same data is kept in local Q
#endif
#include <ESPAsyncWebServer.h>
@ -53,58 +60,150 @@ class AsyncEventSourceResponse;
class AsyncEventSourceClient;
using ArEventHandlerFunction = std::function<void(AsyncEventSourceClient* client)>;
using ArAuthorizeConnectHandler = ArAuthorizeFunction;
// shared message object container
using AsyncEvent_SharedData_t = std::shared_ptr<std::string>;
/**
* @brief Async Event Message container with shared message content data
*
*/
class AsyncEventSourceMessage {
private:
uint8_t* _data;
size_t _len;
size_t _sent;
// size_t _ack;
size_t _acked;
const AsyncEvent_SharedData_t _data;
size_t _sent{0}; // num of bytes already sent
size_t _acked{0}; // num of bytes acked
public:
AsyncEventSourceMessage(const char* data, size_t len);
~AsyncEventSourceMessage();
AsyncEventSourceMessage(AsyncEvent_SharedData_t data) : _data(data) {};
AsyncEventSourceMessage(const char* data, size_t len) : _data(std::make_shared<std::string>(data, len)) {};
/**
* @brief acknowledge sending len bytes of data
* @note if the number of bytes to ack is larger than the unacknowledged message length, the number of carried-over bytes is returned
*
* @param len bytes to acknowledge
* @param time
* @return size_t number of extra bytes carried over
*/
size_t ack(size_t len, uint32_t time = 0);
/**
* @brief write message data to client's buffer
* @note this method does NOT call client's send
*
* @param client
* @return size_t number of bytes written
*/
size_t write(AsyncClient* client);
/**
* @brief writes message data to client's buffer and calls client's send method
*
* @param client
* @return size_t number of bytes the client was able to send()
*/
size_t send(AsyncClient* client);
bool finished() { return _acked == _len; }
bool sent() { return _sent == _len; }
// returns true if the full message length has been acked
bool finished() { return _acked == _data->length(); }
/**
* @brief returns true if all data has been sent already
*
*/
bool sent() { return _sent == _data->length(); }
};
/**
* @brief class that holds an SSE message queue for a particular client connection
*
*/
class AsyncEventSourceClient {
private:
AsyncClient* _client;
AsyncEventSource* _server;
uint32_t _lastId;
uint32_t _lastId{0};
size_t _inflight{0}; // number of unacknowledged bytes that have been written to the socket buffer
size_t _max_inflight{SSE_MAX_INFLIGH}; // max number of unacknowledged bytes that may be written to the socket buffer
std::list<AsyncEventSourceMessage> _messageQueue;
#ifdef ESP32
mutable std::mutex _lockmq;
#endif
bool _queueMessage(const char* message, size_t len);
bool _queueMessage(AsyncEvent_SharedData_t&& msg);
void _runQueue();
public:
AsyncEventSourceClient(AsyncWebServerRequest* request, AsyncEventSource* server);
~AsyncEventSourceClient();
AsyncClient* client() { return _client; }
void close();
bool write(const char* message, size_t len);
/**
* @brief Send an SSE message to client
* it will craft an SSE message and place it in the client's message queue
*
* @param message body string, can be a single- or multi-line string separated by \n, \r, \r\n
* @param event body string, a single-line string
* @param id sequence id
* @param reconnect client's reconnect timeout
* @return true if message was placed in a queue
* @return false if queue is full
*/
bool send(const char* message, const char* event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
bool send(const String& message, const String& event, uint32_t id = 0, uint32_t reconnect = 0) { return send(message.c_str(), event.c_str(), id, reconnect); }
bool send(const String& message, const char* event, uint32_t id = 0, uint32_t reconnect = 0) { return send(message.c_str(), event, id, reconnect); }
bool send(const char* message, const char* event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
/**
* @brief place supplied preformatted SSE message to the message queue
* @note message must be a properly formatted SSE string according to https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
*
* @param message data
* @return true on success
* @return false on queue overflow or no client connected
*/
bool write(AsyncEvent_SharedData_t message) { return connected() && _queueMessage(std::move(message)); };
[[deprecated("Use _write(AsyncEvent_SharedData_t message) instead to share same data with multiple SSE clients")]]
bool write(const char* message, size_t len) { return connected() && _queueMessage(message, len); };
// close client's connection
void close();
// getters
AsyncClient* client() { return _client; }
bool connected() const { return _client && _client->connected(); }
uint32_t lastId() const { return _lastId; }
size_t packetsWaiting() const;
size_t packetsWaiting() const { return _messageQueue.size(); };
// system callbacks (do not call)
/**
* @brief Sets the max number of bytes that can be written to the client's socket while awaiting delivery acknowledgement
* used to throttle message delivery length to trade off memory consumption
* @note the actual amount of data written could possibly be a bit larger, but no more than the available socket buffer space
*
* @param value
*/
void set_max_inflight_bytes(size_t value);
/**
* @brief Get current max inflight bytes value
*
* @return size_t
*/
size_t get_max_inflight_bytes() const { return _max_inflight; }
// system callbacks (do not call from user code!)
void _onAck(size_t len, uint32_t time);
void _onPoll();
void _onTimeout(uint32_t time);
void _onDisconnect();
};
/**
* @brief a class that maintains all connected HTTP clients subscribed to SSE delivery
* dispatches supplied messages to the clients' queues
*
*/
class AsyncEventSource : public AsyncWebHandler {
private:
String _url;
@ -117,6 +216,9 @@ class AsyncEventSource : public AsyncWebHandler {
ArEventHandlerFunction _connectcb = nullptr;
ArEventHandlerFunction _disconnectcb = nullptr;
// this method adjusts the in-flight data size for connected clients depending on the number of active connections
void _adjust_inflight_window();
public:
typedef enum {
DISCARDED = 0,
@ -124,23 +226,47 @@ class AsyncEventSource : public AsyncWebHandler {
PARTIALLY_ENQUEUED = 2,
} SendStatus;
AsyncEventSource(const char* url) : _url(url) {};
AsyncEventSource(const String& url) : _url(url) {};
~AsyncEventSource() { close(); };
const char* url() const { return _url.c_str(); }
// close all connected clients
void close();
/**
* @brief set on-connect callback for the client
* used to deliver messages to the client on first connect
*
* @param cb
*/
void onConnect(ArEventHandlerFunction cb) { _connectcb = cb; }
/**
* @brief Send an SSE message to all connected clients
* it will craft an SSE message and place it in every connected client's message queue
*
* @param message body string, can be a single- or multi-line string separated by \n, \r, \r\n
* @param event body string, a single-line string
* @param id sequence id
* @param reconnect client's reconnect timeout
* @return SendStatus indicating whether the message was placed in all, some, or none of the clients' queues
*/
SendStatus send(const char* message, const char* event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
SendStatus send(const String& message, const String& event, uint32_t id = 0, uint32_t reconnect = 0) { return send(message.c_str(), event.c_str(), id, reconnect); }
SendStatus send(const String& message, const char* event, uint32_t id = 0, uint32_t reconnect = 0) { return send(message.c_str(), event, id, reconnect); }
// The client pointer sent to the callback is only for reference purposes. DO NOT CALL ANY METHOD ON IT !
void onDisconnect(ArEventHandlerFunction cb) { _disconnectcb = cb; }
void authorizeConnect(ArAuthorizeConnectHandler cb);
SendStatus send(const String& message, const String& event, uint32_t id = 0, uint32_t reconnect = 0) { return send(message.c_str(), event.c_str(), id, reconnect); }
SendStatus send(const String& message, const char* event, uint32_t id = 0, uint32_t reconnect = 0) { return send(message.c_str(), event, id, reconnect); }
SendStatus send(const char* message, const char* event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
// number of clients connected
// returns number of connected clients
size_t count() const;
// returns the average number of messages pending in all clients' queues
size_t avgPacketsWaiting() const;
// system callbacks (do not call)
// system callbacks (do not call from user code!)
void _addClient(AsyncEventSourceClient* client);
void _handleDisconnect(AsyncEventSourceClient* client);
bool canHandle(AsyncWebServerRequest* request) const override final;
@ -149,7 +275,6 @@ class AsyncEventSource : public AsyncWebHandler {
class AsyncEventSourceResponse : public AsyncWebServerResponse {
private:
String _content;
AsyncEventSource* _server;
public:


@ -65,6 +65,7 @@ namespace asyncsrv {
static constexpr const char* T_response = "response";
static constexpr const char* T_retry_ = "retry: ";
static constexpr const char* T_retry_after = "retry-after";
static constexpr const char* T_nn = "\n\n";
static constexpr const char* T_rn = "\r\n";
static constexpr const char* T_rnrn = "\r\n\r\n";
static constexpr const char* T_Transfer_Encoding = "transfer-encoding";
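
For reference, assuming the T_id__, T_event_ and T_data_ literals defined elsewhere in this table expand to the usual "id: ", "event: " and "data: " prefixes, the rewritten generateEventMessage() above is expected to serialize generateEventMessage("line1\nline2", "status", 7, 0) roughly as:

id: 7
event: status
data: line1
data: line2

with each line ending in a bare '\n' and an empty line (the new T_nn / extra '\n') terminating the SSE frame.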