diff --git a/mqtt_client.c b/mqtt_client.c index e98fca0..22f6f15 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -43,21 +43,36 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client); static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client); -static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client) +/** + * @brief Processes error reported from transport layer (considering the message read status) + * + * @param err: Error reported from TCP transport + * @param client: MQTT client handle + * @param mid_message: True if the error occured when reading incomplete message + * + * @return - 0 on Timeout + * - -1 on Timeout with incomplete message + * - -2 on Error or EOF + */ +static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client, bool mid_message) { - if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) { - ESP_LOGD(TAG, "%s: transport_read(): EOF", __func__); - return 0; - } - if (err == ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT) { + if (mid_message) { + // No error message, because we could've read with timeout 0 (caller will decide) + return -1; + } + // Not an error, continue ESP_LOGD(TAG, "%s: transport_read(): call timed out before data was ready!", __func__); return 0; } + if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) { + ESP_LOGE(TAG, "%s: transport_read(): EOF", __func__); + } + ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno); esp_mqtt_client_dispatch_transport_error(client); - return -1; + return -2; } #if MQTT_ENABLE_SSL @@ -1094,7 +1109,7 @@ post_data_event: msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, client->config->network_timeout_ms); if (ret <= 0) { - return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL; + return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL; } msg_data_len = ret; @@ -1178,11 +1193,13 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_ /* * Returns: - * -1 in case of failure + * -2 in case of failure or EOF (clean connection closure) + * -1 timeout while in-the-middle of the messge * 0 if no message has been received * 1 if a message has been received and placed to client->mqtt_state: * message length: client->mqtt_state.message_length * message content: client->mqtt_state.in_buffer + * */ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms) { @@ -1198,7 +1215,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t */ read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms); if (read_len <= 0) { - return esp_mqtt_handle_transport_read_error(read_len, client); + return esp_mqtt_handle_transport_read_error(read_len, client, false); } ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf); /* @@ -1224,7 +1241,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t */ read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms); if (read_len <= 0) { - return esp_mqtt_handle_transport_read_error(read_len, client); + return esp_mqtt_handle_transport_read_error(read_len, client, true); } ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf); buf++; @@ -1245,7 +1262,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms); ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len); if (read_len <= 0) { - return esp_mqtt_handle_transport_read_error(read_len, client); + return esp_mqtt_handle_transport_read_error(read_len, client, true); } client->mqtt_state.in_buffer_read_len += read_len; buf += read_len; @@ -1276,7 +1293,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms); ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len); if (read_len <= 0) { - return esp_mqtt_handle_transport_read_error(read_len, client); + return esp_mqtt_handle_transport_read_error(read_len, client, true); } client->mqtt_state.in_buffer_read_len += read_len; if (client->mqtt_state.in_buffer_read_len < total_len) { @@ -1290,23 +1307,32 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t return 1; err: esp_mqtt_client_dispatch_transport_error(client); - return -1; + return -2; } static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) { uint8_t msg_type = 0, msg_qos = 0; uint16_t msg_id = 0; + size_t previous_in_buffer_read_len = client->mqtt_state.in_buffer_read_len; /* non-blocking receive in order not to block other tasks */ int recv = mqtt_message_receive(client, 0); - if (recv < 0) { + if (recv == 0) { // Timeout + return ESP_OK; + } + if (recv == -1) { // Mid-message timeout + if (previous_in_buffer_read_len == client->mqtt_state.in_buffer_read_len) { + // Report error only if didn't receive anything since previous iteration + ESP_LOGE(TAG, "%s: Network timeout while reading MQTT message", __func__); + return ESP_FAIL; + } + return ESP_OK; // Treat as standard timeout (keep reading the message) + } + if (recv < 0) { // Other error ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv); return ESP_FAIL; } - if (recv == 0) { - return ESP_OK; - } int read_len = client->mqtt_state.message_length; // If the message was valid, get the type, quality of service and id of the message