feat(esp_wifi): Add unit test for writing wifi config in nvs

This commit adds a unit test for using wifi station and softap by flashing wifi config
directly into nvs using NVS Partition generator Utility (using csv file).
This commit is contained in:
Aditi
2025-03-26 16:39:27 +05:30
parent 9f4eb5c94b
commit 747e3e0f31
14 changed files with 404 additions and 2 deletions

1
.gitignore vendored
View File

@ -37,6 +37,7 @@ components/**/build/
components/**/build_*_*/
components/**/sdkconfig
components/**/sdkconfig.old
components/**/test_apps/wifi_nvs_config/nvs_data_suffix.csv
# Example project files
examples/**/build/

View File

@ -3,3 +3,7 @@
components/esp_wifi/test_apps/:
disable:
- if: SOC_WIFI_SUPPORTED != 1
components/esp_wifi/test_apps/wifi_nvs_config:
disable:
- if: SOC_WIFI_SUPPORTED != 1

View File

@ -0,0 +1,25 @@
#This is the project CMakeLists.txt file for the test subproject
cmake_minimum_required(VERSION 3.16)
set(EXTRA_COMPONENT_DIRS "$ENV{IDF_PATH}/tools/unit-test-app/components")
# "Trim" the build. Include the minimal set of components, main, and anything it depends on.
set(COMPONENTS main)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
if($ENV{CI_PIPELINE_ID})
idf_build_set_property(COMPILE_DEFINITIONS TEST_SUFFIX_STR="_$ENV{CI_PIPELINE_ID}" APPEND)
endif()
if(DEFINED ENV{CI_PIPELINE_ID})
set(TEST_SUFFIX_STR "_$ENV{CI_PIPELINE_ID}")
else()
string(TIMESTAMP TEST_SUFFIX_STR "%Y%m%d%H%M%S")
endif()
execute_process(
COMMAND python3 ${CMAKE_SOURCE_DIR}/update_csv_suffix.py ${TEST_SUFFIX_STR}
)
project(wifi_nvs_conn_test)

View File

@ -0,0 +1,2 @@
| Supported Targets | ESP32 | ESP32-C2 | ESP32-C3 | ESP32-C5 | ESP32-C6 | ESP32-C61 | ESP32-S2 | ESP32-S3 |
| ----------------- | ----- | -------- | -------- | -------- | -------- | --------- | -------- | -------- |

View File

@ -0,0 +1,10 @@
idf_component_register(SRC_DIRS .
PRIV_INCLUDE_DIRS . ${CMAKE_CURRENT_BINARY_DIR}
PRIV_REQUIRES cmock test_utils nvs_flash esp_wifi esp_event
WHOLE_ARCHIVE)
# Create a NVS image from the contents of the `nvs_data` CSV file
# that fits the partition named 'nvs'. FLASH_IN_PROJECT indicates that
# the generated image should be flashed when the entire project is flashed to
# the target with 'idf.py -p PORT flash'.
nvs_create_partition_image(nvs ../nvs_data_suffix.csv FLASH_IN_PROJECT)

View File

@ -0,0 +1,52 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include "unity.h"
#include "nvs_flash.h"
#include "nvs.h"
#include "esp_err.h"
#include "esp_netif.h"
#include "esp_wifi.h"
#include "esp_heap_caps.h"
// Some resources are lazy allocated in wifi and lwip
#define TEST_MEMORY_LEAK_THRESHOLD (-1536)
static size_t before_free_8bit;
static size_t before_free_32bit;
static void check_leak(size_t before_free, size_t after_free, const char *type)
{
ssize_t delta = after_free - before_free;
printf("MALLOC_CAP_%s: Before %u bytes free, After %u bytes free (delta %d)\n", type, before_free, after_free, delta);
TEST_ASSERT_MESSAGE(delta >= TEST_MEMORY_LEAK_THRESHOLD, "memory leak");
}
void setUp(void)
{
before_free_8bit = heap_caps_get_free_size(MALLOC_CAP_8BIT);
before_free_32bit = heap_caps_get_free_size(MALLOC_CAP_32BIT);
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
ESP_ERROR_CHECK(esp_wifi_init(&cfg));
}
void tearDown(void)
{
ESP_ERROR_CHECK(esp_wifi_deinit());
size_t after_free_8bit = heap_caps_get_free_size(MALLOC_CAP_8BIT);
size_t after_free_32bit = heap_caps_get_free_size(MALLOC_CAP_32BIT);
check_leak(before_free_8bit, after_free_8bit, "8BIT");
check_leak(before_free_32bit, after_free_32bit, "32BIT");
}
void app_main(void)
{
ESP_ERROR_CHECK(nvs_flash_init());
ESP_ERROR_CHECK(esp_netif_init());
unity_run_menu();
ESP_ERROR_CHECK(esp_netif_deinit());
ESP_ERROR_CHECK(nvs_flash_deinit());
}

