test(driver_twai): new driver add interctive test

This commit is contained in:
wanckl
2025-05-12 16:25:54 +08:00
parent cf78d0a7ff
commit 69f258b02f
6 changed files with 206 additions and 10 deletions

View File

@@ -636,7 +636,13 @@ esp_err_t twai_new_node_onchip(const twai_onchip_node_config_t *node_config, twa
// Configure GPIO
ESP_GOTO_ON_ERROR(_node_config_io(node, node_config), err, TAG, "gpio config failed");
#if CONFIG_PM_ENABLE
#if SOC_TWAI_CLK_SUPPORT_APB
// DFS can change APB frequency. So add lock to prevent sleep and APB freq from changing
ESP_GOTO_ON_ERROR(esp_pm_lock_create(ESP_PM_APB_FREQ_MAX, 0, twai_controller_periph_signals.controllers[ctrlr_id].module_name, &node->pm_lock), err, TAG, "init power manager failed");
#else // XTAL
// XTAL freq can be closed in light sleep, so we need to create a lock to prevent light sleep
ESP_GOTO_ON_ERROR(esp_pm_lock_create(ESP_PM_NO_LIGHT_SLEEP, 0, twai_controller_periph_signals.controllers[ctrlr_id].module_name, &node->pm_lock), err, TAG, "init power manager failed");
#endif //SOC_TWAI_CLK_SUPPORT_APB
#endif //CONFIG_PM_ENABLE
node->api_base.enable = _node_enable;

View File

@@ -1,7 +1,7 @@
set(srcs "test_app_main.c")
if(CONFIG_SOC_TWAI_SUPPORTED)
list(APPEND srcs "test_twai_common.c")
list(APPEND srcs "test_twai_common.c" "test_twai_network.c")
endif()
if(CONFIG_SOC_TWAI_SUPPORT_FD)

View File

@@ -36,7 +36,7 @@ static IRAM_ATTR bool test_driver_install_rx_cb(twai_node_handle_t handle, const
return false;
}
TEST_CASE("twai install uninstall (loopback)", "[TWAI]")
TEST_CASE("twai install uninstall (loopback)", "[twai]")
{
esp_err_t ret;
twai_node_handle_t node_hdl[SOC_TWAI_CONTROLLER_NUM + 1];
@@ -145,7 +145,7 @@ static void test_twai_baudrate_correctness(twai_clock_source_t clk_src, uint32_t
TEST_ESP_OK(twai_node_delete(twai_node));
}
TEST_CASE("twai baudrate measurement", "[TWAI]")
TEST_CASE("twai baudrate measurement", "[twai]")
{
twai_clock_source_t twai_available_clk_srcs[] = SOC_TWAI_CLKS;
for (size_t i = 0; i < sizeof(twai_available_clk_srcs) / sizeof(twai_available_clk_srcs[0]); i++) {
@@ -163,7 +163,7 @@ static IRAM_ATTR bool test_enable_disable_rx_cb(twai_node_handle_t handle, const
return false;
}
TEST_CASE("twai transmit stop resume (loopback)", "[TWAI]")
TEST_CASE("twai transmit stop resume (loopback)", "[twai]")
{
// prepare test memory
uint8_t *send_pkg_ptr = heap_caps_malloc(TEST_TRANS_LEN, MALLOC_CAP_8BIT);
@@ -268,7 +268,7 @@ static IRAM_ATTR bool test_filter_rx_done_cb(twai_node_handle_t handle, const tw
return false;
}
TEST_CASE("twai mask filter (loopback)", "[TWAI]")
TEST_CASE("twai mask filter (loopback)", "[twai]")
{
uint8_t test_ctrl[2];
twai_node_handle_t node_hdl;
@@ -352,7 +352,7 @@ static IRAM_ATTR bool test_dual_filter_rx_done_cb(twai_node_handle_t handle, con
return false;
}
TEST_CASE("twai dual 16bit mask filter (loopback)", "[TWAI]")
TEST_CASE("twai dual 16bit mask filter (loopback)", "[twai]")
{
uint8_t test_ctrl[2];
twai_node_handle_t node_hdl;
@@ -421,7 +421,7 @@ static void IRAM_ATTR test_wait_trans_done_cache_disable(void *args)
}
}
TEST_CASE("twai driver cache safe (loopback)", "[TWAI]")
TEST_CASE("twai driver cache safe (loopback)", "[twai]")
{
// prepare test memory
uint8_t *send_pkg_ptr = heap_caps_malloc(TEST_TRANS_LEN, MALLOC_CAP_8BIT);

View File

@@ -60,7 +60,7 @@ static IRAM_ATTR bool test_range_filter_rx_done_cb(twai_node_handle_t handle, co
return false;
}
TEST_CASE("twai range filter (loopback)", "[TWAI]")
TEST_CASE("twai range filter (loopback)", "[twai]")
{
uint8_t test_ctrl[2];
twai_node_handle_t node_hdl;
@@ -131,7 +131,7 @@ static IRAM_ATTR bool test_fd_trans_time_rx_cb(twai_node_handle_t handle, const
return false;
}
TEST_CASE("twai fd transmit time (loopback)", "[TWAI]")
TEST_CASE("twai fd transmit time (loopback)", "[twai]")
{
// prepare test memory
uint8_t *send_pkg_ptr = heap_caps_malloc(TEST_TRANS_TIME_BUF_LEN, MALLOC_CAP_8BIT);

View File

@@ -0,0 +1,117 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#include "unity.h"
#include "unity_test_utils_cache.h"
#include "esp_log.h"
#include "freertos/FreeRTOS.h"
#include "test_utils.h"
#include "esp_twai.h"
#include "esp_twai_onchip.h"
#define TEST_TX_GPIO 4
#define TEST_RX_GPIO 5
static bool IRAM_ATTR test_listen_only_rx_cb(twai_node_handle_t handle, const twai_rx_done_event_data_t *edata, void *user_ctx)
{
twai_frame_t *rx_frame = ((twai_frame_t **)user_ctx)[1];
uint8_t *rx_cnt = ((uint8_t **)user_ctx)[0];
if (ESP_OK == twai_node_receive_from_isr(handle, rx_frame)) {
*rx_cnt += 1;
}
return false;
}
TEST_CASE("twai_listen_only", "[twai_net]")
{
twai_node_handle_t node_hdl;
twai_onchip_node_config_t node_config = {
.io_cfg.tx = TEST_TX_GPIO,
.io_cfg.rx = TEST_RX_GPIO,
.bit_timing.bitrate = 250000,
.tx_queue_depth = 3,
.flags.enable_listen_only = true,
};
TEST_ESP_OK(twai_new_node_onchip(&node_config, &node_hdl));
ESP_LOGI("Test", "driver installed");
uint8_t rx_buffer[8] = {0};
twai_frame_t rx_frame = {
.buffer = rx_buffer,
.buffer_len = sizeof(rx_buffer),
};
uint8_t rx_msg_cnt = 0;
void *user_data[2] = {&rx_msg_cnt, &rx_frame};
twai_event_callbacks_t user_cbs = {
.on_rx_done = test_listen_only_rx_cb,
};
TEST_ESP_OK(twai_node_register_event_callbacks(node_hdl, &user_cbs, user_data));
TEST_ESP_OK(twai_node_enable(node_hdl));
ESP_LOGI("Test", "Listening ...");
while (!rx_msg_cnt) {
vTaskDelay(1);
}
ESP_LOGI("Test", "receive with id 0x%lx", rx_frame.header.id);
ESP_LOG_BUFFER_HEX("Data", rx_frame.buffer, twaifd_dlc2len(rx_frame.header.dlc));
TEST_ASSERT_EQUAL_HEX(0x6688, rx_frame.header.id);
uint8_t expected_data[8] = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88};
TEST_ASSERT_EQUAL_HEX8_ARRAY(expected_data, rx_frame.buffer, twaifd_dlc2len(rx_frame.header.dlc));
TEST_ESP_OK(twai_node_disable(node_hdl));
TEST_ESP_OK(twai_node_delete(node_hdl));
}
TEST_CASE("twai_remote_request", "[twai_net]")
{
twai_node_handle_t node_hdl;
twai_onchip_node_config_t node_config = {
.io_cfg.tx = TEST_TX_GPIO,
.io_cfg.rx = TEST_RX_GPIO,
.bit_timing.bitrate = 250000,
.fail_retry_cnt = -1, // retry forever if send remote frame failed
.tx_queue_depth = 3,
};
TEST_ESP_OK(twai_new_node_onchip(&node_config, &node_hdl));
ESP_LOGI("Test", "driver installed");
uint8_t rx_buffer[8] = {0};
twai_frame_t rx_frame = {
.buffer = rx_buffer,
.buffer_len = sizeof(rx_buffer),
};
uint8_t rx_msg_cnt = 0;
void *user_data[2] = {&rx_msg_cnt, &rx_frame};
twai_event_callbacks_t user_cbs = {
.on_rx_done = test_listen_only_rx_cb,
};
TEST_ESP_OK(twai_node_register_event_callbacks(node_hdl, &user_cbs, user_data));
TEST_ESP_OK(twai_node_enable(node_hdl));
twai_frame_t tx_frame = {
.header.id = 0x123,
.header.dlc = 8,
.header.rtr = true,
.header.ide = true,
};
TEST_ESP_OK(twai_node_transmit(node_hdl, &tx_frame, 1000));
ESP_LOGI("Test", "send remote frame");
uint8_t expected_data[8] = {0x80, 0x70, 0x60, 0x50, 0x40, 0x30, 0x20, 0x10};
//waiting pkg receive finish
while (!rx_msg_cnt) {
vTaskDelay(1);
}
ESP_LOGI("Test", "receive with id 0x%lx", rx_frame.header.id);
ESP_LOG_BUFFER_HEX("Data", rx_frame.buffer, twaifd_dlc2len(rx_frame.header.dlc));
TEST_ASSERT_EQUAL_HEX(0x123, rx_frame.header.id);
TEST_ASSERT_EQUAL_HEX8_ARRAY(expected_data, rx_frame.buffer, twaifd_dlc2len(rx_frame.header.dlc));
TEST_ESP_OK(twai_node_disable(node_hdl));
TEST_ESP_OK(twai_node_delete(node_hdl));
}

View File

@@ -1,7 +1,12 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import subprocess
from time import sleep
import pytest
from can import Bus
from can import Message
from pytest_embedded import Dut
from pytest_embedded_idf.utils import idf_parametrize
from pytest_embedded_idf.utils import soc_filtered_targets
@@ -11,4 +16,72 @@ from pytest_embedded_idf.utils import soc_filtered_targets
@pytest.mark.parametrize('config', ['release', 'cache_safe'], indirect=True)
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
def test_driver_twai_loopbk(dut: Dut) -> None:
dut.run_all_single_board_cases(reset=True)
dut.run_all_single_board_cases(group='twai', reset=True)
# -------------------------------- test twai interactive ------------------------------
@pytest.fixture(name='socket_can')
def fixture_create_socket_can() -> Bus:
# Set up the socket CAN with the bitrate
start_command = 'sudo ip link set can0 up type can bitrate 250000'
stop_command = 'sudo ip link set can0 down'
try:
subprocess.run(start_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
except Exception as e:
print(f'Open bus Error: {e}')
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
yield bus # test invoked here
bus.shutdown()
subprocess.run(stop_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
@pytest.mark.twai_std
@pytest.mark.temp_skip_ci(targets=['esp32c5'], reason='no runner')
@pytest.mark.parametrize('config', ['release'], indirect=True)
@pytest.mark.timeout(10) # Whole test timeout
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
dut.serial.hard_reset()
dut.expect_exact('Press ENTER to see the list of tests')
dut.write('"twai_listen_only"')
# wait the DUT to finish initialize
sleep(0.1)
message = Message(
arbitration_id=0x6688,
is_extended_id=True,
data=[0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88],
)
print('USB Socket CAN Send:', message)
socket_can.send(message, timeout=0.2)
dut.expect_unity_test_output(timeout=10)
@pytest.mark.twai_std
@pytest.mark.temp_skip_ci(targets=['esp32c5'], reason='no runner')
@pytest.mark.parametrize('config', ['release'], indirect=True)
@pytest.mark.timeout(10) # Whole test timeout
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
dut.serial.hard_reset()
dut.expect_exact('Press ENTER to see the list of tests')
dut.write('"twai_remote_request"')
print('Waiting remote frame ...')
while True:
req = socket_can.recv(timeout=0.2)
if req is not None and req.is_remote_frame:
break
print(f'USB Socket CAN Received: {req}')
reply = Message(
arbitration_id=req.arbitration_id,
is_extended_id=req.is_extended_id,
data=[0x80, 0x70, 0x60, 0x50, 0x40, 0x30, 0x20, 0x10],
)
socket_can.send(reply, timeout=0.2)
print('USB Socket CAN Replied:', reply)
dut.expect_unity_test_output(timeout=10)