From a00a3134c67657949ab7ab5317133a49a855fe30 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Sun, 23 May 2021 15:43:07 +0200 Subject: [PATCH 1/2] Add support for Retain flag in messages posted by events Closes https://github.com/espressif/esp-mqtt/issues/193 --- include/mqtt_client.h | 2 ++ mqtt_client.c | 1 + 2 files changed, 3 insertions(+) diff --git a/include/mqtt_client.h b/include/mqtt_client.h index da98b85..e4a656c 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -50,6 +50,7 @@ typedef enum { - data_len length of the data for this event - current_data_offset offset of the current data for this event - total_data_len total length of the data received + - retain retain flag of the message Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is longer than internal buffer. In that case only first event contains topic pointer and length, other contain data only with current data length @@ -151,6 +152,7 @@ typedef struct { int msg_id; /*!< MQTT messaged id of message */ int session_present; /*!< MQTT session_present flag for connection event */ esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal mqtt errors */ + bool retain; /*!< Retained flag of the message associated with this event */ } esp_mqtt_event_t; typedef esp_mqtt_event_t *esp_mqtt_event_handle_t; diff --git a/mqtt_client.c b/mqtt_client.c index e67f4fe..6346575 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -923,6 +923,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) } // post data event + client->event.retain = mqtt_get_retain(msg_buf); client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len); client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len; From d8c9c7a9e75d82eaa766c27454e3a2282eb001cf Mon Sep 17 00:00:00 2001 From: David Cermak Date: Sun, 23 May 2021 11:50:20 +0200 Subject: [PATCH 2/2] Add support for partial transport writes Partially addresses https://github.com/espressif/esp-idf/issues/6940 --- mqtt_client.c | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index 6346575..e30b215 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -554,9 +554,30 @@ static void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) client->config = NULL; } +static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client) +{ + int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length; + while (len > 0) { + wlen = esp_transport_write(client->transport, + (char *)client->mqtt_state.outbound_message->data + widx, + len, + client->config->network_timeout_ms); + if (wlen < 0) { + ESP_LOGE(TAG, "Writing failed: errno=%d", errno); + return ESP_FAIL; + } else if (wlen == 0) { + ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno); + return ESP_ERR_TIMEOUT; + } + widx += wlen; + len -= wlen; + } + return ESP_OK; +} + static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms) { - int write_len, read_len, connect_rsp_code; + int read_len, connect_rsp_code; client->wait_for_ping_resp = false; client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info); @@ -572,12 +593,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); - write_len = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->config->network_timeout_ms); - if (write_len < 0) { - ESP_LOGE(TAG, "Writing failed, errno= %d", errno); + if (esp_mqtt_write(client) != ESP_OK) { return ESP_FAIL; } @@ -858,14 +874,8 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) { - int write_len = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->config->network_timeout_ms); - // client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - if (write_len <= 0) { + if (esp_mqtt_write(client) != ESP_OK) { esp_mqtt_client_dispatch_transport_error(client); - ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno); return ESP_FAIL; } /* we've just sent a mqtt control packet, update keepalive counter