diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index f409003..52f0e9e 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -84,7 +84,7 @@ typedef struct mqtt_connect_info { char *password; char *will_topic; char *will_message; - int keepalive; /*!< keepalive=0 -> keepalive is disabled */ + int64_t keepalive; /*!< keepalive=0 -> keepalive is disabled */ int will_length; int will_qos; int will_retain; diff --git a/lib/include/platform_esp32_idf.h b/lib/include/platform_esp32_idf.h index 673e856..dc16040 100644 --- a/lib/include/platform_esp32_idf.h +++ b/lib/include/platform_esp32_idf.h @@ -9,11 +9,12 @@ #include "freertos/FreeRTOS.h" #include "freertos/event_groups.h" +#include #include char *platform_create_id_string(void); int platform_random(int max); -long long platform_tick_get_ms(void); +int64_t platform_tick_get_ms(void); void ms_to_timeval(int timeout_ms, struct timeval *tv); #define ESP_MEM_CHECK(TAG, a, action) if (!(a)) { \ diff --git a/lib/platform_esp32_idf.c b/lib/platform_esp32_idf.c index 9a50473..7ba6294 100644 --- a/lib/platform_esp32_idf.c +++ b/lib/platform_esp32_idf.c @@ -27,7 +27,7 @@ int platform_random(int max) return esp_random() % max; } -long long platform_tick_get_ms(void) +int64_t platform_tick_get_ms(void) { struct timeval te; gettimeofday(&te, NULL); // get current time diff --git a/mqtt_client.c b/mqtt_client.c index 0dbeba5..32c97f2 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1,4 +1,5 @@ #include +#include "esp_err.h" #include "platform.h" #include "esp_event.h" @@ -106,7 +107,7 @@ struct esp_mqtt_client { mqtt_connect_info_t connect_info; mqtt_client_state_t state; uint64_t refresh_connection_tick; - uint64_t keepalive_tick; + int64_t keepalive_tick; uint64_t reconnect_tick; int wait_timeout_ms; int auto_reconnect; @@ -601,6 +602,35 @@ static void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) client->config = NULL; } +static esp_err_t process_keepalive(esp_mqtt_client_handle_t client) +{ + if (client->connect_info.keepalive > 0) { + const int64_t time_since_last_keepalive = platform_tick_get_ms() - client->keepalive_tick; + const int64_t keepalive_ms = client->connect_info.keepalive * 1000; + + if (client->wait_for_ping_resp == true ) { + if ( time_since_last_keepalive > keepalive_ms) { + ESP_LOGE(TAG, "No PING_RESP, disconnected"); + esp_mqtt_abort_connection(client); + client->wait_for_ping_resp = false; + return ESP_FAIL; + } + return ESP_OK; + } + + if (time_since_last_keepalive > keepalive_ms / 2) { + if (esp_mqtt_client_ping(client) == ESP_FAIL) { + ESP_LOGE(TAG, "Can't send ping, disconnected"); + esp_mqtt_abort_connection(client); + return ESP_FAIL; + } + client->wait_for_ping_resp = true; + return ESP_OK; + } + } + return ESP_OK; +} + static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client) { int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length; @@ -927,10 +957,6 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) esp_mqtt_client_dispatch_transport_error(client); return ESP_FAIL; } - /* we've just sent a mqtt control packet, update keepalive counter - * [MQTT-3.1.2-23] - */ - client->keepalive_tick = platform_tick_get_ms(); return ESP_OK; } @@ -1332,6 +1358,12 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) case MQTT_MSG_TYPE_PINGRESP: ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); client->wait_for_ping_resp = false; + /* It is the responsibility of the Client to ensure that the interval between Control Packets + * being sent does not exceed the Keep Alive value. In the absence of sending any other Control + * Packets, the Client MUST send a PINGREQ Packet [MQTT-3.1.2-23]. + * [MQTT-3.1.2-23] + */ + client->keepalive_tick = platform_tick_get_ms(); break; } @@ -1483,23 +1515,8 @@ static void esp_mqtt_task(void *pv) } } - if (client->connect_info.keepalive && // connect_info.keepalive=0 means that the keepslive is disabled - platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) { - //No ping resp from last ping => Disconnected - if (client->wait_for_ping_resp) { - ESP_LOGE(TAG, "No PING_RESP, disconnected"); - esp_mqtt_abort_connection(client); - client->wait_for_ping_resp = false; - break; - } - if (esp_mqtt_client_ping(client) == ESP_FAIL) { - ESP_LOGE(TAG, "Can't send ping, disconnected"); - esp_mqtt_abort_connection(client); - break; - } else { - client->wait_for_ping_resp = true; - } - ESP_LOGD(TAG, "PING sent"); + if (process_keepalive(client) != ESP_OK) { + break; } if (client->config->refresh_connection_after_ms &&