Merge branch 'bugfix/partial_transport_writes' into 'master'

Address partial writes and retain flags

See merge request espressif/esp-mqtt!99
This commit is contained in:
David Čermák
2021-05-26 15:56:08 +00:00
2 changed files with 27 additions and 14 deletions

View File

@ -50,6 +50,7 @@ typedef enum {
- data_len length of the data for this event
- current_data_offset offset of the current data for this event
- total_data_len total length of the data received
- retain retain flag of the message
Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is
longer than internal buffer. In that case only first event contains topic
pointer and length, other contain data only with current data length
@ -151,6 +152,7 @@ typedef struct {
int msg_id; /*!< MQTT messaged id of message */
int session_present; /*!< MQTT session_present flag for connection event */
esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal mqtt errors */
bool retain; /*!< Retained flag of the message associated with this event */
} esp_mqtt_event_t;
typedef esp_mqtt_event_t *esp_mqtt_event_handle_t;

View File

@ -577,9 +577,30 @@ static void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
client->config = NULL;
}
static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
{
int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length;
while (len > 0) {
wlen = esp_transport_write(client->transport,
(char *)client->mqtt_state.outbound_message->data + widx,
len,
client->config->network_timeout_ms);
if (wlen < 0) {
ESP_LOGE(TAG, "Writing failed: errno=%d", errno);
return ESP_FAIL;
} else if (wlen == 0) {
ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno);
return ESP_ERR_TIMEOUT;
}
widx += wlen;
len -= wlen;
}
return ESP_OK;
}
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms)
{
int write_len, read_len, connect_rsp_code;
int read_len, connect_rsp_code;
client->wait_for_ping_resp = false;
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
client->mqtt_state.connect_info);
@ -595,12 +616,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
write_len = esp_transport_write(client->transport,
(char *)client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length,
client->config->network_timeout_ms);
if (write_len < 0) {
ESP_LOGE(TAG, "Writing failed, errno= %d", errno);
if (esp_mqtt_write(client) != ESP_OK) {
return ESP_FAIL;
}
@ -881,14 +897,8 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
{
int write_len = esp_transport_write(client->transport,
(char *)client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length,
client->config->network_timeout_ms);
// client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
if (write_len <= 0) {
if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_client_dispatch_transport_error(client);
ESP_LOGE(TAG, "Error write data or timeout, written len = %d, errno=%d", write_len, errno);
return ESP_FAIL;
}
/* we've just sent a mqtt control packet, update keepalive counter
@ -946,6 +956,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
}
// post data event
client->event.retain = mqtt_get_retain(msg_buf);
client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;