MeasurementCycle queue only applied for cellular

Cellular post measures payload different with wifi
Update submodule to support different cellular post endpoint
This commit is contained in:
samuelbles07
2025-03-21 04:40:27 +07:00
parent 7c2aa35e4f
commit 30622fca99
6 changed files with 1748 additions and 334 deletions

View File

@ -63,9 +63,9 @@ CC BY-SA 4.0 Attribution-ShareAlike 4.0 International License
#define WIFI_SERVER_CONFIG_SYNC_INTERVAL 1 * 60000 /** ms */
#define WIFI_MEASUREMENT_INTERVAL 1 * 60000 /** ms */
#define WIFI_TRANSMISSION_INTERVAL 1 * 60000 /** ms */
#define CELLULAR_SERVER_CONFIG_SYNC_INTERVAL 15 * 60000 /** ms */
#define CELLULAR_SERVER_CONFIG_SYNC_INTERVAL 30 * 60000 /** ms */
#define CELLULAR_MEASUREMENT_INTERVAL 3 * 60000 /** ms */
#define CELLULAR_TRANSMISSION_INTERVAL 3 * 60000 /** ms */
#define CELLULAR_TRANSMISSION_INTERVAL 9 * 60000 /** ms */
#define MQTT_SYNC_INTERVAL 60000 /** ms */
#define SENSOR_CO2_CALIB_COUNTDOWN_MAX 5 /** sec */
#define SENSOR_TVOC_UPDATE_INTERVAL 1000 /** ms */
@ -75,6 +75,8 @@ CC BY-SA 4.0 Attribution-ShareAlike 4.0 International License
#define DISPLAY_DELAY_SHOW_CONTENT_MS 2000 /** ms */
#define FIRMWARE_CHECK_FOR_UPDATE_MS (60 * 60 * 1000) /** ms */
#define MAXIMUM_MEASUREMENT_CYCLE_QUEUE 80
/** I2C define */
#define I2C_SDA_PIN 7
#define I2C_SCL_PIN 6
@ -259,10 +261,19 @@ void setup() {
oledDisplay.setBrightness(configuration.getDisplayBrightness());
}
// Allocate queue memory to avoid always reallocation
measurementCycleQueue.reserve(10);
// Initialize mutex to access mesurementCycleQueue
mutexMeasurementCycleQueue = xSemaphoreCreateMutex();
if (networkOption == UseCellular) {
// If using cellular re-set scheduler interval
configSchedule.setPeriod(CELLULAR_SERVER_CONFIG_SYNC_INTERVAL);
transmissionSchedule.setPeriod(CELLULAR_TRANSMISSION_INTERVAL);
measurementSchedule.setPeriod(CELLULAR_MEASUREMENT_INTERVAL);
measurementSchedule.update();
// Queue now only applied for cellular
// Allocate queue memory to avoid always reallocation
measurementCycleQueue.reserve(10);
// Initialize mutex to access mesurementCycleQueue
mutexMeasurementCycleQueue = xSemaphoreCreateMutex();
}
// Only run network task if monitor is not in offline mode
if (configuration.isOfflineMode() == false) {
@ -283,12 +294,6 @@ void setup() {
Serial.println("Running monitor without connection to AirGradient server");
}
// If using cellular re-set scheduler interval
if (networkOption == UseCellular) {
configSchedule.setPeriod(CELLULAR_SERVER_CONFIG_SYNC_INTERVAL);
transmissionSchedule.setPeriod(CELLULAR_TRANSMISSION_INTERVAL);
measurementSchedule.setPeriod(CELLULAR_MEASUREMENT_INTERVAL);
}
}
void loop() {
@ -304,10 +309,8 @@ void loop() {
// Schedule to update display and led
dispLedSchedule.run();
// No need to run measurement cycle schedule when mode is offline or connection to AG disabled
if (configuration.isOfflineMode() == false ||
configuration.isCloudConnectionDisabled() == false) {
// Schedule to take new measurement cycle
if (networkOption == UseCellular) {
// Queue now only applied for cellular
measurementSchedule.run();
}
@ -975,10 +978,10 @@ void initializeNetwork() {
if (configuration.isCloudConnectionDisabled()) {
return;
}
}
// Send data for the first time to AG server at boot
sendDataToAg();
// Send data for the first time to AG server at boot
sendDataToAg();
}
std::string config = agClient->httpFetchConfig(ag->getDeviceId());
@ -1287,13 +1290,23 @@ static void updatePm(void) {
}
}
void sendDataToServer(void) {
if (configuration.isPostDataToAirGradient() == false) {
Serial.println("Skipping transmission of data to AG server, post data to server disabled");
agClient->resetPostMeasuresStatus();
return;
void postUsingWifi() {
// Increment bootcount when send measurements data is scheduled
int bootCount = measurements.bootCount() + 1;
measurements.setBootCount(bootCount);
String payload = measurements.toString(false, fwMode, wifiConnector.RSSI());
if (agClient->httpPostMeasures(ag->getDeviceId(), payload.c_str()) == false) {
Serial.println();
Serial.println("Online mode and isPostToAirGradient = true");
Serial.println();
}
// Log current free heap size
Serial.printf("Free heap: %u\n", ESP.getFreeHeap());
}
void postUsingCellular() {
// Aquire queue mutex to get queue size
xSemaphoreTake(mutexMeasurementCycleQueue, portMAX_DELAY);
@ -1305,36 +1318,44 @@ void sendDataToServer(void) {
return;
}
Serial.printf("Measurement cycle queue size %d\n", queueSize);
// Build payload include all measurements from queue
String payload;
payload += String(CELLULAR_MEASUREMENT_INTERVAL);
for (int i = 0; i < queueSize; i++) {
auto mc = measurementCycleQueue.at(i);
payload += ",";
payload += measurements.buildMeasurementPayload(mc);
}
// Release before actually post measures that might takes too long
xSemaphoreGive(mutexMeasurementCycleQueue);
delay(10); // Wait for a moment in case new measurement schedule wait for it
for (int i = 1; i <= queueSize; i++) {
Serial.printf("Attempt post measurement cycle from queue %d\n", i);
// Aquire queue mutex to get oldest in queue
xSemaphoreTake(mutexMeasurementCycleQueue, portMAX_DELAY);
// Attempt to send
Serial.println(payload);
if (agClient->httpPostMeasures(ag->getDeviceId(), payload.c_str()) == false) {
// Consider network has a problem, retry in next schedule
Serial.println("Post measures failed, retry in next schedule");
return;
}
// Get the oldest queue
auto mc = measurementCycleQueue.front();
// Post success, remove the data that previously sent from queue
xSemaphoreTake(mutexMeasurementCycleQueue, portMAX_DELAY);
measurementCycleQueue.erase(measurementCycleQueue.begin(),
measurementCycleQueue.begin() + queueSize);
xSemaphoreGive(mutexMeasurementCycleQueue);
}
// Release before actually post measures that might takes too long
xSemaphoreGive(mutexMeasurementCycleQueue);
void sendDataToServer(void) {
if (configuration.isPostDataToAirGradient() == false) {
Serial.println("Skipping transmission of data to AG server, post data to server disabled");
agClient->resetPostMeasuresStatus();
return;
}
String payload = measurements.buildMeasurementPayload(mc, fwMode);
if (agClient->httpPostMeasures(ag->getDeviceId(), payload.c_str()) == false) {
// Consider network has a problem, retry in next schedule
Serial.println("Post measures failed, retry in next schedule");
break;
}
Serial.println();
// Post success, remove the oldest queue
xSemaphoreTake(mutexMeasurementCycleQueue, portMAX_DELAY);
measurementCycleQueue.erase(measurementCycleQueue.begin());
xSemaphoreGive(mutexMeasurementCycleQueue);
// Wait a moment before post next in queue
delay(2000);
if (networkOption == UseWifi) {
postUsingWifi();
} else {
postUsingCellular();
}
}
@ -1469,17 +1490,18 @@ void networkingTask(void *args) {
}
void newMeasurementCycle() {
// TODO: Need to check max queue
if (xSemaphoreTake(mutexMeasurementCycleQueue, portMAX_DELAY) == pdTRUE) {
// Make sure queue not overflow
if (measurementCycleQueue.size() >= MAXIMUM_MEASUREMENT_CYCLE_QUEUE) {
// Remove the oldest data from queue if queue reach max
measurementCycleQueue.erase(measurementCycleQueue.begin());
}
Measurements::MeasurementCycle mc = measurements.getMeasurementCycle();
mc.wifi = wifiConnector.RSSI();
measurementCycleQueue.push_back(mc);
Serial.println("New measurement cycle added to queue");
// Release mutex
xSemaphoreGive(mutexMeasurementCycleQueue);
// Increment bootcount for the next measurement cycle
int bootCount = measurements.bootCount() + 1;
measurements.setBootCount(bootCount);
// Log current free heap size
Serial.printf("Free heap: %u\n", ESP.getFreeHeap());
}