View File

@ -0,0 +1,219 @@
/*
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*
* This test code is in the Public Domain (or CC0 licensed, at your option.)
*
* Unless required by applicable law or agreed to in writing, this
* software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
*/
#include <stdio.h>
#include <string.h>
#include "unity.h"
#include "esp_mac.h"
#include "esp_event.h"
#include "esp_wifi.h"
#include "esp_wifi_types.h"
#include "esp_log.h"
#include "nvs_flash.h"
#include "test_utils.h"
#include "memory_checks.h"
#include "freertos/task.h"
#include "freertos/event_groups.h"
#define CONNECT_TIMEOUT_MS (10000)
#define GOT_IP_EVENT (1)
#define WIFI_DISCONNECT_EVENT (1<<1)
#define WIFI_STA_CONNECTED (1<<2)
#define WIFI_AP_STA_CONNECTED (1<<3)
#define EVENT_HANDLER_FLAG_DO_NOT_AUTO_RECONNECT 0x00000001
static uint32_t wifi_event_handler_flag;
static const char* TAG = "test_wifi";
static esp_netif_t* s_ap_netif = NULL;
static esp_netif_t* s_sta_netif = NULL;
static EventGroupHandle_t wifi_events;
static void stop_wifi(void);
static void wifi_ap_event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
ESP_LOGI(TAG, "wifi event handler: %"PRIi32, event_id);
switch (event_id) {
case WIFI_EVENT_AP_START:
ESP_LOGI(TAG, "WIFI_EVENT_AP_START");
break;
case WIFI_EVENT_AP_STACONNECTED:
ESP_LOGI(TAG, "WIFI_EVENT_AP_STACONNECTED");
if (wifi_events) {
xEventGroupSetBits(wifi_events, WIFI_AP_STA_CONNECTED);
}
break;
case WIFI_EVENT_AP_STADISCONNECTED:
ESP_LOGI(TAG, "WIFI_EVENT_AP_STADISCONNECTED");
ESP_LOGI(TAG, "sta disconnected");
break;
default:
break;
}
return;
}
static void wifi_sta_event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
ESP_LOGI(TAG, "wifi event handler: %"PRIi32, event_id);
switch (event_id) {
case WIFI_EVENT_STA_START:
ESP_LOGI(TAG, "WIFI_EVENT_STA_START");
// make sure softap has started
vTaskDelay(1000 / portTICK_PERIOD_MS);
TEST_ESP_OK(esp_wifi_connect());
ESP_LOGI(TAG, "start esp_wifi_connect");
break;
case WIFI_EVENT_STA_CONNECTED:
ESP_LOGI(TAG, "WIFI_EVENT_STA_CONNECTED");
if (wifi_events) {
xEventGroupSetBits(wifi_events, WIFI_STA_CONNECTED);
}
break;
case WIFI_EVENT_STA_DISCONNECTED:
ESP_LOGI(TAG, "WIFI_EVENT_STA_DISCONNECTED");
wifi_event_sta_disconnected_t *event = (wifi_event_sta_disconnected_t *)event_data;
ESP_LOGI(TAG, "disconnect reason: %u", event->reason);
if (!(EVENT_HANDLER_FLAG_DO_NOT_AUTO_RECONNECT & wifi_event_handler_flag)) {
TEST_ESP_OK(esp_wifi_connect());
}
if (wifi_events) {
xEventGroupSetBits(wifi_events, WIFI_DISCONNECT_EVENT);
}
break;
default:
break;
}
return;
}
static void ip_event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
ip_event_got_ip_t *event;
ESP_LOGI(TAG, "ip event handler");
switch (event_id) {
case IP_EVENT_STA_GOT_IP:
event = (ip_event_got_ip_t*)event_data;
ESP_LOGI(TAG, "IP_EVENT_STA_GOT_IP");
ESP_LOGI(TAG, "got ip:" IPSTR, IP2STR(&event->ip_info.ip));
if (wifi_events) {
xEventGroupSetBits(wifi_events, GOT_IP_EVENT);
}
break;
default:
break;
}
return;
}
static esp_err_t event_init(void)
{
ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT, ESP_EVENT_ANY_ID, &ip_event_handler, NULL));
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
esp_wifi_init(&cfg);
return ESP_OK;
}
static esp_err_t event_deinit(void)
{
ESP_ERROR_CHECK(esp_event_handler_unregister(IP_EVENT, ESP_EVENT_ANY_ID, &ip_event_handler));
ESP_ERROR_CHECK(esp_event_loop_delete_default());
return ESP_OK;
}
#define EMPH_STR(s) "****** "s" ******"
static void start_wifi(void)
{
event_init();
if (wifi_events == NULL) {
wifi_events = xEventGroupCreate();
}
xEventGroupClearBits(wifi_events, 0x00ffffff);
TEST_ESP_OK(esp_wifi_start());
}
static void stop_wifi(void)
{
ESP_LOGI(TAG, "Stopping wifi now");
TEST_ESP_OK(esp_wifi_stop());
event_deinit();
if (wifi_events) {
vEventGroupDelete(wifi_events);
wifi_events = NULL;
}
vTaskDelay(500 / portTICK_PERIOD_MS);
}
static void test_wifi_connection_sta(void)
{
EventBits_t bits;
ESP_ERROR_CHECK(esp_event_loop_create_default());
ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, &wifi_sta_event_handler, NULL));
s_sta_netif = esp_netif_create_default_wifi_sta();
esp_wifi_set_mode(WIFI_MODE_STA);
start_wifi();
wifi_config_t cfg;
esp_wifi_get_config(WIFI_IF_STA, &cfg);
ESP_LOGI(TAG, "STA mode, %s %s", cfg.sta.ssid, cfg.sta.password);
bits = xEventGroupWaitBits(wifi_events, GOT_IP_EVENT, 1, 0, CONNECT_TIMEOUT_MS / portTICK_PERIOD_MS);
TEST_ASSERT(bits & GOT_IP_EVENT);
xEventGroupClearBits(wifi_events, WIFI_DISCONNECT_EVENT);
wifi_event_handler_flag |= EVENT_HANDLER_FLAG_DO_NOT_AUTO_RECONNECT;
// Wait for 60s and the stop wifi
vTaskDelay(60000 / portTICK_PERIOD_MS);
esp_netif_destroy_default_wifi(s_sta_netif);
stop_wifi();
}
static void test_wifi_connection_softap(void)
{
EventBits_t bits;
ESP_ERROR_CHECK(esp_event_loop_create_default());
ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, &wifi_ap_event_handler, NULL));
s_ap_netif = esp_netif_create_default_wifi_ap();
esp_wifi_set_mode(WIFI_MODE_AP);
start_wifi();
wifi_config_t cfg;
esp_wifi_get_config(WIFI_IF_AP, &cfg);
ESP_LOGI(TAG, "AP mode, %s %s", cfg.ap.ssid, cfg.ap.password);
// wait station connected
bits = xEventGroupWaitBits(wifi_events, WIFI_AP_STA_CONNECTED, 1, 0, CONNECT_TIMEOUT_MS / portTICK_PERIOD_MS);
TEST_ASSERT(bits & WIFI_AP_STA_CONNECTED);
// wait 70s (longer than station side)
vTaskDelay((60000 + CONNECT_TIMEOUT_MS) / portTICK_PERIOD_MS);
esp_netif_destroy_default_wifi(s_ap_netif);
stop_wifi();
}
TEST_CASE_MULTIPLE_DEVICES("test wifi connection with station and AP", "[wifi][timeout=90]", test_wifi_connection_sta, test_wifi_connection_softap);

