mirror of
https://github.com/airgradienthq/arduino.git
synced 2025-07-20 04:02:08 +02:00
fix: Mqtt sending interval
This commit is contained in:
@ -93,6 +93,7 @@ enum {
|
|||||||
#define DISP_UPDATE_INTERVAL 5000 /** ms */
|
#define DISP_UPDATE_INTERVAL 5000 /** ms */
|
||||||
#define SERVER_CONFIG_UPDATE_INTERVAL 30000 /** ms */
|
#define SERVER_CONFIG_UPDATE_INTERVAL 30000 /** ms */
|
||||||
#define SERVER_SYNC_INTERVAL 60000 /** ms */
|
#define SERVER_SYNC_INTERVAL 60000 /** ms */
|
||||||
|
#define MQTT_SYNC_INTERVAL 60000 /** ms */
|
||||||
#define SENSOR_CO2_CALIB_COUNTDOWN_MAX 5 /** sec */
|
#define SENSOR_CO2_CALIB_COUNTDOWN_MAX 5 /** sec */
|
||||||
#define SENSOR_TVOC_UPDATE_INTERVAL 1000 /** ms */
|
#define SENSOR_TVOC_UPDATE_INTERVAL 1000 /** ms */
|
||||||
#define SENSOR_CO2_UPDATE_INTERVAL 5000 /** ms */
|
#define SENSOR_CO2_UPDATE_INTERVAL 5000 /** ms */
|
||||||
@ -631,6 +632,7 @@ public:
|
|||||||
int connectionFailedCount(void) { return connectFailedCount; }
|
int connectionFailedCount(void) { return connectFailedCount; }
|
||||||
};
|
};
|
||||||
AgMqtt agMqtt;
|
AgMqtt agMqtt;
|
||||||
|
static TaskHandle_t mqttTask = NULL;
|
||||||
|
|
||||||
/** Create airgradient instance for 'ONE_INDOOR' board */
|
/** Create airgradient instance for 'ONE_INDOOR' board */
|
||||||
AirGradient ag(ONE_INDOOR);
|
AirGradient ag(ONE_INDOOR);
|
||||||
@ -684,6 +686,7 @@ static void co2Poll(void);
|
|||||||
static void showNr(void);
|
static void showNr(void);
|
||||||
static void webServerInit(void);
|
static void webServerInit(void);
|
||||||
static String getServerSyncData(bool localServer);
|
static String getServerSyncData(bool localServer);
|
||||||
|
static void createMqttTask(void);
|
||||||
|
|
||||||
/** Init schedule */
|
/** Init schedule */
|
||||||
bool hasSensorS8 = true;
|
bool hasSensorS8 = true;
|
||||||
@ -754,6 +757,7 @@ void setup() {
|
|||||||
/** MQTT init */
|
/** MQTT init */
|
||||||
if (agServer.getMqttBroker().isEmpty() == false) {
|
if (agServer.getMqttBroker().isEmpty() == false) {
|
||||||
if (agMqtt.begin(agServer.getMqttBroker())) {
|
if (agMqtt.begin(agServer.getMqttBroker())) {
|
||||||
|
createMqttTask();
|
||||||
Serial.println("MQTT client init success");
|
Serial.println("MQTT client init success");
|
||||||
} else {
|
} else {
|
||||||
Serial.println("MQTT client init failure");
|
Serial.println("MQTT client init failure");
|
||||||
@ -965,6 +969,37 @@ static String getServerSyncData(bool localServer) {
|
|||||||
return JSON.stringify(root);
|
return JSON.stringify(root);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void createMqttTask(void) {
|
||||||
|
if (mqttTask) {
|
||||||
|
vTaskDelete(mqttTask);
|
||||||
|
mqttTask = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
xTaskCreate(
|
||||||
|
[](void *param) {
|
||||||
|
for (;;) {
|
||||||
|
delay(MQTT_SYNC_INTERVAL);
|
||||||
|
|
||||||
|
/** Send data */
|
||||||
|
if (agMqtt.isConnected()) {
|
||||||
|
String syncData = getServerSyncData(false);
|
||||||
|
String topic = "airgradient/readings/" + getDevId();
|
||||||
|
if (agMqtt.publish(topic.c_str(), syncData.c_str(),
|
||||||
|
syncData.length())) {
|
||||||
|
Serial.println("Mqtt sync success");
|
||||||
|
} else {
|
||||||
|
Serial.println("Mqtt sync failure");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"mqtt-task", 1024 * 3, NULL, 6, &mqttTask);
|
||||||
|
|
||||||
|
if (mqttTask == NULL) {
|
||||||
|
Serial.println("Creat mqttTask failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void sendPing() {
|
static void sendPing() {
|
||||||
JSONVar root;
|
JSONVar root;
|
||||||
root["wifi"] = WiFi.RSSI();
|
root["wifi"] = WiFi.RSSI();
|
||||||
@ -1518,8 +1553,14 @@ static void serverConfigPoll(void) {
|
|||||||
String mqttUri = agServer.getMqttBroker();
|
String mqttUri = agServer.getMqttBroker();
|
||||||
if (mqttUri != agMqtt.getUri()) {
|
if (mqttUri != agMqtt.getUri()) {
|
||||||
agMqtt.end();
|
agMqtt.end();
|
||||||
|
|
||||||
|
if (mqttTask != NULL) {
|
||||||
|
vTaskDelete(mqttTask);
|
||||||
|
mqttTask = NULL;
|
||||||
|
}
|
||||||
if (agMqtt.begin(mqttUri)) {
|
if (agMqtt.begin(mqttUri)) {
|
||||||
Serial.println("Connect to new mqtt broker success");
|
Serial.println("Connect to new mqtt broker success");
|
||||||
|
createMqttTask();
|
||||||
} else {
|
} else {
|
||||||
Serial.println("Connect to new mqtt broker failed");
|
Serial.println("Connect to new mqtt broker failed");
|
||||||
}
|
}
|
||||||
@ -1967,14 +2008,6 @@ static void sendDataToServer(void) {
|
|||||||
resetWatchdog();
|
resetWatchdog();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (agMqtt.isConnected()) {
|
|
||||||
String topic = "airgradient/readings/" + getDevId();
|
|
||||||
if (agMqtt.publish(topic.c_str(), syncData.c_str(), syncData.length())) {
|
|
||||||
Serial.println("Mqtt sync success");
|
|
||||||
} else {
|
|
||||||
Serial.println("Mqtt sync failure");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bootCount++;
|
bootCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,6 +92,7 @@ enum {
|
|||||||
#define DISP_UPDATE_INTERVAL 5000 /** ms */
|
#define DISP_UPDATE_INTERVAL 5000 /** ms */
|
||||||
#define SERVER_CONFIG_UPDATE_INTERVAL 30000 /** ms */
|
#define SERVER_CONFIG_UPDATE_INTERVAL 30000 /** ms */
|
||||||
#define SERVER_SYNC_INTERVAL 60000 /** ms */
|
#define SERVER_SYNC_INTERVAL 60000 /** ms */
|
||||||
|
#define MQTT_SYNC_INTERVAL 60000 /** ms */
|
||||||
#define SENSOR_CO2_CALIB_COUNTDOWN_MAX 5 /** sec */
|
#define SENSOR_CO2_CALIB_COUNTDOWN_MAX 5 /** sec */
|
||||||
#define SENSOR_TVOC_UPDATE_INTERVAL 1000 /** ms */
|
#define SENSOR_TVOC_UPDATE_INTERVAL 1000 /** ms */
|
||||||
#define SENSOR_CO2_UPDATE_INTERVAL 5000 /** ms */
|
#define SENSOR_CO2_UPDATE_INTERVAL 5000 /** ms */
|
||||||
@ -631,6 +632,7 @@ public:
|
|||||||
int connectionFailedCount(void) { return connectFailedCount; }
|
int connectionFailedCount(void) { return connectFailedCount; }
|
||||||
};
|
};
|
||||||
AgMqtt agMqtt;
|
AgMqtt agMqtt;
|
||||||
|
static TaskHandle_t mqttTask = NULL;
|
||||||
|
|
||||||
/** Create airgradient instance for 'OPEN_AIR_OUTDOOR' board */
|
/** Create airgradient instance for 'OPEN_AIR_OUTDOOR' board */
|
||||||
AirGradient ag(OPEN_AIR_OUTDOOR);
|
AirGradient ag(OPEN_AIR_OUTDOOR);
|
||||||
@ -701,6 +703,7 @@ static const char *getFwMode(int mode);
|
|||||||
static void showNr(void);
|
static void showNr(void);
|
||||||
static void webServerInit(void);
|
static void webServerInit(void);
|
||||||
static String getServerSyncData(bool localServer);
|
static String getServerSyncData(bool localServer);
|
||||||
|
static void createMqttTask(void);
|
||||||
|
|
||||||
bool hasSensorS8 = true;
|
bool hasSensorS8 = true;
|
||||||
bool hasSensorPMS1 = true;
|
bool hasSensorPMS1 = true;
|
||||||
@ -734,6 +737,7 @@ void setup() {
|
|||||||
/** MQTT init */
|
/** MQTT init */
|
||||||
if (agServer.getMqttBroker().isEmpty() == false) {
|
if (agServer.getMqttBroker().isEmpty() == false) {
|
||||||
if (agMqtt.begin(agServer.getMqttBroker())) {
|
if (agMqtt.begin(agServer.getMqttBroker())) {
|
||||||
|
createMqttTask();
|
||||||
Serial.println("MQTT client init success");
|
Serial.println("MQTT client init success");
|
||||||
} else {
|
} else {
|
||||||
Serial.println("MQTT client init failure");
|
Serial.println("MQTT client init failure");
|
||||||
@ -793,14 +797,6 @@ static void sendDataToServer(void) {
|
|||||||
resetWatchdog();
|
resetWatchdog();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (agMqtt.isConnected()) {
|
|
||||||
String topic = "airgradient/readings/" + getDevId();
|
|
||||||
if (agMqtt.publish(topic.c_str(), syncData.c_str(), syncData.length())) {
|
|
||||||
Serial.println("Mqtt sync success");
|
|
||||||
} else {
|
|
||||||
Serial.println("Mqtt sync failure");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
loopCount++;
|
loopCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1162,8 +1158,13 @@ static void serverConfigPoll(void) {
|
|||||||
String mqttUri = agServer.getMqttBroker();
|
String mqttUri = agServer.getMqttBroker();
|
||||||
if (mqttUri != agMqtt.getUri()) {
|
if (mqttUri != agMqtt.getUri()) {
|
||||||
agMqtt.end();
|
agMqtt.end();
|
||||||
|
if (mqttTask != NULL) {
|
||||||
|
vTaskDelete(mqttTask);
|
||||||
|
mqttTask = NULL;
|
||||||
|
}
|
||||||
if (agMqtt.begin(mqttUri)) {
|
if (agMqtt.begin(mqttUri)) {
|
||||||
Serial.println("Connect to new mqtt broker success");
|
Serial.println("Connect to new mqtt broker success");
|
||||||
|
createMqttTask();
|
||||||
} else {
|
} else {
|
||||||
Serial.println("Connect to new mqtt broker failed");
|
Serial.println("Connect to new mqtt broker failed");
|
||||||
}
|
}
|
||||||
@ -1418,6 +1419,37 @@ static String getServerSyncData(bool localServer) {
|
|||||||
return JSON.stringify(root);
|
return JSON.stringify(root);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void createMqttTask(void) {
|
||||||
|
if (mqttTask) {
|
||||||
|
vTaskDelete(mqttTask);
|
||||||
|
mqttTask = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
xTaskCreate(
|
||||||
|
[](void *param) {
|
||||||
|
for (;;) {
|
||||||
|
delay(MQTT_SYNC_INTERVAL);
|
||||||
|
|
||||||
|
/** Send data */
|
||||||
|
if (agMqtt.isConnected()) {
|
||||||
|
String syncData = getServerSyncData(false);
|
||||||
|
String topic = "airgradient/readings/" + getDevId();
|
||||||
|
if (agMqtt.publish(topic.c_str(), syncData.c_str(),
|
||||||
|
syncData.length())) {
|
||||||
|
Serial.println("Mqtt sync success");
|
||||||
|
} else {
|
||||||
|
Serial.println("Mqtt sync failure");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"mqtt-task", 1024 * 3, NULL, 6, &mqttTask);
|
||||||
|
|
||||||
|
if (mqttTask == NULL) {
|
||||||
|
Serial.println("Creat mqttTask failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
|
static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
|
||||||
int32_t event_id, void *event_data) {
|
int32_t event_id, void *event_data) {
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user