From d4b66556186141e09db89c57781c6709e0e71d8a Mon Sep 17 00:00:00 2001 From: David Cermak Date: Wed, 30 Jan 2019 16:38:46 +0100 Subject: [PATCH 1/6] minor fixes for issues present with fragmented/packed data --- mqtt_client.c | 158 ++++++++++++++++++++++++-------------------------- 1 file changed, 77 insertions(+), 81 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 9903462..919189e 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -536,12 +536,9 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i } else { total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; mqtt_len = mqtt_data_length; - if (client->mqtt_state.message_length_read < client->mqtt_state.message_length) { - /* if message is framented -> correct the size for the first DATA event */ - mqtt_data_length = client->mqtt_state.message_length_read - ((uint8_t*)mqtt_data- message); - } + mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message); /* read msg id only once */ - client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + client->event.msg_id = mqtt_get_id(message, length); } } else { mqtt_len = len_read; @@ -555,7 +552,7 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i client->event.event_id = MQTT_EVENT_DATA; client->event.data = (char *)mqtt_data; client->event.data_len = mqtt_len; - client->event.total_data_len = total_mqtt_len; + client->event.total_data_len = mqtt_data_length; client->event.current_data_offset = mqtt_offset; client->event.topic = (char *)mqtt_topic; client->event.topic_len = mqtt_topic_length; @@ -660,88 +657,87 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) } // In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here - while ( transport_message_offset < read_len ){ - // If the message was valid, get the type, quality of service and id of the message - msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]); - msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]); - msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset); - ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); + while ( transport_message_offset < read_len ) { + // If the message was valid, get the type, quality of service and id of the message + msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]); + msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]); + msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset); + client->mqtt_state.message_length_read = read_len - transport_message_offset; + client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); - ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); - switch (msg_type) - { - case MQTT_MSG_TYPE_SUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { - ESP_LOGD(TAG, "Subscribe successful"); - client->event.event_id = MQTT_EVENT_SUBSCRIBED; - esp_mqtt_dispatch_event_with_msgid(client); - } - break; - case MQTT_MSG_TYPE_UNSUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { - ESP_LOGD(TAG, "UnSubscribe successful"); - client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; - esp_mqtt_dispatch_event_with_msgid(client); - } - break; - case MQTT_MSG_TYPE_PUBLISH: - if (msg_qos == 1) { - client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); - } - else if (msg_qos == 2) { - client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); - } + ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); - if (msg_qos == 1 || msg_qos == 2) { - ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); - - if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); - // TODO: Shoule reconnect? - // return ESP_FAIL; + switch (msg_type) + { + case MQTT_MSG_TYPE_SUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { + ESP_LOGD(TAG, "Subscribe successful"); + client->event.event_id = MQTT_EVENT_SUBSCRIBED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_UNSUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { + ESP_LOGD(TAG, "UnSubscribe successful"); + client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PUBLISH: + if (msg_qos == 1) { + client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + } + else if (msg_qos == 2) { + client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); } - } - // Deliver the publish message - client->mqtt_state.message_length_read = read_len - transport_message_offset; - client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); - ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); - deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); - break; - case MQTT_MSG_TYPE_PUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); - client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event_with_msgid(client); - } + if (msg_qos == 1 || msg_qos == 2) { + ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); - break; - case MQTT_MSG_TYPE_PUBREC: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); - client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); - mqtt_write_data(client); - break; - case MQTT_MSG_TYPE_PUBREL: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); - client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); - mqtt_write_data(client); + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); + // TODO: Shoule reconnect? + // return ESP_FAIL; + } + } + // Deliver the publish message + ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length); + deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); + break; + case MQTT_MSG_TYPE_PUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); + client->event.event_id = MQTT_EVENT_PUBLISHED; + esp_mqtt_dispatch_event_with_msgid(client); + } - break; - case MQTT_MSG_TYPE_PUBCOMP: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { - ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); - client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event_with_msgid(client); - } - break; - case MQTT_MSG_TYPE_PINGRESP: - ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); - client->wait_for_ping_resp = false; - break; - } + break; + case MQTT_MSG_TYPE_PUBREC: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); + client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_write_data(client); + break; + case MQTT_MSG_TYPE_PUBREL: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); + client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_write_data(client); - transport_message_offset += mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset) ; + break; + case MQTT_MSG_TYPE_PUBCOMP: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); + client->event.event_id = MQTT_EVENT_PUBLISHED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PINGRESP: + ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); + client->wait_for_ping_resp = false; + break; + } + + transport_message_offset += client->mqtt_state.message_length; } return ESP_OK; From 815623dfe5a0e41fa0e51ab4e336feb3eaa5ba15 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Sat, 2 Feb 2019 16:46:31 +0100 Subject: [PATCH 2/6] improvements on runtime configuration of uri closes https://github.com/espressif/esp-idf/issues/2870 --- mqtt_client.c | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 919189e..93b2219 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -365,11 +365,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co } } - if (client->config->scheme == NULL) { - client->config->scheme = create_string("mqtt", 4); - ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed); - } - client->keepalive_tick = platform_tick_get_ms(); client->reconnect_tick = platform_tick_get_ms(); client->refresh_connection_tick = platform_tick_get_ms(); @@ -432,17 +427,15 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u return ESP_FAIL; } - if (client->config->scheme == NULL) { - client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len); - } + // set uri overrides actual scheme, host, path if configured previously + free(client->config->scheme); + free(client->config->host); + free(client->config->path); - if (client->config->host == NULL) { - client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len); - } + client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len); + client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len); + client->config->path = create_string(uri + puri.field_data[UF_PATH].off, puri.field_data[UF_PATH].len); - if (client->config->path == NULL) { - client->config->path = create_string(uri + puri.field_data[UF_PATH].off, puri.field_data[UF_PATH].len); - } if (client->config->path) { esp_transport_handle_t trans = esp_transport_list_get_transport(client->transport_list, "ws"); if (trans) { From 6a0d1e7bff1e3085669d41ae3412c649db20108e Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 20 Dec 2018 21:46:55 +0100 Subject: [PATCH 3/6] support for qos1 and qos2 message retrasmission on reconnect --- lib/include/mqtt_msg.h | 1 + lib/include/mqtt_outbox.h | 12 +++++-- lib/mqtt_outbox.c | 30 ++++++++++++---- mqtt_client.c | 74 ++++++++++++++++++++++++++++++++------- 4 files changed, 96 insertions(+), 21 deletions(-) diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 931cbf9..1a07cae 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -106,6 +106,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> static inline int mqtt_get_connect_session_present(uint8_t* buffer) { return buffer[2] & 0x01; } static inline int mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; } static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } +static inline void mqtt_set_dup(uint8_t* buffer) { buffer[0] |= 0x08; } static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index 31dd5b9..b4f1e4f 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -21,21 +21,29 @@ typedef struct outbox_message { uint8_t *data; int len; int msg_id; + int msg_qos; int msg_type; uint8_t *remaining_data; int remaining_len; } outbox_message_t; +typedef enum pending_state { + QUEUED, + TRANSMITTED, + CONFIRMED +} pending_state_t; + outbox_handle_t outbox_init(); outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox); +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); +uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); esp_err_t outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout); -esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id); +esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); int outbox_get_size(outbox_handle_t outbox); esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size); void outbox_destroy(outbox_handle_t outbox); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 6d6efc5..403fd9c 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -14,9 +14,10 @@ typedef struct outbox_item { int len; int msg_id; int msg_type; + int msg_qos; int tick; int retry_count; - bool pending; + pending_state_t pending; STAILQ_ENTRY(outbox_item) next; } outbox_item_t; @@ -37,8 +38,10 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl ESP_MEM_CHECK(TAG, item, return NULL); item->msg_id = message->msg_id; item->msg_type = message->msg_type; + item->msg_qos = message->msg_qos; item->tick = tick; item->len = message->len; + item->pending = QUEUED; item->buffer = malloc(message->len + message->remaining_len); ESP_MEM_CHECK(TAG, item->buffer, { free(item); @@ -64,21 +67,34 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) return NULL; } -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox) +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending) { outbox_item_handle_t item; STAILQ_FOREACH(item, outbox, next) { - if (!item->pending) { + if (item->pending == pending) { return item; } } return NULL; } + +uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) +{ + if (item) { + *len = item->len; + *msg_id = item->msg_id; + *msg_type = item->msg_type; + *qos = item->msg_qos; + return (uint8_t*)item->buffer; + } + return NULL; +} + esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) { outbox_item_handle_t item, tmp; STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - if (item->msg_id == msg_id && item->msg_type == msg_type) { + if (item->msg_id == msg_id && (0xFF&(item->msg_type)) == msg_type) { STAILQ_REMOVE(outbox, item, outbox_item, next); free(item->buffer); free(item); @@ -102,11 +118,11 @@ esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id) } return ESP_OK; } -esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id) +esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending) { outbox_item_handle_t item = outbox_get(outbox, msg_id); if (item) { - item->pending = true; + item->pending = pending; return ESP_OK; } return ESP_FAIL; @@ -153,7 +169,7 @@ int outbox_get_size(outbox_handle_t outbox) esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size) { while(outbox_get_size(outbox) > max_size) { - outbox_item_handle_t item = outbox_dequeue(outbox); + outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED); if (item == NULL) { return ESP_FAIL; } diff --git a/mqtt_client.c b/mqtt_client.c index 93b2219..73d3624 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -200,7 +200,6 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl if (config->disable_auto_reconnect == cfg->auto_reconnect) { cfg->auto_reconnect = !config->disable_auto_reconnect; } - return ESP_OK; _mqtt_set_config_failed: esp_mqtt_destroy_config(client); @@ -305,7 +304,6 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co ESP_MEM_CHECK(TAG, client, return NULL); esp_mqtt_set_config(client, config); - client->transport_list = esp_transport_list_init(); ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed); @@ -605,6 +603,7 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; msg.remaining_data = remaining_data; msg.remaining_len = remaining_len; //Copy to queue buffer @@ -624,6 +623,7 @@ static void mqtt_enqueue(esp_mqtt_client_handle_t client) msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; //Copy to queue buffer outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } @@ -700,21 +700,21 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) case MQTT_MSG_TYPE_PUBACK: if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); + outbox_set_pending(client->outbox, msg_id, CONFIRMED); client->event.event_id = MQTT_EVENT_PUBLISHED; esp_mqtt_dispatch_event_with_msgid(client); } - break; case MQTT_MSG_TYPE_PUBREC: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + outbox_set_pending(client->outbox, msg_id, CONFIRMED); mqtt_write_data(client); break; case MQTT_MSG_TYPE_PUBREL: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); mqtt_write_data(client); - break; case MQTT_MSG_TYPE_PUBCOMP: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); @@ -736,9 +736,29 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_OK; } +static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item) +{ + // decode queued data + client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id, + &client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos); + // set duplicate flag for QoS-2 message + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) { + mqtt_set_dup(client->mqtt_state.outbound_message->data); + } + + // try to resend the data + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error to public data "); + esp_mqtt_abort_connection(client); + return ESP_FAIL; + } + return ESP_OK; +} + static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; + uint32_t last_retransmit = 0; client->run = true; //get transport by scheme @@ -796,6 +816,21 @@ static void esp_mqtt_task(void *pv) break; } + // resend all non-transmitted messages first + outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED); + if (item) { + if (mqtt_resend_queued(client, item) == ESP_OK) { + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); + } + // resend other "transmitted" messages after 1s + } else if (platform_tick_get_ms() - last_retransmit > 1000) { + last_retransmit = platform_tick_get_ms(); + item = outbox_dequeue(client->outbox, TRANSMITTED); + if (item) { + mqtt_resend_queued(client, item); + } + } + if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) { //No ping resp from last ping => Disconnected if(client->wait_for_ping_resp){ @@ -914,13 +949,13 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic ESP_LOGE(TAG, "Client has not connected"); return -1; } - mqtt_enqueue(client); //move pending msg to outbox (if have) client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, topic, qos, &client->mqtt_state.pending_msg_id); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; + mqtt_enqueue(client); //move pending msg to outbox (if have) if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); @@ -958,10 +993,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) { uint16_t pending_msg_id = 0; - if (client->state != MQTT_STATE_CONNECTED) { - ESP_LOGE(TAG, "Client has not connected"); - return -1; - } + if (len <= 0) { len = strlen(data); } @@ -973,15 +1005,23 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, /* We have to set as pending all the qos>0 messages) */ if (qos > 0) { - mqtt_enqueue(client); client->mqtt_state.outbound_message = publish_msg; client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; + client->mqtt_state.pending_publish_qos = qos; client->mqtt_state.pending_msg_count ++; + // by default store as QUEUED (not transmitted yet) + mqtt_enqueue(client); } else { client->mqtt_state.outbound_message = publish_msg; } + /* Skip sending if not connected (rely on resending) */ + if (client->state != MQTT_STATE_CONNECTED) { + ESP_LOGD(TAG, "Publish: client is not connected"); + goto cannot_publish; + } + /* Provide support for sending fragmented message if it doesn't fit buffer */ int remaining_len = len; const char *current_data = data; @@ -990,8 +1030,8 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, while (sending) { if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos); - return -1; + esp_mqtt_abort_connection(client); + goto cannot_publish; } int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; @@ -1028,7 +1068,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, sending = false; } } + + if (qos > 0) { + outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED); + } return pending_msg_id; + +cannot_publish: + if (qos == 0) { + ESP_LOGW(TAG, "Publishing qos0 data while client not connecting"); + } + return 0; } From 752953dc3be007cca4255b66a35d3087e61f6a54 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Fri, 21 Dec 2018 13:52:48 +0100 Subject: [PATCH 4/6] added mqtt api locks, so methods can be executed from user context closes #67, closes #90, closes https://github.com/espressif/esp-idf/issues/2975 --- include/mqtt_config.h | 17 +++++++---- mqtt_client.c | 65 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 67 insertions(+), 15 deletions(-) diff --git a/include/mqtt_config.h b/include/mqtt_config.h index c0b4ab7..972f2a1 100644 --- a/include/mqtt_config.h +++ b/include/mqtt_config.h @@ -10,6 +10,7 @@ #define MQTT_PROTOCOL_311 CONFIG_MQTT_PROTOCOL_311 #define MQTT_RECONNECT_TIMEOUT_MS (10*1000) +#define MQTT_POLL_READ_TIMEOUT_MS (1000) #if CONFIG_MQTT_BUFFER_SIZE #define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE @@ -61,14 +62,18 @@ #define MQTT_CORE_SELECTION_ENABLED CONFIG_MQTT_TASK_CORE_SELECTION_ENABLED +#ifdef CONFIG_MQTT_DISABLE_API_LOCKS +#define MQTT_DISABLE_API_LOCKS CONFIG_MQTT_DISABLE_API_LOCKS +#endif + #ifdef CONFIG_MQTT_USE_CORE_0 - #define MQTT_TASK_CORE 0 + #define MQTT_TASK_CORE 0 #else - #ifdef CONFIG_MQTT_USE_CORE_1 - #define MQTT_TASK_CORE 1 - #else - #define MQTT_TASK_CORE 0 - #endif + #ifdef CONFIG_MQTT_USE_CORE_1 + #define MQTT_TASK_CORE 1 + #else + #define MQTT_TASK_CORE 0 + #endif #endif diff --git a/mqtt_client.c b/mqtt_client.c index 73d3624..8fe31cc 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -13,6 +13,18 @@ /* using uri parser */ #include "http_parser.h" +#ifdef MQTT_DISABLE_API_LOCKS +# define MQTT_API_LOCK(c) +# define MQTT_API_UNLOCK(c) +# define MQTT_API_LOCK_FROM_OTHER_TASK(c) +# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) +#else +# define MQTT_API_LOCK(c) xSemaphoreTake(c->api_lock, portMAX_DELAY) +# define MQTT_API_UNLOCK(c) xSemaphoreGive(c->api_lock) +# define MQTT_API_LOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreTake(c->api_lock, portMAX_DELAY); } } +# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreGive(c->api_lock); } } +#endif /* MQTT_USE_API_LOCKS */ + static const char *TAG = "MQTT_CLIENT"; typedef struct mqtt_state @@ -72,6 +84,8 @@ struct esp_mqtt_client { bool wait_for_ping_resp; outbox_handle_t outbox; EventGroupHandle_t status_bits; + SemaphoreHandle_t api_lock; + TaskHandle_t task_handle; }; const static int STOPPED_BIT = BIT0; @@ -87,6 +101,7 @@ static char *create_string(const char *ptr, int len); esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) { + MQTT_API_LOCK(client); //Copy user configurations to client context esp_err_t err = ESP_OK; mqtt_config_storage_t *cfg; @@ -94,7 +109,10 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl cfg = client->config; } else { cfg = calloc(1, sizeof(mqtt_config_storage_t)); - ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM); + ESP_MEM_CHECK(TAG, cfg, { + MQTT_API_UNLOCK(client); + return ESP_ERR_NO_MEM; + }); client->config = cfg; } if (config->task_prio) { @@ -200,9 +218,11 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl if (config->disable_auto_reconnect == cfg->auto_reconnect) { cfg->auto_reconnect = !config->disable_auto_reconnect; } + MQTT_API_UNLOCK(client); return ESP_OK; _mqtt_set_config_failed: esp_mqtt_destroy_config(client); + MQTT_API_UNLOCK(client); return err; } @@ -302,8 +322,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co { esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client)); ESP_MEM_CHECK(TAG, client, return NULL); - + client->api_lock = xSemaphoreCreateMutex(); + if (!client->api_lock) { + free(client); + return NULL; + } esp_mqtt_set_config(client, config); + MQTT_API_LOCK(client); client->transport_list = esp_transport_list_init(); ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed); @@ -384,9 +409,11 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed); client->status_bits = xEventGroupCreate(); ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed); + MQTT_API_UNLOCK(client); return client; _mqtt_init_failed: esp_mqtt_client_destroy(client); + MQTT_API_UNLOCK(client); return NULL; } @@ -399,6 +426,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client) vEventGroupDelete(client->status_bits); free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); + vSemaphoreDelete(client->api_lock); free(client); return ESP_OK; } @@ -638,7 +666,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) uint16_t msg_id; uint32_t transport_message_offset = 0 ; - read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 1000); + read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0); if (read_len < 0) { ESP_LOGE(TAG, "Read error or end of stream"); @@ -776,7 +804,7 @@ static void esp_mqtt_task(void *pv) client->state = MQTT_STATE_INIT; xEventGroupClearBits(client->status_bits, STOPPED_BIT); while (client->run) { - + MQTT_API_LOCK(client); switch ((int)client->state) { case MQTT_STATE_INIT: xEventGroupClearBits(client->status_bits, RECONNECT_BIT); @@ -873,10 +901,20 @@ static void esp_mqtt_task(void *pv) ESP_LOGD(TAG, "Reconnecting..."); break; } + MQTT_API_UNLOCK(client); xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true, client->wait_timeout_ms / 2 / portTICK_RATE_MS); - break; + // continue the while loop insted of break, as the mutex is unlocked + continue; } + MQTT_API_UNLOCK(client); + if (MQTT_STATE_CONNECTED == client->state) { + if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) { + ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno); + esp_mqtt_abort_connection(client); + } + } + } esp_transport_close(client->transport); xEventGroupSetBits(client->status_bits, STOPPED_BIT); @@ -892,13 +930,13 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) } #if MQTT_CORE_SELECTION_ENABLED ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE); - if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL, MQTT_TASK_CORE) != pdTRUE) { + if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) { ESP_LOGE(TAG, "Error create mqtt task"); return ESP_FAIL; } #else ESP_LOGD(TAG, "Core selection disabled"); - if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL) != pdTRUE) { + if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) { ESP_LOGE(TAG, "Error create mqtt task"); return ESP_FAIL; } @@ -949,6 +987,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic ESP_LOGE(TAG, "Client has not connected"); return -1; } + MQTT_API_LOCK_FROM_OTHER_TASK(client); client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, topic, qos, &client->mqtt_state.pending_msg_id); @@ -959,10 +998,12 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return -1; } ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return client->mqtt_state.pending_msg_id; } @@ -972,7 +1013,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top ESP_LOGE(TAG, "Client has not connected"); return -1; } - mqtt_enqueue(client); + MQTT_API_LOCK_FROM_OTHER_TASK(client); client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection, topic, &client->mqtt_state.pending_msg_id); @@ -980,13 +1021,16 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; + mqtt_enqueue(client); if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return -1; } ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return client->mqtt_state.pending_msg_id; } @@ -998,6 +1042,7 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, len = strlen(data); } + MQTT_API_LOCK_FROM_OTHER_TASK(client); mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, topic, data, len, qos, retain, @@ -1072,12 +1117,14 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, if (qos > 0) { outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED); } + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return pending_msg_id; cannot_publish: if (qos == 0) { - ESP_LOGW(TAG, "Publishing qos0 data while client not connecting"); + ESP_LOGW(TAG, "Publish: Loosing qos0 data when client not connected"); } + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return 0; } From 51089629f79051d55bf7780771f88d51c5e9c2dc Mon Sep 17 00:00:00 2001 From: David Cermak Date: Tue, 5 Feb 2019 17:01:54 +0100 Subject: [PATCH 5/6] corrected outbox for oversized messge and qos1, added errno to error messages --- lib/include/mqtt_outbox.h | 2 +- lib/mqtt_outbox.c | 9 ++++++--- mqtt_client.c | 22 +++++++++++----------- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/include/mqtt_outbox.h b/lib/include/mqtt_outbox.h index b4f1e4f..c9422e9 100644 --- a/lib/include/mqtt_outbox.h +++ b/lib/include/mqtt_outbox.h @@ -35,7 +35,7 @@ typedef enum pending_state { outbox_handle_t outbox_init(); outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending); +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); diff --git a/lib/mqtt_outbox.c b/lib/mqtt_outbox.c index 403fd9c..933d55a 100644 --- a/lib/mqtt_outbox.c +++ b/lib/mqtt_outbox.c @@ -40,7 +40,7 @@ outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handl item->msg_type = message->msg_type; item->msg_qos = message->msg_qos; item->tick = tick; - item->len = message->len; + item->len = message->len + message->remaining_len; item->pending = QUEUED; item->buffer = malloc(message->len + message->remaining_len); ESP_MEM_CHECK(TAG, item->buffer, { @@ -67,11 +67,14 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) return NULL; } -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending) +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, int *tick) { outbox_item_handle_t item; STAILQ_FOREACH(item, outbox, next) { if (item->pending == pending) { + if (tick) { + *tick = item->tick; + } return item; } } @@ -169,7 +172,7 @@ int outbox_get_size(outbox_handle_t outbox) esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size) { while(outbox_get_size(outbox) > max_size) { - outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED); + outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED, NULL); if (item == NULL) { return ESP_FAIL; } diff --git a/mqtt_client.c b/mqtt_client.c index 8fe31cc..c409222 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -501,7 +501,7 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) client->config->network_timeout_ms); // client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); if (write_len <= 0) { - ESP_LOGE(TAG, "Error write data or timeout, written len = %d", write_len); + ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno); return ESP_FAIL; } /* we've just sent a mqtt control packet, update keepalive counter @@ -624,9 +624,6 @@ static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *rem client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); //lock mutex outbox_message_t msg = { 0 }; - if (client->mqtt_state.pending_msg_count > 0) { - client->mqtt_state.pending_msg_count --; - } msg.data = client->mqtt_state.outbound_message->data; msg.len = client->mqtt_state.outbound_message->length; msg.msg_id = client->mqtt_state.pending_msg_id; @@ -669,7 +666,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0); if (read_len < 0) { - ESP_LOGE(TAG, "Read error or end of stream"); + ESP_LOGE(TAG, "Read error or end of stream, errno:%d", errno); return ESP_FAIL; } @@ -787,6 +784,7 @@ static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; uint32_t last_retransmit = 0; + int32_t msg_tick = 0; client->run = true; //get transport by scheme @@ -845,7 +843,7 @@ static void esp_mqtt_task(void *pv) } // resend all non-transmitted messages first - outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED); + outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL); if (item) { if (mqtt_resend_queued(client, item) == ESP_OK) { outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); @@ -853,8 +851,8 @@ static void esp_mqtt_task(void *pv) // resend other "transmitted" messages after 1s } else if (platform_tick_get_ms() - last_retransmit > 1000) { last_retransmit = platform_tick_get_ms(); - item = outbox_dequeue(client->outbox, TRANSMITTED); - if (item) { + item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick); + if (item && (last_retransmit - msg_tick > 1000)) { mqtt_resend_queued(client, item); } } @@ -1048,15 +1046,17 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, qos, retain, &pending_msg_id); - /* We have to set as pending all the qos>0 messages) */ + /* We have to set as pending all the qos>0 messages */ if (qos > 0) { client->mqtt_state.outbound_message = publish_msg; client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; client->mqtt_state.pending_publish_qos = qos; client->mqtt_state.pending_msg_count ++; - // by default store as QUEUED (not transmitted yet) - mqtt_enqueue(client); + // by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer + if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) { + mqtt_enqueue(client); + } } else { client->mqtt_state.outbound_message = publish_msg; } From 3300338c854e79e4bb78b0f9f99e04d4cfcbf4b9 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Mon, 11 Feb 2019 14:51:39 +0100 Subject: [PATCH 6/6] tabs to spaces corrections --- mqtt_client.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index c409222..8503d78 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -859,20 +859,20 @@ static void esp_mqtt_task(void *pv) if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) { //No ping resp from last ping => Disconnected - if(client->wait_for_ping_resp){ - ESP_LOGE(TAG, "No PING_RESP, disconnected"); - esp_mqtt_abort_connection(client); - client->wait_for_ping_resp = false; - break; + if(client->wait_for_ping_resp){ + ESP_LOGE(TAG, "No PING_RESP, disconnected"); + esp_mqtt_abort_connection(client); + client->wait_for_ping_resp = false; + break; } - if (esp_mqtt_client_ping(client) == ESP_FAIL) { + if (esp_mqtt_client_ping(client) == ESP_FAIL) { ESP_LOGE(TAG, "Can't send ping, disconnected"); esp_mqtt_abort_connection(client); break; } else { - client->wait_for_ping_resp = true; + client->wait_for_ping_resp = true; } - ESP_LOGD(TAG, "PING sent"); + ESP_LOGD(TAG, "PING sent"); } if (client->config->refresh_connection_after_ms &&