View File

@ -0,0 +1,7 @@
key,type,encoding,value
nvs.net80211,namespace,,
sta.ssid,data,blob_sz_fill(32;0x00),TESTSSID
sta.pswd,data,blob_fill(64;0x00),TESTPASSWORD
ap.ssid,data,blob_sz_fill(32;0x00),TESTSSID
ap.passwd,data,blob_fill(64;0x00),TESTPASSWORD
ap.authmode,data,u8,3
1 key type encoding value
2 nvs.net80211 namespace
3 sta.ssid data blob_sz_fill(32;0x00) TESTSSID
4 sta.pswd data blob_fill(64;0x00) TESTPASSWORD
5 ap.ssid data blob_sz_fill(32;0x00) TESTSSID
6 ap.passwd data blob_fill(64;0x00) TESTPASSWORD
7 ap.authmode data u8 3

View File

@ -0,0 +1,6 @@
# Name, Type, SubType, Offset, Size, Flags
# Note: if you have increased the bootloader size, make sure to update the offsets to avoid overlap
# Name, Type, SubType, Offset, Size, Flags
nvs, data, nvs, 0x9000, 0x6000,
phy_init, data, phy, 0xf000, 0x1000,
factory, app, factory, 0x10000, 1M,
1 # Name, Type, SubType, Offset, Size, Flags
2 # Note: if you have increased the bootloader size, make sure to update the offsets to avoid overlap
3 # Name, Type, SubType, Offset, Size, Flags
4 nvs, data, nvs, 0x9000, 0x6000,
5 phy_init, data, phy, 0xf000, 0x1000,
6 factory, app, factory, 0x10000, 1M,

