refactor AsyncEventSourceMessage

This commit is contained in:
Emil Muratov
2024-06-26 21:01:52 +09:00
committed by Mathieu Carbou
parent bb4eb89c8e
commit 48968b5be5
2 changed files with 19 additions and 23 deletions

View File

@@ -183,30 +183,25 @@ AsyncEventSourceClient::~AsyncEventSourceClient(){
close();
}
void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
if(dataMessage == NULL)
return;
if(!connected()){
delete dataMessage;
return;
}
void AsyncEventSourceClient::_queueMessage(const char * message, size_t len){
#ifdef ESP32
//length() is not thread-safe, thus acquiring the lock before this call..
std::lock_guard<std::mutex> lock(_lockmq);
#endif
if(_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES){
#ifdef ESP8266
ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
#else
log_e("Too many messages queued: deleting message");
#endif
delete dataMessage;
} else {
_messageQueue.emplace_back(dataMessage);
// runqueue trigger when new messages added
if(_client->canSend()) {
_runQueue();
}
return;
}
_messageQueue.emplace_back(message, len);
// runqueue trigger when new messages added
if(_client->canSend()) {
_runQueue();
}
}
@@ -216,8 +211,8 @@ void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
std::lock_guard<std::mutex> lock(_lockmq);
#endif
while(len && _messageQueue.size()){
len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished())
len = _messageQueue.front().ack(len, time);
if(_messageQueue.front().finished())
_messageQueue.pop_front();
}
_runQueue();
@@ -248,12 +243,14 @@ void AsyncEventSourceClient::close(){
}
void AsyncEventSourceClient::write(const char * message, size_t len){
_queueMessage(new AsyncEventSourceMessage(message, len));
if(!connected()) return;
_queueMessage(message, len);
}
void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
if(!connected()) return;
String ev = generateEventMessage(message, event, id, reconnect);
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
_queueMessage(ev.c_str(), ev.length());
}
size_t AsyncEventSourceClient::packetsWaiting() const {
@@ -267,8 +264,8 @@ void AsyncEventSourceClient::_runQueue() {
// Calls to this private method now already protected by _lockmq acquisition
// so no extra call of _lockmq.lock() here..
for ( auto &i : _messageQueue){
if (!i->sent())
i->send(_client);
if (!i.sent())
i.send(_client);
}
}

View File

@@ -79,12 +79,11 @@ class AsyncEventSourceClient {
AsyncClient *_client;
AsyncEventSource *_server;
uint32_t _lastId;
std::list< std::unique_ptr<AsyncEventSourceMessage> > _messageQueue;
// ArFi 2020-08-27 for protecting/serializing _messageQueue
std::list< AsyncEventSourceMessage > _messageQueue;
#ifdef ESP32
mutable std::mutex _lockmq;
#endif
void _queueMessage(AsyncEventSourceMessage *dataMessage);
void _queueMessage(const char * message, size_t len);
void _runQueue();
public: