From 13f6c2c7473f39f3979231417269efdc71141ee8 Mon Sep 17 00:00:00 2001 From: Phat Nguyen Date: Mon, 26 Feb 2024 15:55:33 +0700 Subject: [PATCH] fix: Mqtt sending interval --- examples/ONE_I-9PSL/ONE_I-9PSL.ino | 51 ++++++++++++++++++++++++------ examples/Open_Air/Open_Air.ino | 48 +++++++++++++++++++++++----- 2 files changed, 82 insertions(+), 17 deletions(-) diff --git a/examples/ONE_I-9PSL/ONE_I-9PSL.ino b/examples/ONE_I-9PSL/ONE_I-9PSL.ino index 44e5c28..5215177 100644 --- a/examples/ONE_I-9PSL/ONE_I-9PSL.ino +++ b/examples/ONE_I-9PSL/ONE_I-9PSL.ino @@ -93,6 +93,7 @@ enum { #define DISP_UPDATE_INTERVAL 5000 /** ms */ #define SERVER_CONFIG_UPDATE_INTERVAL 30000 /** ms */ #define SERVER_SYNC_INTERVAL 60000 /** ms */ +#define MQTT_SYNC_INTERVAL 60000 /** ms */ #define SENSOR_CO2_CALIB_COUNTDOWN_MAX 5 /** sec */ #define SENSOR_TVOC_UPDATE_INTERVAL 1000 /** ms */ #define SENSOR_CO2_UPDATE_INTERVAL 5000 /** ms */ @@ -631,6 +632,7 @@ public: int connectionFailedCount(void) { return connectFailedCount; } }; AgMqtt agMqtt; +static TaskHandle_t mqttTask = NULL; /** Create airgradient instance for 'ONE_INDOOR' board */ AirGradient ag(ONE_INDOOR); @@ -684,6 +686,7 @@ static void co2Poll(void); static void showNr(void); static void webServerInit(void); static String getServerSyncData(bool localServer); +static void createMqttTask(void); /** Init schedule */ bool hasSensorS8 = true; @@ -754,6 +757,7 @@ void setup() { /** MQTT init */ if (agServer.getMqttBroker().isEmpty() == false) { if (agMqtt.begin(agServer.getMqttBroker())) { + createMqttTask(); Serial.println("MQTT client init success"); } else { Serial.println("MQTT client init failure"); @@ -965,6 +969,37 @@ static String getServerSyncData(bool localServer) { return JSON.stringify(root); } +static void createMqttTask(void) { + if (mqttTask) { + vTaskDelete(mqttTask); + mqttTask = NULL; + } + + xTaskCreate( + [](void *param) { + for (;;) { + delay(MQTT_SYNC_INTERVAL); + + /** Send data */ + if (agMqtt.isConnected()) { + String syncData = getServerSyncData(false); + String topic = "airgradient/readings/" + getDevId(); + if (agMqtt.publish(topic.c_str(), syncData.c_str(), + syncData.length())) { + Serial.println("Mqtt sync success"); + } else { + Serial.println("Mqtt sync failure"); + } + } + } + }, + "mqtt-task", 1024 * 3, NULL, 6, &mqttTask); + + if (mqttTask == NULL) { + Serial.println("Creat mqttTask failed"); + } +} + static void sendPing() { JSONVar root; root["wifi"] = WiFi.RSSI(); @@ -1518,8 +1553,14 @@ static void serverConfigPoll(void) { String mqttUri = agServer.getMqttBroker(); if (mqttUri != agMqtt.getUri()) { agMqtt.end(); + + if (mqttTask != NULL) { + vTaskDelete(mqttTask); + mqttTask = NULL; + } if (agMqtt.begin(mqttUri)) { Serial.println("Connect to new mqtt broker success"); + createMqttTask(); } else { Serial.println("Connect to new mqtt broker failed"); } @@ -1966,15 +2007,7 @@ static void sendDataToServer(void) { if (agServer.postToServer(getDevId(), syncData)) { resetWatchdog(); } - - if (agMqtt.isConnected()) { - String topic = "airgradient/readings/" + getDevId(); - if (agMqtt.publish(topic.c_str(), syncData.c_str(), syncData.length())) { - Serial.println("Mqtt sync success"); - } else { - Serial.println("Mqtt sync failure"); - } - } + bootCount++; } diff --git a/examples/Open_Air/Open_Air.ino b/examples/Open_Air/Open_Air.ino index 6838aa7..1132333 100644 --- a/examples/Open_Air/Open_Air.ino +++ b/examples/Open_Air/Open_Air.ino @@ -92,6 +92,7 @@ enum { #define DISP_UPDATE_INTERVAL 5000 /** ms */ #define SERVER_CONFIG_UPDATE_INTERVAL 30000 /** ms */ #define SERVER_SYNC_INTERVAL 60000 /** ms */ +#define MQTT_SYNC_INTERVAL 60000 /** ms */ #define SENSOR_CO2_CALIB_COUNTDOWN_MAX 5 /** sec */ #define SENSOR_TVOC_UPDATE_INTERVAL 1000 /** ms */ #define SENSOR_CO2_UPDATE_INTERVAL 5000 /** ms */ @@ -631,6 +632,7 @@ public: int connectionFailedCount(void) { return connectFailedCount; } }; AgMqtt agMqtt; +static TaskHandle_t mqttTask = NULL; /** Create airgradient instance for 'OPEN_AIR_OUTDOOR' board */ AirGradient ag(OPEN_AIR_OUTDOOR); @@ -701,6 +703,7 @@ static const char *getFwMode(int mode); static void showNr(void); static void webServerInit(void); static String getServerSyncData(bool localServer); +static void createMqttTask(void); bool hasSensorS8 = true; bool hasSensorPMS1 = true; @@ -734,6 +737,7 @@ void setup() { /** MQTT init */ if (agServer.getMqttBroker().isEmpty() == false) { if (agMqtt.begin(agServer.getMqttBroker())) { + createMqttTask(); Serial.println("MQTT client init success"); } else { Serial.println("MQTT client init failure"); @@ -793,14 +797,6 @@ static void sendDataToServer(void) { resetWatchdog(); } - if (agMqtt.isConnected()) { - String topic = "airgradient/readings/" + getDevId(); - if (agMqtt.publish(topic.c_str(), syncData.c_str(), syncData.length())) { - Serial.println("Mqtt sync success"); - } else { - Serial.println("Mqtt sync failure"); - } - } loopCount++; } @@ -1162,8 +1158,13 @@ static void serverConfigPoll(void) { String mqttUri = agServer.getMqttBroker(); if (mqttUri != agMqtt.getUri()) { agMqtt.end(); + if (mqttTask != NULL) { + vTaskDelete(mqttTask); + mqttTask = NULL; + } if (agMqtt.begin(mqttUri)) { Serial.println("Connect to new mqtt broker success"); + createMqttTask(); } else { Serial.println("Connect to new mqtt broker failed"); } @@ -1418,6 +1419,37 @@ static String getServerSyncData(bool localServer) { return JSON.stringify(root); } +static void createMqttTask(void) { + if (mqttTask) { + vTaskDelete(mqttTask); + mqttTask = NULL; + } + + xTaskCreate( + [](void *param) { + for (;;) { + delay(MQTT_SYNC_INTERVAL); + + /** Send data */ + if (agMqtt.isConnected()) { + String syncData = getServerSyncData(false); + String topic = "airgradient/readings/" + getDevId(); + if (agMqtt.publish(topic.c_str(), syncData.c_str(), + syncData.length())) { + Serial.println("Mqtt sync success"); + } else { + Serial.println("Mqtt sync failure"); + } + } + } + }, + "mqtt-task", 1024 * 3, NULL, 6, &mqttTask); + + if (mqttTask == NULL) { + Serial.println("Creat mqttTask failed"); + } +} + static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {