Add mqtt client

This commit is contained in:
Phat Nguyen
2024-02-17 17:19:29 +07:00
parent 17646f3067
commit 781fb51c6f
2 changed files with 372 additions and 4 deletions

View File

@ -35,6 +35,7 @@ CC BY-SA 4.0 Attribution-ShareAlike 4.0 International License
*/
#include "mqtt_client.h"
#include <AirGradient.h>
#include <Arduino_JSON.h>
#include <HTTPClient.h>
@ -396,6 +397,117 @@ private:
};
AgServer agServer;
static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
int32_t event_id, void *event_data);
class AgMqtt {
private:
bool _isBegin = false;
String uri;
String hostname;
String user;
String pass;
int port;
esp_mqtt_client_handle_t client;
bool clientConnected = false;
public:
AgMqtt() {}
~AgMqtt() {}
/**
* @brief Initialize mqtt
*
* @param uri Complete mqtt uri, ex:
* mqtts://username:password@my.broker.com:4711
* @return true Success
* @return false Failure
*/
bool begin(String uri) {
if (_isBegin) {
Serial.println("Mqtt already begin, call 'end' and try again");
return true;
}
this->uri = uri;
Serial.printf("mqtt init '%s'\r\n", uri.c_str());
/** config esp_mqtt client */
esp_mqtt_client_config_t config = {
.uri = this->uri.c_str(),
};
/** init client */
client = esp_mqtt_client_init(&config);
if (client == NULL) {
Serial.println("mqtt client init failed");
return false;
}
/** Register event */
if (esp_mqtt_client_register_event(
client, MQTT_EVENT_ANY,
mqtt_event_handler,
NULL) != ESP_OK) {
Serial.println("mqtt client register event failed");
return false;
}
if (esp_mqtt_client_start(client) != ESP_OK) {
Serial.println("mqtt client start failed");
return false;
}
_isBegin = true;
return true;
}
/**
* @brief Deinitialize mqtt
*
*/
void end(void) {
if (_isBegin == false) {
return;
}
esp_mqtt_client_disconnect(client);
esp_mqtt_client_stop(client);
esp_mqtt_client_destroy(client);
_isBegin = false;
Serial.println("mqtt de-init");
}
bool publish(const char *topic, const char *payload, int len) {
if ((_isBegin == false) || (clientConnected == false)) {
return false;
}
if (esp_mqtt_client_publish(client, topic, payload, len, 0, 0) == ESP_OK) {
return true;
}
return false;
}
/**
* @brief Get current complete mqtt uri
*
* @return String
*/
String getUri(void) { return uri; }
void _connectionHandler(bool connected) { clientConnected = connected; }
/**
* @brief Mqtt client connect status
*
* @return true Connected
* @return false Disconnected or Not initialize
*/
bool isConnected(void) { return (_isBegin && clientConnected); }
};
AgMqtt agMqtt;
/** Create airgradient instance for 'OPEN_AIR_OUTDOOR' board */
AirGradient ag(OPEN_AIR_OUTDOOR);
@ -469,6 +581,7 @@ AgSchedule tvocSchedule(SENSOR_TVOC_UPDATE_INTERVAL, tvocPoll);
void setup() {
Serial.begin(115200);
delay(100); /** For bester show log */
showNr();
/** Board init */
@ -551,7 +664,9 @@ static void sendDataToServer(void) {
if (hum_1 >= 0) {
root["rhum"] = hum_1;
}
} else if (fw_mode == FW_MODE_PPT) {
}
if ((fw_mode == FW_MODE_PPT) || (fw_mode == FW_MODE_PST)) {
if (tvocIndex > 0) {
root["tvoc_index"] = loopCount;
}
@ -584,9 +699,19 @@ static void sendDataToServer(void) {
}
/** Send data to sensor */
if (agServer.postToServer(getDevId(), JSON.stringify(root))) {
String syncData = JSON.stringify(root);
if (agServer.postToServer(getDevId(), syncData)) {
resetWatchdog();
}
if (agMqtt.isConnected()) {
String topic = "airgradient/readings/" + getDevId();
if (agMqtt.publish(topic.c_str(), syncData.c_str(), syncData.length())) {
Serial.println("Mqtt sync success");
} else {
Serial.println("Mqtt sync failure");
}
}
loopCount++;
}
@ -867,6 +992,16 @@ static void serverConfigPoll(void) {
}
}
}
String mqttUri = agServer.getMqttBroker();
if (mqttUri != agMqtt.getUri()) {
agMqtt.end();
if (agMqtt.begin(mqttUri)) {
Serial.println("Connect to new mqtt broker success");
} else {
Serial.println("Connect to new mqtt broker failed");
}
}
}
}
@ -988,3 +1123,54 @@ static const char *getFwMode(int mode) {
}
static void showNr(void) { Serial.println("Serial nr: " + getDevId()); }
static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
int32_t event_id, void *event_data) {
ESP_LOGD(TAG,
"Event dispatched from event loop base=%s, event_id=%" PRIi32 "",
base, event_id);
esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data;
esp_mqtt_client_handle_t client = event->client;
int msg_id;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
Serial.println("MQTT_EVENT_CONNECTED");
// msg_id = esp_mqtt_client_subscribe(client, "helloworld", 0);
// Serial.printf("sent subscribe successful, msg_id=%d\r\n", msg_id);
agMqtt._connectionHandler(true);
break;
case MQTT_EVENT_DISCONNECTED:
Serial.println("MQTT_EVENT_DISCONNECTED");
agMqtt._connectionHandler(false);
break;
case MQTT_EVENT_SUBSCRIBED:
break;
case MQTT_EVENT_UNSUBSCRIBED:
Serial.printf("MQTT_EVENT_UNSUBSCRIBED, msg_id=%d\r\n", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
Serial.printf("MQTT_EVENT_PUBLISHED, msg_id=%d\r\n", event->msg_id);
break;
case MQTT_EVENT_DATA:
Serial.println("MQTT_EVENT_DATA");
// add null terminal to data
// event->data[event->data_len] = 0;
// rpc_attritbutes_handler(event->data, event->data_len);
break;
case MQTT_EVENT_ERROR:
Serial.println("MQTT_EVENT_ERROR");
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
Serial.printf("reported from esp-tls: %d",
event->error_handle->esp_tls_last_esp_err);
Serial.printf("reported from tls stack: %d",
event->error_handle->esp_tls_stack_err);
Serial.printf("captured as transport's socket errno: %d",
event->error_handle->esp_transport_sock_errno);
}
break;
default:
Serial.printf("Other event id:%d\r\n", event->event_id);
break;
}
}