diff --git a/include/mqtt_client.h b/include/mqtt_client.h index 640f7a3..089f2ac 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -50,6 +50,7 @@ typedef enum { - data_len length of the data for this event - current_data_offset offset of the current data for this event - total_data_len total length of the data received + - retain retain flag of the message Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is longer than internal buffer. In that case only first event contains topic pointer and length, other contain data only with current data length @@ -151,6 +152,7 @@ typedef struct { int msg_id; /*!< MQTT messaged id of message */ int session_present; /*!< MQTT session_present flag for connection event */ esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal mqtt errors */ + bool retain; /*!< Retained flag of the message associated with this event */ } esp_mqtt_event_t; typedef esp_mqtt_event_t *esp_mqtt_event_handle_t; diff --git a/mqtt_client.c b/mqtt_client.c index c158375..a71b1fb 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -577,9 +577,30 @@ static void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) client->config = NULL; } +static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client) +{ + int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length; + while (len > 0) { + wlen = esp_transport_write(client->transport, + (char *)client->mqtt_state.outbound_message->data + widx, + len, + client->config->network_timeout_ms); + if (wlen < 0) { + ESP_LOGE(TAG, "Writing failed: errno=%d", errno); + return ESP_FAIL; + } else if (wlen == 0) { + ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno); + return ESP_ERR_TIMEOUT; + } + widx += wlen; + len -= wlen; + } + return ESP_OK; +} + static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms) { - int write_len, read_len, connect_rsp_code; + int read_len, connect_rsp_code; client->wait_for_ping_resp = false; client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info); @@ -595,12 +616,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); - write_len = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->config->network_timeout_ms); - if (write_len < 0) { - ESP_LOGE(TAG, "Writing failed, errno= %d", errno); + if (esp_mqtt_write(client) != ESP_OK) { return ESP_FAIL; } @@ -881,14 +897,8 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) { - int write_len = esp_transport_write(client->transport, - (char *)client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->config->network_timeout_ms); - // client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - if (write_len <= 0) { + if (esp_mqtt_write(client) != ESP_OK) { esp_mqtt_client_dispatch_transport_error(client); - ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno); return ESP_FAIL; } /* we've just sent a mqtt control packet, update keepalive counter @@ -946,6 +956,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) } // post data event + client->event.retain = mqtt_get_retain(msg_buf); client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len); client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;