feat(websocket): add separate tx lock for send and receive

This commit is contained in:
xutao
2025-07-17 12:45:10 +08:00
parent 462561b8d9
commit 250eebf3fc
2 changed files with 66 additions and 1 deletions

View File

@ -7,4 +7,17 @@ menu "ESP WebSocket client"
Enable this option will reallocated buffer when send or receive data and free them when end of use. Enable this option will reallocated buffer when send or receive data and free them when end of use.
This can save about 2 KB memory when no websocket data send and receive. This can save about 2 KB memory when no websocket data send and receive.
config ESP_WS_CLIENT_SEPARATE_TX_LOCK
bool "Enable separate tx lock for send and receive data"
default n
help
Enable this option will use separate lock for send and receive data.
This can avoid the lock contention when send and receive data at the same time.
config ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS
int "TX lock timeout in milliseconds"
depends on ESP_WS_CLIENT_SEPARATE_TX_LOCK
default 2000
help
Timeout for acquiring the TX lock when using separate TX lock.
endmenu endmenu

View File

@ -39,6 +39,10 @@ static const char *TAG = "websocket_client";
#define WEBSOCKET_KEEP_ALIVE_INTERVAL (5) #define WEBSOCKET_KEEP_ALIVE_INTERVAL (5)
#define WEBSOCKET_KEEP_ALIVE_COUNT (3) #define WEBSOCKET_KEEP_ALIVE_COUNT (3)
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
#define WEBSOCKET_TX_LOCK_TIMEOUT_MS (CONFIG_ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS)
#endif
#define ESP_WS_CLIENT_MEM_CHECK(TAG, a, action) if (!(a)) { \ #define ESP_WS_CLIENT_MEM_CHECK(TAG, a, action) if (!(a)) { \
ESP_LOGE(TAG,"%s(%d): %s", __FUNCTION__, __LINE__, "Memory exhausted"); \ ESP_LOGE(TAG,"%s(%d): %s", __FUNCTION__, __LINE__, "Memory exhausted"); \
action; \ action; \
@ -131,6 +135,9 @@ struct esp_websocket_client {
bool selected_for_destroying; bool selected_for_destroying;
EventGroupHandle_t status_bits; EventGroupHandle_t status_bits;
SemaphoreHandle_t lock; SemaphoreHandle_t lock;
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
SemaphoreHandle_t tx_lock;
#endif
size_t errormsg_size; size_t errormsg_size;
char *errormsg_buffer; char *errormsg_buffer;
char *rx_buffer; char *rx_buffer;
@ -441,6 +448,9 @@ static void destroy_and_free_resources(esp_websocket_client_handle_t client)
esp_transport_list_destroy(client->transport_list); esp_transport_list_destroy(client->transport_list);
} }
vSemaphoreDelete(client->lock); vSemaphoreDelete(client->lock);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
vSemaphoreDelete(client->tx_lock);
#endif
free(client->tx_buffer); free(client->tx_buffer);
free(client->rx_buffer); free(client->rx_buffer);
free(client->errormsg_buffer); free(client->errormsg_buffer);
@ -610,10 +620,17 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
return -1; return -1;
} }
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, timeout) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
return -1;
}
#else
if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) { if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout); ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
return -1; return -1;
} }
#endif
if (esp_websocket_new_buf(client, true) != ESP_OK) { if (esp_websocket_new_buf(client, true) != ESP_OK) {
ESP_LOGE(TAG, "Failed to setup tx buffer"); ESP_LOGE(TAG, "Failed to setup tx buffer");
@ -653,7 +670,11 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
ret = widx; ret = widx;
unlock_and_return: unlock_and_return:
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#else
xSemaphoreGiveRecursive(client->lock); xSemaphoreGiveRecursive(client->lock);
#endif
return ret; return ret;
} }
@ -689,6 +710,11 @@ esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_clie
client->lock = xSemaphoreCreateRecursiveMutex(); client->lock = xSemaphoreCreateRecursiveMutex();
ESP_WS_CLIENT_MEM_CHECK(TAG, client->lock, goto _websocket_init_fail); ESP_WS_CLIENT_MEM_CHECK(TAG, client->lock, goto _websocket_init_fail);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
client->tx_lock = xSemaphoreCreateRecursiveMutex();
ESP_WS_CLIENT_MEM_CHECK(TAG, client->tx_lock, goto _websocket_init_fail);
#endif
client->config = calloc(1, sizeof(websocket_config_storage_t)); client->config = calloc(1, sizeof(websocket_config_storage_t));
ESP_WS_CLIENT_MEM_CHECK(TAG, client->config, goto _websocket_init_fail); ESP_WS_CLIENT_MEM_CHECK(TAG, client->config, goto _websocket_init_fail);
@ -967,8 +993,17 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
if (client->last_opcode == WS_TRANSPORT_OPCODES_PING) { if (client->last_opcode == WS_TRANSPORT_OPCODES_PING) {
const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer; const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer;
ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len); ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
return ESP_FAIL;
}
#endif
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len, esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
client->config->network_timeout_ms); client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#endif
} else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) { } else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
client->wait_for_pong_resp = false; client->wait_for_pong_resp = false;
} else if (client->last_opcode == WS_TRANSPORT_OPCODES_CLOSE) { } else if (client->last_opcode == WS_TRANSPORT_OPCODES_CLOSE) {
@ -1050,8 +1085,16 @@ static void esp_websocket_client_task(void *pv)
if (_tick_get_ms() - client->ping_tick_ms > client->config->ping_interval_sec * 1000) { if (_tick_get_ms() - client->ping_tick_ms > client->config->ping_interval_sec * 1000) {
client->ping_tick_ms = _tick_get_ms(); client->ping_tick_ms = _tick_get_ms();
ESP_LOGD(TAG, "Sending PING..."); ESP_LOGD(TAG, "Sending PING...");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
break;
}
#endif
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PING | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms); esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PING | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#endif
if (!client->wait_for_pong_resp && client->config->pingpong_timeout_sec) { if (!client->wait_for_pong_resp && client->config->pingpong_timeout_sec) {
client->pingpong_tick_ms = _tick_get_ms(); client->pingpong_tick_ms = _tick_get_ms();
client->wait_for_pong_resp = true; client->wait_for_pong_resp = true;
@ -1086,7 +1129,16 @@ static void esp_websocket_client_task(void *pv)
// if closing not initiated by the client echo the close message back // if closing not initiated by the client echo the close message back
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) { if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) {
ESP_LOGD(TAG, "Closing initiated by the server, sending close frame"); ESP_LOGD(TAG, "Closing initiated by the server, sending close frame");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
break;
}
#endif
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_CLOSE | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms); esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_CLOSE | WS_TRANSPORT_OPCODES_FIN, NULL, 0, client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#endif
xEventGroupSetBits(client->status_bits, CLOSE_FRAME_SENT_BIT); xEventGroupSetBits(client->status_bits, CLOSE_FRAME_SENT_BIT);
} }
break; break;