forked from espressif/esp-mqtt
Add BEFORE_CONNECt event and refresh the connection option
This commit is contained in:
@ -45,6 +45,7 @@ typedef enum {
|
||||
- current_data_offset offset of the current data for this event
|
||||
- total_data_len total length of the data received
|
||||
*/
|
||||
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
|
||||
} esp_mqtt_event_id_t;
|
||||
|
||||
typedef enum {
|
||||
@ -103,6 +104,7 @@ typedef struct {
|
||||
const char *client_cert_pem; /*!< Pointer to certificate data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_key_pem` has to be provided. */
|
||||
const char *client_key_pem; /*!< Pointer to private key data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_cert_pem` has to be provided. */
|
||||
esp_mqtt_transport_t transport; /*!< overrides URI transport */
|
||||
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
|
||||
} esp_mqtt_client_config_t;
|
||||
|
||||
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config);
|
||||
@ -113,6 +115,7 @@ esp_err_t esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char
|
||||
esp_err_t esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic);
|
||||
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
|
||||
esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client);
|
||||
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
105
mqtt_client.c
105
mqtt_client.c
@ -44,6 +44,7 @@ typedef struct {
|
||||
bool auto_reconnect;
|
||||
void *user_context;
|
||||
int network_timeout_ms;
|
||||
int refresh_connection_after_ms;
|
||||
} mqtt_config_storage_t;
|
||||
|
||||
typedef enum {
|
||||
@ -61,8 +62,9 @@ struct esp_mqtt_client {
|
||||
mqtt_state_t mqtt_state;
|
||||
mqtt_connect_info_t connect_info;
|
||||
mqtt_client_state_t state;
|
||||
long long keepalive_tick;
|
||||
long long reconnect_tick;
|
||||
uint64_t refresh_connection_tick;
|
||||
uint64_t keepalive_tick;
|
||||
uint64_t reconnect_tick;
|
||||
int wait_timeout_ms;
|
||||
int auto_reconnect;
|
||||
esp_mqtt_event_t event;
|
||||
@ -75,98 +77,129 @@ struct esp_mqtt_client {
|
||||
const static int STOPPED_BIT = BIT0;
|
||||
|
||||
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
|
||||
static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config);
|
||||
static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client);
|
||||
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms);
|
||||
static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
|
||||
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
|
||||
static char *create_string(const char *ptr, int len);
|
||||
|
||||
static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
||||
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
|
||||
{
|
||||
//Copy user configurations to client context
|
||||
esp_err_t err = ESP_OK;
|
||||
mqtt_config_storage_t *cfg = calloc(1, sizeof(mqtt_config_storage_t));
|
||||
ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM);
|
||||
mqtt_config_storage_t *cfg;
|
||||
if (client->config) {
|
||||
cfg = client->config;
|
||||
} else {
|
||||
cfg = calloc(1, sizeof(mqtt_config_storage_t));
|
||||
ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM);
|
||||
client->config = cfg;
|
||||
}
|
||||
if (config->task_prio) {
|
||||
cfg->task_prio = config->task_prio;
|
||||
}
|
||||
|
||||
client->config = cfg;
|
||||
|
||||
cfg->task_prio = config->task_prio;
|
||||
if (cfg->task_prio <= 0) {
|
||||
cfg->task_prio = MQTT_TASK_PRIORITY;
|
||||
}
|
||||
|
||||
cfg->task_stack = config->task_stack;
|
||||
if (config->task_stack) {
|
||||
cfg->task_stack = config->task_stack;
|
||||
}
|
||||
if (cfg->task_stack == 0) {
|
||||
cfg->task_stack = MQTT_TASK_STACK;
|
||||
}
|
||||
if (config->port) {
|
||||
cfg->port = config->port;
|
||||
}
|
||||
|
||||
err = ESP_ERR_NO_MEM;
|
||||
if (config->host) {
|
||||
free(cfg->host);
|
||||
cfg->host = strdup(config->host);
|
||||
ESP_MEM_CHECK(TAG, cfg->host, goto _mqtt_set_config_failed);
|
||||
}
|
||||
cfg->port = config->port;
|
||||
|
||||
if (config->username) {
|
||||
free(client->connect_info.username);
|
||||
client->connect_info.username = strdup(config->username);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.username, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->password) {
|
||||
free(client->connect_info.password);
|
||||
client->connect_info.password = strdup(config->password);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.password, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->client_id) {
|
||||
free(client->connect_info.client_id);
|
||||
client->connect_info.client_id = strdup(config->client_id);
|
||||
} else {
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
|
||||
} else if (client->connect_info.client_id == NULL) {
|
||||
client->connect_info.client_id = platform_create_id_string();
|
||||
}
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
|
||||
ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id);
|
||||
|
||||
if (config->uri) {
|
||||
free(cfg->uri);
|
||||
cfg->uri = strdup(config->uri);
|
||||
ESP_MEM_CHECK(TAG, cfg->uri, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->lwt_topic) {
|
||||
free(client->connect_info.will_topic);
|
||||
client->connect_info.will_topic = strdup(config->lwt_topic);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_topic, goto _mqtt_set_config_failed);
|
||||
}
|
||||
|
||||
if (config->lwt_msg_len) {
|
||||
if (config->lwt_msg_len && config->lwt_msg) {
|
||||
free(client->connect_info.will_message);
|
||||
client->connect_info.will_message = malloc(config->lwt_msg_len);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
|
||||
memcpy(client->connect_info.will_message, config->lwt_msg, config->lwt_msg_len);
|
||||
client->connect_info.will_length = config->lwt_msg_len;
|
||||
} else if (config->lwt_msg) {
|
||||
free(client->connect_info.will_message);
|
||||
client->connect_info.will_message = strdup(config->lwt_msg);
|
||||
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
|
||||
client->connect_info.will_length = strlen(config->lwt_msg);
|
||||
}
|
||||
|
||||
client->connect_info.will_qos = config->lwt_qos;
|
||||
client->connect_info.will_retain = config->lwt_retain;
|
||||
|
||||
client->connect_info.clean_session = 1;
|
||||
if (config->disable_clean_session) {
|
||||
client->connect_info.clean_session = false;
|
||||
if (config->lwt_qos) {
|
||||
client->connect_info.will_qos = config->lwt_qos;
|
||||
}
|
||||
if (config->lwt_retain) {
|
||||
client->connect_info.will_retain = config->lwt_retain;
|
||||
}
|
||||
|
||||
if (config->disable_clean_session == client->connect_info.clean_session) {
|
||||
client->connect_info.clean_session = !config->disable_clean_session;
|
||||
}
|
||||
if (config->keepalive) {
|
||||
client->connect_info.keepalive = config->keepalive;
|
||||
}
|
||||
client->connect_info.keepalive = config->keepalive;
|
||||
if (client->connect_info.keepalive == 0) {
|
||||
client->connect_info.keepalive = MQTT_KEEPALIVE_TICK;
|
||||
}
|
||||
cfg->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS;
|
||||
cfg->user_context = config->user_context;
|
||||
cfg->event_handle = config->event_handle;
|
||||
cfg->auto_reconnect = true;
|
||||
if (config->disable_auto_reconnect) {
|
||||
cfg->auto_reconnect = false;
|
||||
if (config->user_context) {
|
||||
cfg->user_context = config->user_context;
|
||||
}
|
||||
|
||||
if (config->event_handle) {
|
||||
cfg->event_handle = config->event_handle;
|
||||
}
|
||||
|
||||
return err;
|
||||
if (config->refresh_connection_after_ms) {
|
||||
cfg->refresh_connection_after_ms = config->refresh_connection_after_ms;
|
||||
}
|
||||
|
||||
cfg->auto_reconnect = true;
|
||||
if (config->disable_auto_reconnect == cfg->auto_reconnect) {
|
||||
cfg->auto_reconnect = !config->disable_auto_reconnect;
|
||||
}
|
||||
|
||||
return ESP_OK;
|
||||
_mqtt_set_config_failed:
|
||||
esp_mqtt_destroy_config(client);
|
||||
return err;
|
||||
@ -179,11 +212,13 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
|
||||
free(cfg->uri);
|
||||
free(cfg->path);
|
||||
free(cfg->scheme);
|
||||
memset(cfg, 0, sizeof(mqtt_config_storage_t));
|
||||
free(client->connect_info.will_topic);
|
||||
free(client->connect_info.will_message);
|
||||
free(client->connect_info.client_id);
|
||||
free(client->connect_info.username);
|
||||
free(client->connect_info.password);
|
||||
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
|
||||
free(client->config);
|
||||
return ESP_OK;
|
||||
}
|
||||
@ -255,7 +290,7 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
|
||||
client->wait_timeout_ms = MQTT_RECONNECT_TIMEOUT_MS;
|
||||
client->reconnect_tick = platform_tick_get_ms();
|
||||
client->state = MQTT_STATE_WAIT_TIMEOUT;
|
||||
ESP_LOGI(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
|
||||
ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
|
||||
client->event.event_id = MQTT_EVENT_DISCONNECTED;
|
||||
client->wait_for_ping_resp = false;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
@ -335,6 +370,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
|
||||
|
||||
client->keepalive_tick = platform_tick_get_ms();
|
||||
client->reconnect_tick = platform_tick_get_ms();
|
||||
client->refresh_connection_tick = platform_tick_get_ms();
|
||||
client->wait_for_ping_resp = false;
|
||||
int buffer_size = config->buffer_size;
|
||||
if (buffer_size <= 0) {
|
||||
@ -685,6 +721,9 @@ static void esp_mqtt_task(void *pv)
|
||||
|
||||
switch ((int)client->state) {
|
||||
case MQTT_STATE_INIT:
|
||||
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
|
||||
if (client->transport == NULL) {
|
||||
ESP_LOGE(TAG, "There are no transport");
|
||||
client->run = false;
|
||||
@ -708,6 +747,7 @@ static void esp_mqtt_task(void *pv)
|
||||
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
|
||||
client->state = MQTT_STATE_CONNECTED;
|
||||
esp_mqtt_dispatch_event(client);
|
||||
client->refresh_connection_tick = platform_tick_get_ms();
|
||||
|
||||
break;
|
||||
case MQTT_STATE_CONNECTED:
|
||||
@ -735,6 +775,13 @@ static void esp_mqtt_task(void *pv)
|
||||
ESP_LOGD(TAG, "PING sent");
|
||||
}
|
||||
|
||||
if (client->config->refresh_connection_after_ms &&
|
||||
platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) {
|
||||
ESP_LOGD(TAG, "Refreshing the connection...");
|
||||
esp_mqtt_abort_connection(client);
|
||||
client->state = MQTT_STATE_INIT;
|
||||
}
|
||||
|
||||
//Delete mesaage after 30 senconds
|
||||
outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
|
||||
//
|
||||
|
Reference in New Issue
Block a user