View File

@ -0,0 +1,30 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import pytest
from pytest_embedded_idf.unity_tester import CaseTester
from pytest_embedded_idf.utils import idf_parametrize
@pytest.mark.wifi_two_dut
@pytest.mark.parametrize('count', [2], indirect=True)
@idf_parametrize(
'target',
['esp32', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32s2', 'esp32s3'],
indirect=['target'],
)
def test_wifi_nvs_connect_cases(case_tester: CaseTester) -> None: # type: ignore
case_tester.run_all_cases()
@pytest.mark.wifi_two_dut
@pytest.mark.xtal_26mhz
@pytest.mark.parametrize(
'count, config, baud',
[
(2, 'esp32c2_xtal26m', '74880'),
],
indirect=True,
)
@idf_parametrize('target', ['esp32c2'], indirect=['target'])
def test_wifi_nvs_connect_cases_esp32c2_xtal26m(case_tester: CaseTester) -> None:
case_tester.run_all_cases()

View File

@ -0,0 +1,2 @@
CONFIG_IDF_TARGET="esp32c2"
CONFIG_XTAL_FREQ_26=y

View File

@ -0,0 +1,5 @@
# ignore task watchdog triggered by unity_run_menu
CONFIG_ESP_TASK_WDT_EN=n
CONFIG_PARTITION_TABLE_CUSTOM=y
CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partitions_example.csv"
CONFIG_PARTITION_TABLE_FILENAME="partitions_example.csv"

View File

@ -0,0 +1,39 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import csv
import os
import sys
def update_csv_suffix(suffix: str) -> None:
# Hardcoded CSV file path
CSV_FILE = os.path.join(os.path.dirname(__file__), 'nvs_data.csv')
# Output CSV file path with suffix
NEW_CSV_FILE = os.path.join(os.path.dirname(__file__), 'nvs_data_suffix.csv')
with open(NEW_CSV_FILE, 'w', newline='') as outfile:
with open(CSV_FILE, 'r', newline='') as infile:
reader = csv.DictReader(infile)
fieldnames = reader.fieldnames if reader.fieldnames else []
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
writer.writeheader()
for row in reader:
if row['value'] == 'TESTSSID':
row['value'] = f'TESTSSID_{suffix}'
elif row['value'] == 'TESTPASSWORD':
row['value'] = f'TESTPASSWORD_{suffix}'
writer.writerow(row)
print(f'[update_csv.py] Patched CSV with suffix: {suffix}, saved as {NEW_CSV_FILE}')
if __name__ == '__main__':
if len(sys.argv) != 2:
print('Usage: python update_csv_suffix.py <suffix>')
sys.exit(1)
suffix = sys.argv[1]
update_csv_suffix(suffix)

View File

@ -1,5 +1,5 @@
| Supported Targets | ESP32 | ESP32-C2 | ESP32-C3 | ESP32-C6 | ESP32-C61 | ESP32-S2 | ESP32-S3 |
| ----------------- | ----- | -------- | -------- | -------- | --------- | -------- | -------- |
| Supported Targets | ESP32 | ESP32-C2 | ESP32-C3 | ESP32-C5 | ESP32-C6 | ESP32-C61 | ESP32-S2 | ESP32-S3 |
| ----------------- | ----- | -------- | -------- | -------- | -------- | --------- | -------- | -------- |
# WiFi NVS Config Example