mirror of
https://github.com/espressif/esp-protocols.git
synced 2025-06-25 17:31:33 +02:00
fix(websocket): Fix locking issues of esp_websocket_client_send_with_exact_opcode
API
Extended examples to cover more cases Added new config CONFIG_ESP_WS_CLIENT_ENABLE_DYNAMIC_BUFFER for testing
This commit is contained in:
@ -555,9 +555,29 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
|
|||||||
int wlen = 0, widx = 0;
|
int wlen = 0, widx = 0;
|
||||||
bool contained_fin = opcode & WS_TRANSPORT_OPCODES_FIN;
|
bool contained_fin = opcode & WS_TRANSPORT_OPCODES_FIN;
|
||||||
|
|
||||||
|
if (client == NULL || len < 0 || (data == NULL && len > 0)) {
|
||||||
|
ESP_LOGE(TAG, "Invalid arguments");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!esp_websocket_client_is_connected(client)) {
|
||||||
|
ESP_LOGE(TAG, "Websocket client is not connected");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (client->transport == NULL) {
|
||||||
|
ESP_LOGE(TAG, "Invalid transport");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
|
||||||
|
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (esp_websocket_new_buf(client, true) != ESP_OK) {
|
if (esp_websocket_new_buf(client, true) != ESP_OK) {
|
||||||
ESP_LOGE(TAG, "Failed to setup tx buffer");
|
ESP_LOGE(TAG, "Failed to setup tx buffer");
|
||||||
return -1;
|
goto unlock_and_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (widx < len || opcode) { // allow for sending "current_opcode" only message with len==0
|
while (widx < len || opcode) { // allow for sending "current_opcode" only message with len==0
|
||||||
@ -583,14 +603,18 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
|
|||||||
esp_websocket_client_error(client, "esp_transport_write() returned %d, errno=%d", ret, errno);
|
esp_websocket_client_error(client, "esp_transport_write() returned %d, errno=%d", ret, errno);
|
||||||
}
|
}
|
||||||
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
|
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
|
||||||
return ret;
|
goto unlock_and_return;
|
||||||
}
|
}
|
||||||
opcode = 0;
|
opcode = 0;
|
||||||
widx += wlen;
|
widx += wlen;
|
||||||
need_write = len - widx;
|
need_write = len - widx;
|
||||||
}
|
}
|
||||||
esp_websocket_free_buf(client, true);
|
esp_websocket_free_buf(client, true);
|
||||||
return widx;
|
ret = widx;
|
||||||
|
|
||||||
|
unlock_and_return:
|
||||||
|
xSemaphoreGiveRecursive(client->lock);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_client_config_t *config)
|
esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_client_config_t *config)
|
||||||
@ -1211,35 +1235,7 @@ int esp_websocket_client_send_fin(esp_websocket_client_handle_t client, TickType
|
|||||||
|
|
||||||
int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, const uint8_t *data, int len, TickType_t timeout)
|
int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, const uint8_t *data, int len, TickType_t timeout)
|
||||||
{
|
{
|
||||||
int ret = -1;
|
return esp_websocket_client_send_with_exact_opcode(client, opcode | WS_TRANSPORT_OPCODES_FIN, data, len, timeout);
|
||||||
if (client == NULL || len < 0 || (data == NULL && len > 0)) {
|
|
||||||
ESP_LOGE(TAG, "Invalid arguments");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
|
|
||||||
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!esp_websocket_client_is_connected(client)) {
|
|
||||||
ESP_LOGE(TAG, "Websocket client is not connected");
|
|
||||||
goto unlock_and_return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (client->transport == NULL) {
|
|
||||||
ESP_LOGE(TAG, "Invalid transport");
|
|
||||||
goto unlock_and_return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = esp_websocket_client_send_with_exact_opcode(client, opcode | WS_TRANSPORT_OPCODES_FIN, data, len, timeout);
|
|
||||||
if (ret < 0) {
|
|
||||||
ESP_LOGE(TAG, "Failed to send the buffer");
|
|
||||||
goto unlock_and_return;
|
|
||||||
}
|
|
||||||
unlock_and_return:
|
|
||||||
xSemaphoreGiveRecursive(client->lock);
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool esp_websocket_client_is_connected(esp_websocket_client_handle_t client)
|
bool esp_websocket_client_is_connected(esp_websocket_client_handle_t client)
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
|
* SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD
|
||||||
*
|
*
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
*/
|
*/
|
||||||
@ -96,14 +96,24 @@ static void websocket_app_start(void)
|
|||||||
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
ESP_LOGI(TAG, "Sending fragmented message");
|
// Sending text data
|
||||||
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
ESP_LOGI(TAG, "Sending fragmented text message");
|
||||||
memset(data, 'a', sizeof(data));
|
memset(data, 'a', sizeof(data));
|
||||||
esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY);
|
esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY);
|
||||||
memset(data, 'b', sizeof(data));
|
memset(data, 'b', sizeof(data));
|
||||||
esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY);
|
esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY);
|
||||||
esp_websocket_client_send_fin(client, portMAX_DELAY);
|
esp_websocket_client_send_fin(client, portMAX_DELAY);
|
||||||
|
|
||||||
|
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
||||||
|
// Sending binary data
|
||||||
|
ESP_LOGI(TAG, "Sending fragmented binary message");
|
||||||
|
char binary_data[128];
|
||||||
|
memset(binary_data, 0, sizeof(binary_data));
|
||||||
|
esp_websocket_client_send_bin_partial(client, binary_data, sizeof(binary_data), portMAX_DELAY);
|
||||||
|
memset(binary_data, 1, sizeof(binary_data));
|
||||||
|
esp_websocket_client_send_cont_msg(client, binary_data, sizeof(binary_data), portMAX_DELAY);
|
||||||
|
esp_websocket_client_send_fin(client, portMAX_DELAY);
|
||||||
|
|
||||||
esp_websocket_client_destroy(client);
|
esp_websocket_client_destroy(client);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,7 +88,9 @@ static void websocket_event_handler(void *handler_args, esp_event_base_t base, i
|
|||||||
case WEBSOCKET_EVENT_DATA:
|
case WEBSOCKET_EVENT_DATA:
|
||||||
ESP_LOGI(TAG, "WEBSOCKET_EVENT_DATA");
|
ESP_LOGI(TAG, "WEBSOCKET_EVENT_DATA");
|
||||||
ESP_LOGI(TAG, "Received opcode=%d", data->op_code);
|
ESP_LOGI(TAG, "Received opcode=%d", data->op_code);
|
||||||
if (data->op_code == 0x08 && data->data_len == 2) {
|
if (data->op_code == 0x2) { // Opcode 0x2 indicates binary data
|
||||||
|
ESP_LOG_BUFFER_HEX("Received binary data", data->data_ptr, data->data_len);
|
||||||
|
} else if (data->op_code == 0x08 && data->data_len == 2) {
|
||||||
ESP_LOGW(TAG, "Received closed message with code=%d", 256 * data->data_ptr[0] + data->data_ptr[1]);
|
ESP_LOGW(TAG, "Received closed message with code=%d", 256 * data->data_ptr[0] + data->data_ptr[1]);
|
||||||
} else {
|
} else {
|
||||||
ESP_LOGW(TAG, "Received=%.*s\n\n", data->data_len, (char *)data->data_ptr);
|
ESP_LOGW(TAG, "Received=%.*s\n\n", data->data_len, (char *)data->data_ptr);
|
||||||
@ -183,13 +185,32 @@ static void websocket_app_start(void)
|
|||||||
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
ESP_LOGI(TAG, "Sending fragmented message");
|
|
||||||
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
||||||
|
// Sending text data
|
||||||
|
ESP_LOGI(TAG, "Sending fragmented text message");
|
||||||
memset(data, 'a', sizeof(data));
|
memset(data, 'a', sizeof(data));
|
||||||
esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY);
|
esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY);
|
||||||
memset(data, 'b', sizeof(data));
|
memset(data, 'b', sizeof(data));
|
||||||
esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY);
|
esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY);
|
||||||
esp_websocket_client_send_fin(client, portMAX_DELAY);
|
esp_websocket_client_send_fin(client, portMAX_DELAY);
|
||||||
|
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
||||||
|
|
||||||
|
// Sending binary data
|
||||||
|
ESP_LOGI(TAG, "Sending fragmented binary message");
|
||||||
|
char binary_data[5];
|
||||||
|
memset(binary_data, 0, sizeof(binary_data));
|
||||||
|
esp_websocket_client_send_bin_partial(client, binary_data, sizeof(binary_data), portMAX_DELAY);
|
||||||
|
memset(binary_data, 1, sizeof(binary_data));
|
||||||
|
esp_websocket_client_send_cont_msg(client, binary_data, sizeof(binary_data), portMAX_DELAY);
|
||||||
|
esp_websocket_client_send_fin(client, portMAX_DELAY);
|
||||||
|
vTaskDelay(1000 / portTICK_PERIOD_MS);
|
||||||
|
|
||||||
|
// Sending text data longer than ws buffer (default 1024)
|
||||||
|
ESP_LOGI(TAG, "Sending text longer than ws buffer (default 1024)");
|
||||||
|
const int size = 2000;
|
||||||
|
char *long_data = malloc(size);
|
||||||
|
memset(long_data, 'a', size);
|
||||||
|
esp_websocket_client_send_text(client, long_data, size, portMAX_DELAY);
|
||||||
|
|
||||||
xSemaphoreTake(shutdown_sema, portMAX_DELAY);
|
xSemaphoreTake(shutdown_sema, portMAX_DELAY);
|
||||||
esp_websocket_client_close(client, portMAX_DELAY);
|
esp_websocket_client_close(client, portMAX_DELAY);
|
||||||
|
@ -27,10 +27,13 @@ def get_my_ip():
|
|||||||
|
|
||||||
|
|
||||||
class WebsocketTestEcho(WebSocket):
|
class WebsocketTestEcho(WebSocket):
|
||||||
|
|
||||||
def handleMessage(self):
|
def handleMessage(self):
|
||||||
self.sendMessage(self.data)
|
if isinstance(self.data, bytes):
|
||||||
print('\n Server sent: {}\n'.format(self.data))
|
print(f'\n Server received binary data: {self.data.hex()}\n')
|
||||||
|
self.sendMessage(self.data, binary=True)
|
||||||
|
else:
|
||||||
|
print(f'\n Server received: {self.data}\n')
|
||||||
|
self.sendMessage(self.data)
|
||||||
|
|
||||||
def handleConnected(self):
|
def handleConnected(self):
|
||||||
print('Connection from: {}'.format(self.address))
|
print('Connection from: {}'.format(self.address))
|
||||||
@ -86,6 +89,9 @@ def test_examples_protocol_websocket(dut):
|
|||||||
3. send and receive data
|
3. send and receive data
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Test for echo functionality:
|
||||||
|
# Sends a series of simple "hello" messages to the WebSocket server and verifies that each one is echoed back correctly.
|
||||||
|
# This tests the basic responsiveness and correctness of the WebSocket connection.
|
||||||
def test_echo(dut):
|
def test_echo(dut):
|
||||||
dut.expect('WEBSOCKET_EVENT_CONNECTED')
|
dut.expect('WEBSOCKET_EVENT_CONNECTED')
|
||||||
for i in range(0, 5):
|
for i in range(0, 5):
|
||||||
@ -93,12 +99,16 @@ def test_examples_protocol_websocket(dut):
|
|||||||
print('All echos received')
|
print('All echos received')
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
# Test for clean closure of the WebSocket connection:
|
||||||
|
# Ensures that the WebSocket can correctly receive a close frame and terminate the connection without issues.
|
||||||
def test_close(dut):
|
def test_close(dut):
|
||||||
code = dut.expect(
|
code = dut.expect(
|
||||||
re.compile(
|
re.compile(
|
||||||
b'websocket: Received closed message with code=(\\d*)'))[0]
|
b'websocket: Received closed message with code=(\\d*)'))[0]
|
||||||
print('Received close frame with code {}'.format(code))
|
print('Received close frame with code {}'.format(code))
|
||||||
|
|
||||||
|
# Test for JSON message handling:
|
||||||
|
# Sends a JSON formatted string and verifies that the received message matches the expected JSON structure.
|
||||||
def test_json(dut, websocket):
|
def test_json(dut, websocket):
|
||||||
json_string = """
|
json_string = """
|
||||||
[
|
[
|
||||||
@ -118,7 +128,7 @@ def test_examples_protocol_websocket(dut):
|
|||||||
match = dut.expect(
|
match = dut.expect(
|
||||||
re.compile(b'Json=({[a-zA-Z0-9]*).*}')).group(0).decode()[5:]
|
re.compile(b'Json=({[a-zA-Z0-9]*).*}')).group(0).decode()[5:]
|
||||||
if match == str(data[0]):
|
if match == str(data[0]):
|
||||||
print('Sent message and received message are equal \n')
|
print('\n Sent message and received message are equal \n')
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
@ -126,6 +136,8 @@ def test_examples_protocol_websocket(dut):
|
|||||||
\nreceived: {}\nwith length {}'.format(
|
\nreceived: {}\nwith length {}'.format(
|
||||||
data[0], len(data[0]), match, len(match)))
|
data[0], len(data[0]), match, len(match)))
|
||||||
|
|
||||||
|
# Test for receiving long messages:
|
||||||
|
# This sends a message with a specified length (2000 characters) to ensure the WebSocket can handle large data payloads. Repeated 3 times for reliability.
|
||||||
def test_recv_long_msg(dut, websocket, msg_len, repeats):
|
def test_recv_long_msg(dut, websocket, msg_len, repeats):
|
||||||
|
|
||||||
send_msg = ''.join(
|
send_msg = ''.join(
|
||||||
@ -142,7 +154,7 @@ def test_examples_protocol_websocket(dut):
|
|||||||
recv_msg += match
|
recv_msg += match
|
||||||
|
|
||||||
if recv_msg == send_msg:
|
if recv_msg == send_msg:
|
||||||
print('Sent message and received message are equal \n')
|
print('\n Sent message and received message are equal \n')
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
@ -150,9 +162,50 @@ def test_examples_protocol_websocket(dut):
|
|||||||
\nreceived: {}\nwith length {}'.format(
|
\nreceived: {}\nwith length {}'.format(
|
||||||
send_msg, len(send_msg), recv_msg, len(recv_msg)))
|
send_msg, len(send_msg), recv_msg, len(recv_msg)))
|
||||||
|
|
||||||
def test_fragmented_msg(dut):
|
# Test for receiving the first fragment of a large message:
|
||||||
|
# Verifies the WebSocket's ability to correctly process the initial segment of a fragmented message.
|
||||||
|
def test_recv_fragmented_msg1(dut):
|
||||||
|
dut.expect('websocket: Total payload length=2000, data_len=1024, current payload offset=0')
|
||||||
|
|
||||||
|
# Test for receiving the second fragment of a large message:
|
||||||
|
# Confirms that the WebSocket can correctly handle and process the subsequent segment of a fragmented message.
|
||||||
|
def test_recv_fragmented_msg2(dut):
|
||||||
|
dut.expect('websocket: Total payload length=2000, data_len=976, current payload offset=1024')
|
||||||
|
|
||||||
|
# Test for receiving fragmented text messages:
|
||||||
|
# Checks if the WebSocket can accurately reconstruct a message sent in several smaller parts.
|
||||||
|
def test_fragmented_txt_msg(dut):
|
||||||
dut.expect('Received=' + 32 * 'a' + 32 * 'b')
|
dut.expect('Received=' + 32 * 'a' + 32 * 'b')
|
||||||
print('Fragmented data received')
|
print('\nFragmented data received\n')
|
||||||
|
|
||||||
|
# Extract the hexdump portion of the log line
|
||||||
|
def parse_hexdump(line):
|
||||||
|
match = re.search(r'\(.*\) Received binary data: ([0-9A-Fa-f ]+)', line)
|
||||||
|
if match:
|
||||||
|
hexdump = match.group(1).strip().replace(' ', '')
|
||||||
|
# Convert the hexdump string to a bytearray
|
||||||
|
return bytearray.fromhex(hexdump)
|
||||||
|
return bytearray()
|
||||||
|
|
||||||
|
# Capture the binary log output from the DUT
|
||||||
|
def test_fragmented_binary_msg(dut):
|
||||||
|
match = dut.expect(r'\(.*\) Received binary data: .*')
|
||||||
|
if match:
|
||||||
|
line = match.group(0).strip()
|
||||||
|
if isinstance(line, bytes):
|
||||||
|
line = line.decode('utf-8')
|
||||||
|
|
||||||
|
# Parse the hexdump from the log line
|
||||||
|
received_data = parse_hexdump(line)
|
||||||
|
|
||||||
|
# Create the expected bytearray with the specified pattern
|
||||||
|
expected_data = bytearray([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
|
||||||
|
|
||||||
|
# Validate the received data
|
||||||
|
assert received_data == expected_data, f'Received data does not match expected data. Received: {received_data}, Expected: {expected_data}'
|
||||||
|
print('\nFragmented data received\n')
|
||||||
|
else:
|
||||||
|
assert False, 'Log line with binary data not found'
|
||||||
|
|
||||||
# Starting of the test
|
# Starting of the test
|
||||||
try:
|
try:
|
||||||
@ -184,10 +237,12 @@ def test_examples_protocol_websocket(dut):
|
|||||||
dut.expect('Please enter uri of websocket endpoint', timeout=30)
|
dut.expect('Please enter uri of websocket endpoint', timeout=30)
|
||||||
dut.write(uri)
|
dut.write(uri)
|
||||||
test_echo(dut)
|
test_echo(dut)
|
||||||
# Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte
|
|
||||||
test_recv_long_msg(dut, ws, 2000, 3)
|
test_recv_long_msg(dut, ws, 2000, 3)
|
||||||
test_json(dut, ws)
|
test_json(dut, ws)
|
||||||
test_fragmented_msg(dut)
|
test_fragmented_txt_msg(dut)
|
||||||
|
test_fragmented_binary_msg(dut)
|
||||||
|
test_recv_fragmented_msg1(dut)
|
||||||
|
test_recv_fragmented_msg2(dut)
|
||||||
test_close(dut)
|
test_close(dut)
|
||||||
else:
|
else:
|
||||||
print('DUT connecting to {}'.format(uri))
|
print('DUT connecting to {}'.format(uri))
|
||||||
|
@ -0,0 +1,14 @@
|
|||||||
|
CONFIG_IDF_TARGET="esp32"
|
||||||
|
CONFIG_IDF_TARGET_LINUX=n
|
||||||
|
CONFIG_WEBSOCKET_URI_FROM_STDIN=n
|
||||||
|
CONFIG_WEBSOCKET_URI_FROM_STRING=y
|
||||||
|
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
|
||||||
|
CONFIG_EXAMPLE_CONNECT_WIFI=n
|
||||||
|
CONFIG_EXAMPLE_USE_INTERNAL_ETHERNET=y
|
||||||
|
CONFIG_EXAMPLE_ETH_PHY_IP101=y
|
||||||
|
CONFIG_EXAMPLE_ETH_MDC_GPIO=23
|
||||||
|
CONFIG_EXAMPLE_ETH_MDIO_GPIO=18
|
||||||
|
CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
|
||||||
|
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
|
||||||
|
CONFIG_EXAMPLE_CONNECT_IPV6=y
|
||||||
|
CONFIG_ESP_WS_CLIENT_ENABLE_DYNAMIC_BUFFER=y
|
Reference in New Issue
Block a user