fix(websocket): Fix race conditions, memory leak, and data loss

- Add state check in abort_connection to prevent double-close
- Fix memory leak: free errormsg_buffer on disconnect
- Reset connection state on reconnect to prevent stale data
- Implement lock ordering for separate TX lock mode (see the lock-ordering sketch below)
- Read buffered data immediately after connection to prevent data loss
- Add sdkconfig.ci.tx_lock config

Author: surengab
Date:   2025-10-14 19:58:06 +04:00
Parent: 072781fcd9
Commit: 23ca97d5ec

2 changed files with 142 additions and 9 deletions
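For context, here is the lock-ordering rule this commit applies in the PING/PONG/CLOSE paths, reduced to a minimal sketch (this is the sketch referenced from the lock-ordering bullet above). It assumes FreeRTOS recursive mutexes; state_lock, tx_lock and connection_is_open() are illustrative placeholders rather than the component's real symbols — the diff below uses client->lock and client->tx_lock.

#include <stdbool.h>
#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"

static SemaphoreHandle_t state_lock;   // plays the role of client->lock in the diff
static SemaphoreHandle_t tx_lock;      // plays the role of client->tx_lock in the diff

static bool connection_is_open(void);  // placeholder for the state check used in the diff

// Called with state_lock held; returns with state_lock held on every path.
static bool send_control_frame(TickType_t tx_timeout)
{
    // 1. Release state_lock first, so a sender that already holds tx_lock and
    //    needs the state lock for its error path can still make progress.
    xSemaphoreGiveRecursive(state_lock);

    // 2. Take tx_lock with a timeout; on failure, restore the caller's lock state and bail out.
    if (xSemaphoreTakeRecursive(tx_lock, tx_timeout) != pdPASS) {
        xSemaphoreTakeRecursive(state_lock, portMAX_DELAY);
        return false;
    }

    // 3. Re-take state_lock and re-check the state: another thread may have closed
    //    the connection in the window where neither lock was held.
    xSemaphoreTakeRecursive(state_lock, portMAX_DELAY);
    if (!connection_is_open()) {
        xSemaphoreGiveRecursive(tx_lock);
        return false;
    }

    // 4. Safe to write: both locks held, state re-validated.
    //    ... transport write goes here ...

    xSemaphoreGiveRecursive(tx_lock);
    return true;
}

The two points mirrored from the diff: never try to take tx_lock while still holding the state lock, and always re-validate the connection state after re-taking the state lock.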


@@ -241,9 +241,29 @@ static esp_err_t esp_websocket_client_dispatch_event(esp_websocket_client_handle
return esp_event_loop_run(client->event_handle, 0);
}
/**
* @brief Abort the WebSocket connection and initiate reconnection or shutdown
*
* @param client WebSocket client handle
* @param error_type Type of error that caused the abort
*
* @return ESP_OK on success, ESP_FAIL on failure
*
* @note PRECONDITION: client->lock MUST be held by the calling thread before calling this function.
* This function does NOT acquire the lock itself. Calling without the lock will result in
* race conditions and undefined behavior.
*/
static esp_err_t esp_websocket_client_abort_connection(esp_websocket_client_handle_t client, esp_websocket_error_type_t error_type)
{
ESP_WS_CLIENT_STATE_CHECK(TAG, client, return ESP_FAIL);
if (client->state == WEBSOCKET_STATE_CLOSING || client->state == WEBSOCKET_STATE_UNKNOW ||
client->state == WEBSOCKET_STATE_WAIT_TIMEOUT) {
ESP_LOGW(TAG, "Connection already closing/closed, skipping abort");
goto cleanup;
}
esp_transport_close(client->transport);
if (!client->config->auto_reconnect) {
@@ -256,6 +276,18 @@ static esp_err_t esp_websocket_client_abort_connection(esp_websocket_client_hand
}
client->error_handle.error_type = error_type;
esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DISCONNECTED, NULL, 0);
cleanup:
if (client->errormsg_buffer) {
ESP_LOGD(TAG, "Freeing error buffer (%d bytes) - Free heap before: %" PRIu32 " bytes",
client->errormsg_size, esp_get_free_heap_size());
free(client->errormsg_buffer);
client->errormsg_buffer = NULL;
client->errormsg_size = 0;
} else {
ESP_LOGD(TAG, "Disconnect - Free heap: %" PRIu32 " bytes", esp_get_free_heap_size());
}
return ESP_OK;
}
@@ -453,6 +485,8 @@ static void destroy_and_free_resources(esp_websocket_client_handle_t client)
esp_websocket_client_destroy_config(client);
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
client->transport_list = NULL;
client->transport = NULL;
}
vSemaphoreDelete(client->lock);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
@@ -671,6 +705,11 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
if (wlen < 0 || (wlen == 0 && need_write != 0)) {
ret = wlen;
esp_websocket_free_buf(client, true);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
#endif
esp_tls_error_handle_t error_handle = esp_transport_get_error_handle(client->transport);
if (error_handle) {
esp_websocket_client_error(client, "esp_transport_write() returned %d, transport_error=%s, tls_error_code=%i, tls_flags=%i, errno=%d",
@@ -679,8 +718,16 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
} else {
esp_websocket_client_error(client, "esp_transport_write() returned %d, errno=%d", ret, errno);
}
ESP_LOGD(TAG, "Calling abort_connection due to send error");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
xSemaphoreGiveRecursive(client->lock);
return ret;
#else
// Already holding client->lock, safe to call
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
goto unlock_and_return;
#endif
}
opcode = 0;
widx += wlen;
@@ -1019,7 +1066,6 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
esp_websocket_free_buf(client, false);
return ESP_OK;
}
esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DATA, client->rx_buffer, rlen);
client->payload_offset += rlen;
@@ -1030,15 +1076,35 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer;
ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->lock);
// Now acquire tx_lock with timeout (consistent with PING/CLOSE handling)
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for PONG", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before returning
esp_websocket_free_buf(client, false);
return ESP_FAIL;
}
// Re-acquire client->lock to maintain consistency
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
// Another thread may have closed it while we didn't hold client->lock
if (client->state == WEBSOCKET_STATE_CLOSING || client->state == WEBSOCKET_STATE_UNKNOW ||
client->state == WEBSOCKET_STATE_WAIT_TIMEOUT || client->transport == NULL) {
ESP_LOGW(TAG, "Transport closed while preparing PONG, skipping send");
xSemaphoreGiveRecursive(client->tx_lock);
esp_websocket_free_buf(client, false);
return ESP_OK; // Caller expects client->lock to be held, which it is
}
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
client->config->network_timeout_ms);
xSemaphoreGiveRecursive(client->tx_lock);
#else
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
client->config->network_timeout_ms);
#endif
} else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
client->wait_for_pong_resp = false;
@@ -1136,7 +1202,29 @@ static void esp_websocket_client_task(void *pv)
client->state = WEBSOCKET_STATE_CONNECTED;
client->wait_for_pong_resp = false;
client->error_handle.error_type = WEBSOCKET_ERROR_TYPE_NONE;
client->payload_len = 0;
client->payload_offset = 0;
client->last_fin = false;
client->last_opcode = WS_TRANSPORT_OPCODES_NONE;
esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_CONNECTED, NULL, 0);
// Check if there is data pending to be read (e.g. piggybacked with handshake)
if (esp_transport_poll_read(client->transport, 0) > 0) {
esp_err_t recv_result = esp_websocket_client_recv(client);
if (recv_result == ESP_OK) {
xSemaphoreGiveRecursive(client->lock);
esp_event_loop_run(client->event_handle, 0);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
if (client->state != WEBSOCKET_STATE_CONNECTED || client->transport == NULL) {
ESP_LOGD(TAG, "Connection state changed during handshake data processing");
break;
}
} else if (recv_result == ESP_FAIL) {
ESP_LOGE(TAG, "Error receive data during initial connection");
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
}
}
break;
case WEBSOCKET_STATE_CONNECTED:
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) { // only send and check for PING
@@ -1145,8 +1233,23 @@ static void esp_websocket_client_task(void *pv)
client->ping_tick_ms = _tick_get_ms();
ESP_LOGD(TAG, "Sending PING...");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
// Release client->lock first to avoid deadlock with send error path
xSemaphoreGiveRecursive(client->lock);
// Now acquire tx_lock with timeout (consistent with PONG handling)
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for PING", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before break
break;
}
// Re-acquire client->lock to check state
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
// Another thread may have closed it while we didn't hold client->lock
if (client->state != WEBSOCKET_STATE_CONNECTED || client->transport == NULL) {
ESP_LOGW(TAG, "Transport closed while preparing PING, skipping send");
xSemaphoreGiveRecursive(client->tx_lock);
break;
}
#endif
@@ -1182,8 +1285,23 @@ static void esp_websocket_client_task(void *pv)
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) {
ESP_LOGD(TAG, "Closing initiated by the server, sending close frame");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
// Release client->lock first to avoid deadlock with send error path
xSemaphoreGiveRecursive(client->lock);
// Now acquire tx_lock with timeout (consistent with PONG/PING handling)
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for CLOSE", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before break
break;
}
// Re-acquire client->lock to check state
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
// Another thread may have closed it while we didn't hold client->lock
if (client->state != WEBSOCKET_STATE_CLOSING || client->transport == NULL) {
ESP_LOGW(TAG, "Transport closed while preparing CLOSE frame, skipping send");
xSemaphoreGiveRecursive(client->tx_lock);
break;
}
#endif
@@ -1202,6 +1320,7 @@ static void esp_websocket_client_task(void *pv)
if (WEBSOCKET_STATE_CONNECTED == client->state) {
read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
if (read_select < 0) {
xSemaphoreTakeRecursive(client->lock, lock_timeout);
esp_tls_error_handle_t error_handle = esp_transport_get_error_handle(client->transport);
if (error_handle) {
esp_websocket_client_error(client, "esp_transport_poll_read() returned %d, transport_error=%s, tls_error_code=%i, tls_flags=%i, errno=%d",
@@ -1210,16 +1329,15 @@ static void esp_websocket_client_task(void *pv)
} else {
esp_websocket_client_error(client, "esp_transport_poll_read() returned %d, errno=%d", read_select, errno);
}
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
xSemaphoreGiveRecursive(client->lock);
} else if (read_select > 0) {
xSemaphoreTakeRecursive(client->lock, lock_timeout);
if (esp_websocket_client_recv(client) == ESP_FAIL) {
ESP_LOGE(TAG, "Error receive data");
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
}
xSemaphoreGiveRecursive(client->lock);
} else {
ESP_LOGV(TAG, "Read poll timeout: skipping esp_transport_poll_read().");
}
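For reference, the caller contract spelled out in the new esp_websocket_client_abort_connection docblock, condensed into the same take/abort/give sequence the poll-error path above now follows. Nothing here is new API; client, lock_timeout and the error type are the names already used in the diff:

// Inside the client task, as in the poll-error path above:
xSemaphoreTakeRecursive(client->lock, lock_timeout);                                // precondition: hold client->lock
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);  // neither takes nor releases the lock
xSemaphoreGiveRecursive(client->lock);                                              // caller releases the lock afterwards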

New file: the sdkconfig.ci.tx_lock CI config added by this commit

@@ -0,0 +1,15 @@
CONFIG_IDF_TARGET="esp32"
CONFIG_IDF_TARGET_LINUX=n
CONFIG_WEBSOCKET_URI_FROM_STDIN=n
CONFIG_WEBSOCKET_URI_FROM_STRING=y
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
CONFIG_EXAMPLE_CONNECT_WIFI=n
CONFIG_EXAMPLE_USE_INTERNAL_ETHERNET=y
CONFIG_EXAMPLE_ETH_PHY_IP101=y
CONFIG_EXAMPLE_ETH_MDC_GPIO=23
CONFIG_EXAMPLE_ETH_MDIO_GPIO=18
CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK=y
CONFIG_ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS=2000
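
The CI configuration above builds the example with the separate TX lock enabled (CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK=y) and a 2000 ms TX lock timeout, i.e. the mode exercised by the locking changes in this commit. Below is a minimal sketch of the usage pattern that mode targets — an application task sending text frames while the client task concurrently handles PING/PONG/CLOSE. The URI, task name, stack size and priority are illustrative placeholders; only the esp_websocket_client calls are the component's public API.

#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_websocket_client.h"

// Sends a text frame once a second from a task that is NOT the websocket client task.
// With CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK=y these sends contend only for the TX lock,
// reducing contention with the client task's control-frame handling.
static void sender_task(void *arg)
{
    esp_websocket_client_handle_t client = (esp_websocket_client_handle_t)arg;
    for (;;) {
        if (esp_websocket_client_is_connected(client)) {
            esp_websocket_client_send_text(client, "hello", 5, pdMS_TO_TICKS(2000));
        }
        vTaskDelay(pdMS_TO_TICKS(1000));
    }
}

void app_start_ws(void)
{
    const esp_websocket_client_config_t cfg = {
        .uri = "ws://echo.example.local",   // illustrative endpoint, not from the commit
    };
    esp_websocket_client_handle_t client = esp_websocket_client_init(&cfg);
    esp_websocket_client_start(client);
    xTaskCreate(sender_task, "ws_sender", 4096, client, 5, NULL);
}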