diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 630ba17..56d6aea 100755 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -45,6 +45,7 @@ typedef enum { - current_data_offset offset of the current data for this event - total_data_len total length of the data received */ + MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */ } esp_mqtt_event_id_t; typedef enum { @@ -103,6 +104,7 @@ typedef struct { const char *client_cert_pem; /*!< Pointer to certificate data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_key_pem` has to be provided. */ const char *client_key_pem; /*!< Pointer to private key data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_cert_pem` has to be provided. */ esp_mqtt_transport_t transport; /*!< overrides URI transport */ + int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */ } esp_mqtt_client_config_t; esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config); @@ -113,6 +115,7 @@ esp_err_t esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char esp_err_t esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic); int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client); +esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config); #ifdef __cplusplus } diff --git a/mqtt_client.c b/mqtt_client.c index 7ee0126..0f8656e 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -44,6 +44,7 @@ typedef struct { bool auto_reconnect; void *user_context; int network_timeout_ms; + int refresh_connection_after_ms; } mqtt_config_storage_t; typedef enum { @@ -61,8 +62,9 @@ struct esp_mqtt_client { mqtt_state_t mqtt_state; mqtt_connect_info_t connect_info; mqtt_client_state_t state; - long long keepalive_tick; - long long reconnect_tick; + uint64_t refresh_connection_tick; + uint64_t keepalive_tick; + uint64_t reconnect_tick; int wait_timeout_ms; int auto_reconnect; esp_mqtt_event_t event; @@ -75,98 +77,129 @@ struct esp_mqtt_client { const static int STOPPED_BIT = BIT0; static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client); -static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config); static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms); static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client); static char *create_string(const char *ptr, int len); -static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) +esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) { //Copy user configurations to client context esp_err_t err = ESP_OK; - mqtt_config_storage_t *cfg = calloc(1, sizeof(mqtt_config_storage_t)); - ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM); + mqtt_config_storage_t *cfg; + if (client->config) { + cfg = client->config; + } else { + cfg = calloc(1, sizeof(mqtt_config_storage_t)); + ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM); + client->config = cfg; + } + if (config->task_prio) { + cfg->task_prio = config->task_prio; + } - client->config = cfg; - - cfg->task_prio = config->task_prio; if (cfg->task_prio <= 0) { cfg->task_prio = MQTT_TASK_PRIORITY; } - - cfg->task_stack = config->task_stack; + if (config->task_stack) { + cfg->task_stack = config->task_stack; + } if (cfg->task_stack == 0) { cfg->task_stack = MQTT_TASK_STACK; } + if (config->port) { + cfg->port = config->port; + } + err = ESP_ERR_NO_MEM; if (config->host) { + free(cfg->host); cfg->host = strdup(config->host); ESP_MEM_CHECK(TAG, cfg->host, goto _mqtt_set_config_failed); } - cfg->port = config->port; if (config->username) { + free(client->connect_info.username); client->connect_info.username = strdup(config->username); ESP_MEM_CHECK(TAG, client->connect_info.username, goto _mqtt_set_config_failed); } if (config->password) { + free(client->connect_info.password); client->connect_info.password = strdup(config->password); ESP_MEM_CHECK(TAG, client->connect_info.password, goto _mqtt_set_config_failed); } if (config->client_id) { + free(client->connect_info.client_id); client->connect_info.client_id = strdup(config->client_id); - } else { + ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed); + } else if (client->connect_info.client_id == NULL) { client->connect_info.client_id = platform_create_id_string(); } ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed); ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id); if (config->uri) { + free(cfg->uri); cfg->uri = strdup(config->uri); ESP_MEM_CHECK(TAG, cfg->uri, goto _mqtt_set_config_failed); } if (config->lwt_topic) { + free(client->connect_info.will_topic); client->connect_info.will_topic = strdup(config->lwt_topic); ESP_MEM_CHECK(TAG, client->connect_info.will_topic, goto _mqtt_set_config_failed); } - if (config->lwt_msg_len) { + if (config->lwt_msg_len && config->lwt_msg) { + free(client->connect_info.will_message); client->connect_info.will_message = malloc(config->lwt_msg_len); ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); memcpy(client->connect_info.will_message, config->lwt_msg, config->lwt_msg_len); client->connect_info.will_length = config->lwt_msg_len; } else if (config->lwt_msg) { + free(client->connect_info.will_message); client->connect_info.will_message = strdup(config->lwt_msg); ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); client->connect_info.will_length = strlen(config->lwt_msg); } - - client->connect_info.will_qos = config->lwt_qos; - client->connect_info.will_retain = config->lwt_retain; - - client->connect_info.clean_session = 1; - if (config->disable_clean_session) { - client->connect_info.clean_session = false; + if (config->lwt_qos) { + client->connect_info.will_qos = config->lwt_qos; + } + if (config->lwt_retain) { + client->connect_info.will_retain = config->lwt_retain; + } + + if (config->disable_clean_session == client->connect_info.clean_session) { + client->connect_info.clean_session = !config->disable_clean_session; + } + if (config->keepalive) { + client->connect_info.keepalive = config->keepalive; } - client->connect_info.keepalive = config->keepalive; if (client->connect_info.keepalive == 0) { client->connect_info.keepalive = MQTT_KEEPALIVE_TICK; } cfg->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS; - cfg->user_context = config->user_context; - cfg->event_handle = config->event_handle; - cfg->auto_reconnect = true; - if (config->disable_auto_reconnect) { - cfg->auto_reconnect = false; + if (config->user_context) { + cfg->user_context = config->user_context; } + if (config->event_handle) { + cfg->event_handle = config->event_handle; + } - return err; + if (config->refresh_connection_after_ms) { + cfg->refresh_connection_after_ms = config->refresh_connection_after_ms; + } + + cfg->auto_reconnect = true; + if (config->disable_auto_reconnect == cfg->auto_reconnect) { + cfg->auto_reconnect = !config->disable_auto_reconnect; + } + + return ESP_OK; _mqtt_set_config_failed: esp_mqtt_destroy_config(client); return err; @@ -179,11 +212,13 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) free(cfg->uri); free(cfg->path); free(cfg->scheme); + memset(cfg, 0, sizeof(mqtt_config_storage_t)); free(client->connect_info.will_topic); free(client->connect_info.will_message); free(client->connect_info.client_id); free(client->connect_info.username); free(client->connect_info.password); + memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t)); free(client->config); return ESP_OK; } @@ -255,7 +290,7 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client) client->wait_timeout_ms = MQTT_RECONNECT_TIMEOUT_MS; client->reconnect_tick = platform_tick_get_ms(); client->state = MQTT_STATE_WAIT_TIMEOUT; - ESP_LOGI(TAG, "Reconnect after %d ms", client->wait_timeout_ms); + ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms); client->event.event_id = MQTT_EVENT_DISCONNECTED; client->wait_for_ping_resp = false; esp_mqtt_dispatch_event(client); @@ -335,6 +370,7 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co client->keepalive_tick = platform_tick_get_ms(); client->reconnect_tick = platform_tick_get_ms(); + client->refresh_connection_tick = platform_tick_get_ms(); client->wait_for_ping_resp = false; int buffer_size = config->buffer_size; if (buffer_size <= 0) { @@ -685,6 +721,9 @@ static void esp_mqtt_task(void *pv) switch ((int)client->state) { case MQTT_STATE_INIT: + client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; + esp_mqtt_dispatch_event(client); + if (client->transport == NULL) { ESP_LOGE(TAG, "There are no transport"); client->run = false; @@ -708,6 +747,7 @@ static void esp_mqtt_task(void *pv) client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); client->state = MQTT_STATE_CONNECTED; esp_mqtt_dispatch_event(client); + client->refresh_connection_tick = platform_tick_get_ms(); break; case MQTT_STATE_CONNECTED: @@ -735,6 +775,13 @@ static void esp_mqtt_task(void *pv) ESP_LOGD(TAG, "PING sent"); } + if (client->config->refresh_connection_after_ms && + platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) { + ESP_LOGD(TAG, "Refreshing the connection..."); + esp_mqtt_abort_connection(client); + client->state = MQTT_STATE_INIT; + } + //Delete mesaage after 30 senconds outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); //