feat(websocket): add separate tx lock for send and receive

xutao
2025-07-17 12:45:10 +08:00
parent 462561b8d9
commit 250eebf3fc
2 changed files with 66 additions and 1 deletion
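Background: before this change the client serialized both directions on the single client->lock mutex, so a long blocking send from an application task could delay the internal task's receive and PING handling, and vice versa. The sketch below shows the kind of workload that benefits. It is illustrative only (the endpoint URI, task size, and priority are made up) and is built on the public esp_websocket_client API:

#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_websocket_client.h"

// Application task that streams outgoing frames while the client's own task
// concurrently reads incoming data. With CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK=y
// the two paths no longer contend for the same mutex.
static void sender_task(void *arg)
{
    esp_websocket_client_handle_t client = (esp_websocket_client_handle_t)arg;
    const char *msg = "hello";
    for (;;) {
        esp_websocket_client_send_text(client, msg, strlen(msg), portMAX_DELAY);
        vTaskDelay(pdMS_TO_TICKS(100));
    }
}

void app_start_ws(void)
{
    const esp_websocket_client_config_t cfg = {
        .uri = "ws://example.com/ws",  // illustrative endpoint
    };
    esp_websocket_client_handle_t client = esp_websocket_client_init(&cfg);
    esp_websocket_client_start(client);  // spawns the internal client task
    xTaskCreate(sender_task, "ws_sender", 4096, client, 5, NULL);
}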

View File

@@ -7,4 +7,17 @@ menu "ESP WebSocket client"
            Enabling this option will reallocate the data buffers when sending or
            receiving and free them when they are no longer in use.
            This can save about 2 KB of memory when no WebSocket data is being sent or received.

    config ESP_WS_CLIENT_SEPARATE_TX_LOCK
        bool "Enable separate TX lock for send and receive"
        default n
        help
            Enabling this option uses a separate lock for sending data, so the send
            and receive paths do not contend for the same lock when they run at the
            same time.

    config ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS
        int "TX lock timeout in milliseconds"
        depends on ESP_WS_CLIENT_SEPARATE_TX_LOCK
        default 2000
        help
            Timeout, in milliseconds, for acquiring the TX lock when the separate
            TX lock is enabled.
endmenu
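For reference, enabling both options produces the following entries in the project's sdkconfig (2000 is simply the default shown above):

CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK=y
CONFIG_ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS=2000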

View File

@@ -39,6 +39,10 @@ static const char *TAG = "websocket_client";
#define WEBSOCKET_KEEP_ALIVE_INTERVAL (5)
#define WEBSOCKET_KEEP_ALIVE_COUNT (3)
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
#define WEBSOCKET_TX_LOCK_TIMEOUT_MS (CONFIG_ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS)
#endif
#define ESP_WS_CLIENT_MEM_CHECK(TAG, a, action) if (!(a)) { \
        ESP_LOGE(TAG, "%s(%d): %s", __FUNCTION__, __LINE__, "Memory exhausted"); \
        action; \
@@ -131,6 +135,9 @@ struct esp_websocket_client {
    bool selected_for_destroying;
    EventGroupHandle_t status_bits;
    SemaphoreHandle_t lock;
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
    SemaphoreHandle_t tx_lock;
#endif
    size_t errormsg_size;
    char *errormsg_buffer;
    char *rx_buffer;
@@ -441,6 +448,9 @@ static void destroy_and_free_resources(esp_websocket_client_handle_t client)
        esp_transport_list_destroy(client->transport_list);
    }
    vSemaphoreDelete(client->lock);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
    vSemaphoreDelete(client->tx_lock);
#endif
    free(client->tx_buffer);
    free(client->rx_buffer);
    free(client->errormsg_buffer);
@@ -610,10 +620,17 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
        return -1;
    }
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
    if (xSemaphoreTakeRecursive(client->tx_lock, timeout) != pdPASS) {
        ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " ticks", timeout);
        return -1;
    }
#else
    if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
        ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " ticks", timeout);
        return -1;
    }
#endif
    if (esp_websocket_new_buf(client, true) != ESP_OK) {
        ESP_LOGE(TAG, "Failed to setup tx buffer");
@@ -653,7 +670,11 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
    ret = widx;
unlock_and_return:
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
    xSemaphoreGiveRecursive(client->tx_lock);
#else
    xSemaphoreGiveRecursive(client->lock);
#endif
    return ret;
}
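The take/give pattern introduced above is repeated at every send site that follows (PONG, PING, and the close frame), where the disabled case takes no extra lock, presumably because the client task already holds the shared lock at those points. As a design note, the conditional lock selection in the send function could be factored into a pair of helpers; a hypothetical sketch (ws_tx_lock_take and ws_tx_lock_give are invented names, not part of this commit):

/* Hypothetical helpers: take/give the TX lock when the separate-lock option
 * is enabled, otherwise fall back to the shared client lock. */
static inline BaseType_t ws_tx_lock_take(esp_websocket_client_handle_t client, TickType_t timeout)
{
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
    return xSemaphoreTakeRecursive(client->tx_lock, timeout);
#else
    return xSemaphoreTakeRecursive(client->lock, timeout);
#endif
}

static inline void ws_tx_lock_give(esp_websocket_client_handle_t client)
{
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
    xSemaphoreGiveRecursive(client->tx_lock);
#else
    xSemaphoreGiveRecursive(client->lock);
#endif
}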
@@ -689,6 +710,11 @@ esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_clie
    client->lock = xSemaphoreCreateRecursiveMutex();
    ESP_WS_CLIENT_MEM_CHECK(TAG, client->lock, goto _websocket_init_fail);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
    client->tx_lock = xSemaphoreCreateRecursiveMutex();
    ESP_WS_CLIENT_MEM_CHECK(TAG, client->tx_lock, goto _websocket_init_fail);
#endif
    client->config = calloc(1, sizeof(websocket_config_storage_t));
    ESP_WS_CLIENT_MEM_CHECK(TAG, client->config, goto _websocket_init_fail);
@@ -967,8 +993,17 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
    if (client->last_opcode == WS_TRANSPORT_OPCODES_PING) {
        const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer;
        ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
        /* The PONG is sent from the receive path, so take the TX lock to
         * serialize it with concurrent esp_websocket_client_send_* calls.
         * Note: xSemaphoreTakeRecursive() expects ticks, not milliseconds. */
        if (xSemaphoreTakeRecursive(client->tx_lock, pdMS_TO_TICKS(WEBSOCKET_TX_LOCK_TIMEOUT_MS)) != pdPASS) {
            ESP_LOGE(TAG, "Could not lock ws-client within %d ms", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
            return ESP_FAIL;
        }
#endif
        esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
                                  client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
        xSemaphoreGiveRecursive(client->tx_lock);
#endif
    } else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
        client->wait_for_pong_resp = false;
    } else if (client->last_opcode == WS_TRANSPORT_OPCODES_CLOSE) {
@@ -1050,8 +1085,16 @@ static void esp_websocket_client_task(void *pv)
        if (_tick_get_ms() - client->ping_tick_ms > client->config->ping_interval_sec * 1000) {
            client->ping_tick_ms = _tick_get_ms();
            ESP_LOGD(TAG, "Sending PING...");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
            if (xSemaphoreTakeRecursive(client->tx_lock, pdMS_TO_TICKS(WEBSOCKET_TX_LOCK_TIMEOUT_MS)) != pdPASS) {
                ESP_LOGE(TAG, "Could not lock ws-client within %d ms", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
                break;
            }
#endif
            esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PING | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
            xSemaphoreGiveRecursive(client->tx_lock);
#endif
            if (!client->wait_for_pong_resp && client->config->pingpong_timeout_sec) {
                client->pingpong_tick_ms = _tick_get_ms();
                client->wait_for_pong_resp = true;
@@ -1086,7 +1129,16 @@ static void esp_websocket_client_task(void *pv)
        // If closing was not initiated by the client, echo the close frame back
        if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) {
            ESP_LOGD(TAG, "Closing initiated by the server, sending close frame");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
            if (xSemaphoreTakeRecursive(client->tx_lock, pdMS_TO_TICKS(WEBSOCKET_TX_LOCK_TIMEOUT_MS)) != pdPASS) {
                ESP_LOGE(TAG, "Could not lock ws-client within %d ms", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
                break;
            }
#endif
            esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_CLOSE | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
            xSemaphoreGiveRecursive(client->tx_lock);
#endif
            xEventGroupSetBits(client->status_bits, CLOSE_FRAME_SENT_BIT);
        }
        break;