From 781fb51c6f2b0dfd01d7d0ef9a7d51802f87e8ab Mon Sep 17 00:00:00 2001 From: Phat Nguyen Date: Sat, 17 Feb 2024 17:19:29 +0700 Subject: [PATCH] Add mqtt client --- examples/ONE_I-9PSL/ONE_I-9PSL.ino | 186 +++++++++++++++++++++++++++- examples/Open_Air/Open_Air.ino | 190 ++++++++++++++++++++++++++++- 2 files changed, 372 insertions(+), 4 deletions(-) diff --git a/examples/ONE_I-9PSL/ONE_I-9PSL.ino b/examples/ONE_I-9PSL/ONE_I-9PSL.ino index 3d6273f..9e19de5 100644 --- a/examples/ONE_I-9PSL/ONE_I-9PSL.ino +++ b/examples/ONE_I-9PSL/ONE_I-9PSL.ino @@ -38,6 +38,7 @@ CC BY-SA 4.0 Attribution-ShareAlike 4.0 International License */ +#include "mqtt_client.h" #include #include #include @@ -430,6 +431,115 @@ private: }; AgServer agServer; +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, + int32_t event_id, void *event_data); + +class AgMqtt { +private: + bool _isBegin = false; + String uri; + String hostname; + String user; + String pass; + int port; + esp_mqtt_client_handle_t client; + bool clientConnected = false; + +public: + AgMqtt() {} + ~AgMqtt() {} + + /** + * @brief Initialize mqtt + * + * @param uri Complete mqtt uri, ex: + * mqtts://username:password@my.broker.com:4711 + * @return true Success + * @return false Failure + */ + bool begin(String uri) { + if (_isBegin) { + Serial.println("Mqtt already begin, call 'end' and try again"); + return true; + } + this->uri = uri; + Serial.printf("mqtt init '%s'\r\n", uri.c_str()); + + /** config esp_mqtt client */ + esp_mqtt_client_config_t config = { + .uri = this->uri.c_str(), + }; + + /** init client */ + client = esp_mqtt_client_init(&config); + if (client == NULL) { + Serial.println("mqtt client init failed"); + return false; + } + + /** Register event */ + if (esp_mqtt_client_register_event(client, MQTT_EVENT_ANY, + mqtt_event_handler, NULL) != ESP_OK) { + Serial.println("mqtt client register event failed"); + return false; + } + + if (esp_mqtt_client_start(client) != ESP_OK) { + Serial.println("mqtt client start failed"); + return false; + } + + _isBegin = true; + return true; + } + + /** + * @brief Deinitialize mqtt + * + */ + void end(void) { + if (_isBegin == false) { + return; + } + + esp_mqtt_client_disconnect(client); + esp_mqtt_client_stop(client); + esp_mqtt_client_destroy(client); + _isBegin = false; + + Serial.println("mqtt de-init"); + } + + bool publish(const char *topic, const char *payload, int len) { + if ((_isBegin == false) || (clientConnected == false)) { + return false; + } + + if (esp_mqtt_client_publish(client, topic, payload, len, 0, 0) == ESP_OK) { + return true; + } + return false; + } + + /** + * @brief Get current complete mqtt uri + * + * @return String + */ + String getUri(void) { return uri; } + + void _connectionHandler(bool connected) { clientConnected = connected; } + + /** + * @brief Mqtt client connect status + * + * @return true Connected + * @return false Disconnected or Not initialize + */ + bool isConnected(void) { return (_isBegin && clientConnected); } +}; +AgMqtt agMqtt; + /** Create airgradient instance for 'ONE_INDOOR' board */ AirGradient ag(ONE_INDOOR); @@ -490,6 +600,7 @@ AgSchedule tvocSchedule(SENSOR_TVOC_UPDATE_INTERVAL, tvocPoll); void setup() { /** Serial fore print debug message */ Serial.begin(115200); + delay(100); /** For bester show log */ showNr(); /** Init I2C */ @@ -1109,6 +1220,16 @@ static void serverConfigPoll(void) { ledTest(); } } + + String mqttUri = agServer.getMqttBroker(); + if (mqttUri != agMqtt.getUri()) { + agMqtt.end(); + if (agMqtt.begin(mqttUri)) { + Serial.println("Connect to new mqtt broker success"); + } else { + Serial.println("Connect to new mqtt broker failed"); + } + } } } @@ -1511,10 +1632,19 @@ static void sendDataToServer(void) { } root["boot"] = bootCount; - // NOTE Need determine offline mode to reset watchdog timer - if (agServer.postToServer(getDevId(), JSON.stringify(root))) { + String syncData = JSON.stringify(root); + if (agServer.postToServer(getDevId(), syncData)) { resetWatchdog(); } + + if (agMqtt.isConnected()) { + String topic = "airgradient/readings/" + getDevId(); + if (agMqtt.publish(topic.c_str(), syncData.c_str(), syncData.length())) { + Serial.println("Mqtt sync success"); + } else { + Serial.println("Mqtt sync failure"); + } + } bootCount++; } @@ -1533,3 +1663,55 @@ static void tempHumPoll(void) { Serial.println("Measure SHT failed"); } } + + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, + int32_t event_id, void *event_data) { + + ESP_LOGD(TAG, + "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", + base, event_id); + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data; + esp_mqtt_client_handle_t client = event->client; + int msg_id; + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + Serial.println("MQTT_EVENT_CONNECTED"); + // msg_id = esp_mqtt_client_subscribe(client, "helloworld", 0); + // Serial.printf("sent subscribe successful, msg_id=%d\r\n", msg_id); + agMqtt._connectionHandler(true); + break; + case MQTT_EVENT_DISCONNECTED: + Serial.println("MQTT_EVENT_DISCONNECTED"); + agMqtt._connectionHandler(false); + break; + case MQTT_EVENT_SUBSCRIBED: + break; + case MQTT_EVENT_UNSUBSCRIBED: + Serial.printf("MQTT_EVENT_UNSUBSCRIBED, msg_id=%d\r\n", event->msg_id); + break; + case MQTT_EVENT_PUBLISHED: + Serial.printf("MQTT_EVENT_PUBLISHED, msg_id=%d\r\n", event->msg_id); + break; + case MQTT_EVENT_DATA: + Serial.println("MQTT_EVENT_DATA"); + // add null terminal to data + // event->data[event->data_len] = 0; + // rpc_attritbutes_handler(event->data, event->data_len); + break; + case MQTT_EVENT_ERROR: + Serial.println("MQTT_EVENT_ERROR"); + if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { + Serial.printf("reported from esp-tls: %d", + event->error_handle->esp_tls_last_esp_err); + Serial.printf("reported from tls stack: %d", + event->error_handle->esp_tls_stack_err); + Serial.printf("captured as transport's socket errno: %d", + event->error_handle->esp_transport_sock_errno); + } + break; + default: + Serial.printf("Other event id:%d\r\n", event->event_id); + break; + } +} diff --git a/examples/Open_Air/Open_Air.ino b/examples/Open_Air/Open_Air.ino index 99d5002..bc1c867 100644 --- a/examples/Open_Air/Open_Air.ino +++ b/examples/Open_Air/Open_Air.ino @@ -35,6 +35,7 @@ CC BY-SA 4.0 Attribution-ShareAlike 4.0 International License */ +#include "mqtt_client.h" #include #include #include @@ -396,6 +397,117 @@ private: }; AgServer agServer; +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, + int32_t event_id, void *event_data); + +class AgMqtt { +private: + bool _isBegin = false; + String uri; + String hostname; + String user; + String pass; + int port; + esp_mqtt_client_handle_t client; + bool clientConnected = false; + +public: + AgMqtt() {} + ~AgMqtt() {} + + /** + * @brief Initialize mqtt + * + * @param uri Complete mqtt uri, ex: + * mqtts://username:password@my.broker.com:4711 + * @return true Success + * @return false Failure + */ + bool begin(String uri) { + if (_isBegin) { + Serial.println("Mqtt already begin, call 'end' and try again"); + return true; + } + this->uri = uri; + Serial.printf("mqtt init '%s'\r\n", uri.c_str()); + + /** config esp_mqtt client */ + esp_mqtt_client_config_t config = { + .uri = this->uri.c_str(), + }; + + /** init client */ + client = esp_mqtt_client_init(&config); + if (client == NULL) { + Serial.println("mqtt client init failed"); + return false; + } + + /** Register event */ + if (esp_mqtt_client_register_event( + client, MQTT_EVENT_ANY, + mqtt_event_handler, + NULL) != ESP_OK) { + Serial.println("mqtt client register event failed"); + return false; + } + + if (esp_mqtt_client_start(client) != ESP_OK) { + Serial.println("mqtt client start failed"); + return false; + } + + _isBegin = true; + return true; + } + + /** + * @brief Deinitialize mqtt + * + */ + void end(void) { + if (_isBegin == false) { + return; + } + + esp_mqtt_client_disconnect(client); + esp_mqtt_client_stop(client); + esp_mqtt_client_destroy(client); + _isBegin = false; + + Serial.println("mqtt de-init"); + } + + bool publish(const char *topic, const char *payload, int len) { + if ((_isBegin == false) || (clientConnected == false)) { + return false; + } + + if (esp_mqtt_client_publish(client, topic, payload, len, 0, 0) == ESP_OK) { + return true; + } + return false; + } + + /** + * @brief Get current complete mqtt uri + * + * @return String + */ + String getUri(void) { return uri; } + + void _connectionHandler(bool connected) { clientConnected = connected; } + + /** + * @brief Mqtt client connect status + * + * @return true Connected + * @return false Disconnected or Not initialize + */ + bool isConnected(void) { return (_isBegin && clientConnected); } +}; +AgMqtt agMqtt; + /** Create airgradient instance for 'OPEN_AIR_OUTDOOR' board */ AirGradient ag(OPEN_AIR_OUTDOOR); @@ -469,6 +581,7 @@ AgSchedule tvocSchedule(SENSOR_TVOC_UPDATE_INTERVAL, tvocPoll); void setup() { Serial.begin(115200); + delay(100); /** For bester show log */ showNr(); /** Board init */ @@ -551,7 +664,9 @@ static void sendDataToServer(void) { if (hum_1 >= 0) { root["rhum"] = hum_1; } - } else if (fw_mode == FW_MODE_PPT) { + } + + if ((fw_mode == FW_MODE_PPT) || (fw_mode == FW_MODE_PST)) { if (tvocIndex > 0) { root["tvoc_index"] = loopCount; } @@ -584,9 +699,19 @@ static void sendDataToServer(void) { } /** Send data to sensor */ - if (agServer.postToServer(getDevId(), JSON.stringify(root))) { + String syncData = JSON.stringify(root); + if (agServer.postToServer(getDevId(), syncData)) { resetWatchdog(); } + + if (agMqtt.isConnected()) { + String topic = "airgradient/readings/" + getDevId(); + if (agMqtt.publish(topic.c_str(), syncData.c_str(), syncData.length())) { + Serial.println("Mqtt sync success"); + } else { + Serial.println("Mqtt sync failure"); + } + } loopCount++; } @@ -867,6 +992,16 @@ static void serverConfigPoll(void) { } } } + + String mqttUri = agServer.getMqttBroker(); + if (mqttUri != agMqtt.getUri()) { + agMqtt.end(); + if (agMqtt.begin(mqttUri)) { + Serial.println("Connect to new mqtt broker success"); + } else { + Serial.println("Connect to new mqtt broker failed"); + } + } } } @@ -988,3 +1123,54 @@ static const char *getFwMode(int mode) { } static void showNr(void) { Serial.println("Serial nr: " + getDevId()); } + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, + int32_t event_id, void *event_data) { + + ESP_LOGD(TAG, + "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", + base, event_id); + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data; + esp_mqtt_client_handle_t client = event->client; + int msg_id; + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + Serial.println("MQTT_EVENT_CONNECTED"); + // msg_id = esp_mqtt_client_subscribe(client, "helloworld", 0); + // Serial.printf("sent subscribe successful, msg_id=%d\r\n", msg_id); + agMqtt._connectionHandler(true); + break; + case MQTT_EVENT_DISCONNECTED: + Serial.println("MQTT_EVENT_DISCONNECTED"); + agMqtt._connectionHandler(false); + break; + case MQTT_EVENT_SUBSCRIBED: + break; + case MQTT_EVENT_UNSUBSCRIBED: + Serial.printf("MQTT_EVENT_UNSUBSCRIBED, msg_id=%d\r\n", event->msg_id); + break; + case MQTT_EVENT_PUBLISHED: + Serial.printf("MQTT_EVENT_PUBLISHED, msg_id=%d\r\n", event->msg_id); + break; + case MQTT_EVENT_DATA: + Serial.println("MQTT_EVENT_DATA"); + // add null terminal to data + // event->data[event->data_len] = 0; + // rpc_attritbutes_handler(event->data, event->data_len); + break; + case MQTT_EVENT_ERROR: + Serial.println("MQTT_EVENT_ERROR"); + if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { + Serial.printf("reported from esp-tls: %d", + event->error_handle->esp_tls_last_esp_err); + Serial.printf("reported from tls stack: %d", + event->error_handle->esp_tls_stack_err); + Serial.printf("captured as transport's socket errno: %d", + event->error_handle->esp_transport_sock_errno); + } + break; + default: + Serial.printf("Other event id:%d\r\n", event->event_id); + break; + } +}