Merge branch 'fix/nvs_tool_false_duplicate_warning_v5.3' into 'release/v5.3'

fix(nvs): nvs_tool.py refactor, reduce false duplicate warnings, add a test (v5.3)

See merge request espressif/esp-idf!33775
Martin Vychodil
2025-04-23 22:14:44 +08:00
5 changed files with 901 additions and 162 deletions
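The false duplicate warnings addressed here come from treating the entry key alone as the unit of uniqueness. In NVS, a key only has to be unique within its namespace, and a blob is stored as a blob_index entry plus blob_data chunks that legitimately share the same key (chunks additionally differ by chunk index). Below is a minimal sketch of the identity the refactored checker effectively enforces; the helper name and tuple layout are illustrative only, not part of this commit:

# Illustrative sketch: what makes a written NVS entry unique after this fix.
# `entry` is assumed to be an NVS_Entry as produced by nvs_parser.py.
def entry_identity(entry):
    if entry.metadata['type'] == 'blob_data':
        # Blob data chunks with different chunk indexes are not duplicates
        return (entry.metadata['namespace'], entry.key, 'blob_data', entry.metadata['chunk_index'])
    if entry.metadata['type'] == 'blob_index':
        # A blob index may share its key with the blob's data chunks
        return (entry.metadata['namespace'], entry.key, 'blob_index')
    return (entry.metadata['namespace'], entry.key, 'data')

Roughly speaking, two written entries are reported as duplicates only if their identity tuples collide.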

View File

@@ -364,3 +364,17 @@ test_idf_pytest_plugin:
script:
- cd tools/ci/idf_pytest
- pytest --junitxml=${CI_PROJECT_DIR}/XUNIT_RESULT.xml
test_nvs_gen_check:
extends: .host_test_template
artifacts:
paths:
- XUNIT_RESULT.xml
- components/nvs_flash/nvs_partition_tool
reports:
junit: XUNIT_RESULT.xml
variables:
LC_ALL: C.UTF-8
script:
- cd ${IDF_PATH}/components/nvs_flash/nvs_partition_tool
- pytest --noconftest test_nvs_gen_check.py --junitxml=XUNIT_RESULT.xml

View File

@@ -1,28 +1,54 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2023-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Dict, List
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from nvs_logger import NVS_Logger
from nvs_parser import NVS_Entry, NVS_Partition, nvs_const
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Page
from nvs_parser import NVS_Partition
def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
used_namespaces: Dict[int, None] = {}
EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased')
used_namespaces: Dict[int, Optional[str]] = {}
found_namespaces: Dict[int, str] = {}
blobs: Dict = {}
blob_chunks: List[NVS_Entry] = []
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
# Partition size check
if len(nvs_partition.pages) < 3:
def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger, read_only: bool = False) -> bool:
"""Checks that the partition is large enough and contains enough pages
"""
if len(nvs_partition.raw_data) // 0x1000 < 3 and not read_only:
nvs_log.info(
nvs_log.yellow(
'NVS Partition size must be at least 0x3000 (4kiB * 3 pages == 12kiB)!'
)
)
return False
if len(nvs_partition.raw_data) % 0x1000 != 0:
nvs_log.info(
nvs_log.yellow(
'NVS Partition size must be a multiple of 0x1000 (4kiB)!'
)
)
return False
if len(nvs_partition.pages) < 3 and not read_only:
nvs_log.info(
nvs_log.yellow(
'NVS Partition must contain at least 3 pages (sectors) to function properly!'
)
)
return False
return True
# Free/empty page check
def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
nvs_log.info(
nvs_log.red(
@@ -31,44 +57,69 @@ at least one free page is required for proper function!'''
)
)
nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))
return False
return True
for page in nvs_partition.pages:
# page: NVS_Page
# Print page header
if page.header['status'] == 'Empty':
nvs_log.info(nvs_log.cyan('Page Empty'))
def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
result = True
nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}'))
# Check if page is truly empty
if page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
result = False
nvs_log.info(
nvs_log.red(
'The page is reported as Empty but its entry state bitmap is not empty!'
)
)
if any([not e.is_empty for e in page.entries]):
if any([not e.is_empty for e in nvs_page.entries]):
result = False
nvs_log.info(
nvs_log.red('The page is reported as Empty but there is data written!')
)
else:
# Check page header CRC32
if page.header['crc']['original'] == page.header['crc']['computed']:
nvs_log.info(
nvs_log.cyan(f'Page no. {page.header["page_index"]}'), '\tCRC32: OK'
)
else:
nvs_log.info(
nvs_log.cyan(f'Page no. {page.header["page_index"]}'),
f'Original CRC32:',
nvs_log.red(f'{page.header["crc"]["original"]:x}'),
f'Generated CRC32:',
nvs_log.green(f'{page.header["crc"]["computed"]:x}'),
)
# Check all entries
seen_written_entries: Dict[str, list[NVS_Entry]] = {}
for entry in page.entries:
return result
def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']:
nvs_log.info(
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK'
)
return True
else:
nvs_log.info(
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'),
f'Original CRC32:',
nvs_log.red(f'{nvs_page.header["crc"]["original"]:x}'),
f'Generated CRC32:',
nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'),
)
return False
def identify_entry_duplicates(entry: NVS_Entry, entry_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Identifies and logs written entries
Part 1 of duplicate entry check mechanism
"""
if entry.state == 'Written':
if entry.key in entry_dict:
entry_dict[entry.key].append(entry)
else:
entry_dict[entry.key] = [entry]
return entry_dict
def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, List[NVS_Entry]]:
"""Checks entries in the given page (entry state, children CRC32, entry type, span and gathers blobs and namespaces)
"""
seen_written_entries: Dict[str, List[NVS_Entry]] = {}
for entry in nvs_page.entries:
# entry: NVS_Entry
entry.page = nvs_page
# Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data
@@ -76,11 +127,7 @@ at least one free page is required for proper function!'''
# and are stored as entries inside the string/blob data entry's 'entry.children' list
# Duplicate entry check (1) - same key, different index - find duplicates
if entry.state == 'Written':
if entry.key in seen_written_entries:
seen_written_entries[entry.key].append(entry)
else:
seen_written_entries[entry.key] = [entry]
seen_written_entries = identify_entry_duplicates(entry, seen_written_entries)
# Entry state check - doesn't check variable-length values (metadata such as state is meaningless there, as all 32 bytes are pure data)
if entry.is_empty:
@@ -164,7 +211,7 @@ at least one free page is required for proper function!'''
# Gather blobs & namespaces
if entry.metadata['type'] == 'blob_index':
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
empty_entry
EMPTY_ENTRY
] * entry.data['chunk_count']
elif entry.metadata['type'] == 'blob_data':
blob_chunks.append(entry)
@@ -174,33 +221,177 @@ at least one free page is required for proper function!'''
else:
used_namespaces[entry.metadata['namespace']] = None
# Duplicate entry check (2) - same key, different index - print duplicates
duplicate_entries_list = [seen_written_entries[key] for key in seen_written_entries if len(seen_written_entries[key]) > 1]
for duplicate_entries in duplicate_entries_list:
# duplicate_entries: list[NVS_Entry]
nvs_log.info(
nvs_log.red(
f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]}
with status {page.header["status"]} is used by the following entries:'''
)
)
return seen_written_entries
def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Takes a dictionary of entries (as written) and returns a new dictionary with "fake" duplicates,
where entries which have the same key but under different namespaces are filtered out
Use `filter_entry_duplicates()` to properly filter out all duplicates
"""
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
for key, duplicate_entries in duplicate_entries_dict.items():
seen_entries: List[NVS_Entry] = []
entry_same_namespace_collisions_list: Set[NVS_Entry] = set()
# Search through the "duplicates" and see if there are real duplicates
# E.g. the key can be the same if the namespace is different
for entry in duplicate_entries:
if entry.metadata['type'] in nvs_const.item_type.values():
entry_same_namespace_collisions = set()
for other_entry in seen_entries:
if entry.metadata['namespace'] == other_entry.metadata['namespace']:
entry_same_namespace_collisions.add(entry)
entry_same_namespace_collisions.add(other_entry)
if len(entry_same_namespace_collisions) != 0:
entry_same_namespace_collisions_list.update(entry_same_namespace_collisions)
seen_entries.append(entry)
# Catch real duplicates
new_duplicate_entries: List[NVS_Entry] = []
if len(seen_entries) > 1:
for entry in seen_entries:
if entry in entry_same_namespace_collisions_list:
new_duplicate_entries.append(entry)
if len(new_duplicate_entries) > 0:
new_duplicate_entries_dict[key] = new_duplicate_entries
return new_duplicate_entries_dict
def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Takes a dictionary of entries (as written) and returns a new dictionary with "fake" duplicates,
where entries related to blob index and blob data under the same namespace are filtered out
Use `filter_entry_duplicates()` to properly filter out all duplicates
"""
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
for key, duplicate_entries in duplicate_entries_dict.items():
seen_blob_index: List[NVS_Entry] = []
seen_blob_data: List[NVS_Entry] = []
seen_another_type_data: List[NVS_Entry] = []
blob_index_chunk_index_collisions_list: Set[NVS_Entry] = set()
blob_data_chunk_index_collisions_list: Set[NVS_Entry] = set()
# Search through the "duplicates" and see if there are real duplicates
# E.g. the key can be the same for blob_index and blob_data
# (and even for more blob_data entries if they have a different chunk_index)
for entry in duplicate_entries:
if entry.metadata['type'] == 'blob_index':
blob_index_chunk_index_collisions = set()
for other_entry in seen_blob_index:
if entry.metadata['namespace'] == other_entry.metadata['namespace']:
blob_index_chunk_index_collisions.add(entry)
blob_index_chunk_index_collisions.add(other_entry)
if len(blob_index_chunk_index_collisions) != 0:
blob_index_chunk_index_collisions_list.update(blob_index_chunk_index_collisions)
seen_blob_index.append(entry)
elif entry.metadata['type'] == 'blob_data':
blob_data_chunk_index_collisions = set()
for other_entry in seen_blob_data:
if (entry.metadata['namespace'] == other_entry.metadata['namespace']
and entry.metadata['chunk_index'] == other_entry.metadata['chunk_index']):
blob_data_chunk_index_collisions.add(entry)
blob_data_chunk_index_collisions.add(other_entry)
if len(blob_data_chunk_index_collisions) != 0:
blob_data_chunk_index_collisions_list.update(blob_data_chunk_index_collisions)
seen_blob_data.append(entry)
else:
seen_another_type_data.append(entry)
# Catch real duplicates
new_duplicate_entries: List[NVS_Entry] = []
if len(seen_blob_index) > 1:
for entry in seen_blob_index:
if entry in blob_index_chunk_index_collisions_list:
new_duplicate_entries.append(entry)
if len(seen_blob_data) > 1:
for entry in seen_blob_data:
if entry in blob_data_chunk_index_collisions_list:
new_duplicate_entries.append(entry)
for entry in seen_another_type_data: # If there are any duplicates of other types
new_duplicate_entries.append(entry)
if len(new_duplicate_entries) > 0:
new_duplicate_entries_dict[key] = new_duplicate_entries
return new_duplicate_entries_dict
def filter_entry_duplicates(entries: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Takes a dictionary of (seen written) entries and outputs a new dictionary with "fake" duplicates filtered out, keeping only real duplicates in
(i.e. duplicate keys under different namespaces and blob index and blob data having the same key under the same namespace are allowed
and should be filtered out)
Part 2 of duplicate entry check mechanism
"""
# Only keep seen written entries which have been observed multiple times (duplicates)
duplicate_entries_list = {key: v for key, v in entries.items() if len(v) > 1}
# Filter out "fake" duplicates 1 (duplicate keys under different namespaces are allowed)
duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list)
# Filter out "fake" duplicates 2 (blob index and blob data are allowed to have the same key even in the same namespace)
duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1)
return duplicate_entries_list_2
def print_entry_duplicates(duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None:
if len(duplicate_entries_list) > 0:
nvs_log.info(nvs_log.red('Found duplicate entries:'))
nvs_log.info(nvs_log.red('Entry\tKey\t\t\tType\t\tNamespace idx\tPage\tPage status'))
for _, duplicate_entries in duplicate_entries_list.items():
# duplicate_entries: List[NVS_Entry]
for entry in duplicate_entries:
# entry: NVS_Entry
if entry.metadata['namespace'] == 0:
entry_type = f'namespace ({entry.data["value"]})'
else:
entry_type = entry.metadata['type']
if entry.page is not None:
page_num = entry.page.header['page_index']
page_status = entry.page.header['status']
else:
page_num = 'Unknown'
page_status = 'Unknown'
entry_key_tab_cnt = len(entry.key) // 8
entry_key_tab = '\t' * (3 - entry_key_tab_cnt)
namespace_tab_cnt = len(entry_type) // 8
namespace_tab = '\t' * (2 - namespace_tab_cnt)
namespace_str = f'{entry.metadata["namespace"]}'
nvs_log.info(
nvs_log.red(
f'Entry #{entry.index:03d} {entry.key} is a duplicate!'
f'#{entry.index:03d}\t{entry.key}{entry_key_tab}{entry_type}{namespace_tab}{namespace_str}\t\t{page_num}\t{page_status}'
)
)
nvs_log.info()
# Blob checks
# Assemble blobs
def assemble_blobs(nvs_log: NVS_Logger) -> None:
"""Assembles blob data from blob chunks
"""
for chunk in blob_chunks:
# chunk: NVS_Entry
parent = blobs.get(
f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry]
f'{chunk.metadata["namespace"]:03d}{chunk.key}', [EMPTY_ENTRY]
)[0]
# Blob chunk without blob index check
if parent is empty_entry:
if parent is EMPTY_ENTRY:
nvs_log.info(
nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
f'Namespace index: {chunk.metadata["namespace"]:03d}',
@@ -212,15 +403,19 @@ with status {page.header["status"]} is used by the following entries:'''
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
blobs[blob_key][chunk_index + 1] = chunk
# Blob data check
def check_blob_data(nvs_log: NVS_Logger) -> None:
"""Checks blob data for missing chunks or data
"""
for blob_key in blobs:
blob_index = blobs[blob_key][0]
blob_chunks = blobs[blob_key][1:]
blob_size = blob_index.data['size']
for i, chunk in enumerate(blob_chunks):
# chunk: NVS_Entry
# Blob missing chunk check
if chunk is empty_entry:
if chunk is EMPTY_ENTRY:
nvs_log.info(
nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
@@ -237,11 +432,21 @@ with status {page.header["status"]} is used by the following entries:'''
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
)
# Namespace checks
def check_blobs(nvs_log: NVS_Logger) -> None:
# Assemble blobs
assemble_blobs(nvs_log)
# Blob data check
check_blob_data(nvs_log)
def check_namespaces(nvs_log: NVS_Logger) -> None:
"""Checks namespaces (entries using undefined namespace indexes, unused namespaces)
"""
# Undefined namespace index check
for used_ns in used_namespaces:
key = found_namespaces.pop(used_ns, '')
if key == '':
key = found_namespaces.pop(used_ns, None)
if key is None:
nvs_log.info(
nvs_log.red('Undefined namespace index!'),
f'Namespace index: {used_ns:03d}',
@@ -255,3 +460,63 @@ with status {page.header["status"]} is used by the following entries:'''
f'Namespace index: {unused_ns:03d}',
f'[{found_namespaces[unused_ns]}]',
)
def reset_global_variables() -> None:
"""Global variables need to be cleared out before calling `integrity_check()` multiple times from a script
(e.g. when running tests) to avoid incorrect output
"""
global used_namespaces, found_namespaces, blobs, blob_chunks
used_namespaces = {}
found_namespaces = {}
blobs = {}
blob_chunks = []
def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
"""Function for multi-stage integrity check of a NVS partition
"""
# Partition size check
check_partition_size(nvs_partition, nvs_log)
# Free/empty page check
check_empty_page_present(nvs_partition, nvs_log)
seen_written_entries_all: Dict[str, List[NVS_Entry]] = {}
# Loop through all pages in the partition
for page in nvs_partition.pages:
# page: NVS_Page
# Print a page header
if page.header['status'] == 'Empty':
# Check if a page is truly empty
check_empty_page_content(page, nvs_log)
else:
# Check a page header CRC32
check_page_crc(page, nvs_log)
# Check all entries in a page
seen_written_entries = check_page_entries(page, nvs_log)
# Collect all seen written entries
for key in seen_written_entries:
if key in seen_written_entries_all:
seen_written_entries_all[key].extend(seen_written_entries[key])
else:
seen_written_entries_all[key] = seen_written_entries[key]
# Duplicate entry check (2) - same key, different index
duplicates = filter_entry_duplicates(seen_written_entires_all)
# Print duplicate entries
print_entry_duplicates(duplicates, nvs_log)
nvs_log.info() # Empty line
# Blob checks
check_blobs(nvs_log)
# Namespace checks
check_namespaces(nvs_log)
reset_global_variables()
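With the refactor, nvs_check.py exposes the individual stages as composable functions. A minimal usage sketch, assuming the nvs_partition_tool modules are importable and `nvs.bin` (an illustrative file name) is a raw NVS partition dump:

import nvs_check
from nvs_logger import nvs_log
from nvs_parser import NVS_Partition

with open('nvs.bin', 'rb') as f:  # illustrative dump file
    partition = NVS_Partition('nvs', bytearray(f.read()))

# Full multi-stage check; integrity_check() resets the module-level state when done,
# so it can be called repeatedly (e.g. from tests)
nvs_check.integrity_check(partition, nvs_log)

# Stages can also be run individually, each returning a boolean
size_ok = nvs_check.check_partition_size(partition, nvs_log)
has_free_page = nvs_check.check_empty_page_present(partition, nvs_log)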

View File

@@ -1,7 +1,10 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Dict, List, Optional
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from zlib import crc32
@@ -61,8 +64,9 @@ class NVS_Partition:
f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})'
)
# Divide partition into pages
self.name = name
self.raw_data = raw_data
# Divide partition into pages
self.pages = []
for i in range(0, len(raw_data), nvs_const.page_size):
self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i))
@@ -216,6 +220,7 @@ class NVS_Entry:
self.state = entry_state
self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size
self.index = index
self.page = None
namespace = self.raw[0]
entry_type = self.raw[1]
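The parser changes store the partition name and raw bytes on NVS_Partition (check_partition_size above sizes the partition via raw_data) and give each NVS_Entry a page back-reference, initialized to None and filled in by check_page_entries() so duplicate reports can name the owning page. A small sketch of the new fields, continuing from the usage example above:

# `partition` built as in the previous sketch: NVS_Partition('nvs', raw_bytes)
print(f'{partition.name}: {len(partition.raw_data)} bytes in {len(partition.pages)} pages')

# entry.page is None right after parsing; nvs_check.check_page_entries() sets it
for page in partition.pages:
    for entry in page.entries:
        if entry.state == 'Written' and entry.page is not None:
            print(entry.key, 'lives on page', entry.page.header['page_index'])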

View File

@@ -0,0 +1,12 @@
[pytest]
addopts = -s -p no:pytest_embedded
# log related
log_cli = True
log_cli_level = INFO
log_cli_format = %(asctime)s %(levelname)s %(message)s
log_cli_date_format = %Y-%m-%d %H:%M:%S
## log everything to `system-out` when a test case fails
junit_logging = stdout
junit_log_passing_tests = False

View File

@@ -0,0 +1,443 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from importlib.metadata import version
from io import BufferedRandom
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from zlib import crc32
import esp_idf_nvs_partition_gen.nvs_partition_gen as nvs_partition_gen
import nvs_check as nvs_check
import pytest
from esp_idf_nvs_partition_gen.nvs_partition_gen import NVS
from nvs_logger import nvs_log
from nvs_logger import NVS_Logger
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Partition
from packaging.version import Version
NVS_PART_GEN_VERSION_SKIP = '0.1.8'
# Temporary workaround: skip the suite if the installed esp-idf-nvs-partition-gen package is older than NVS_PART_GEN_VERSION_SKIP
@pytest.fixture(scope='session', autouse=True)
def before() -> None:
ver = version('esp-idf-nvs-partition-gen')
if Version(ver) < Version(NVS_PART_GEN_VERSION_SKIP):
pytest.skip('pass')
class SilentLogger(NVS_Logger):
def __init__(self) -> None:
super().__init__()
self.color = False
def info(self, *args, **kwargs) -> None: # type: ignore
pass
logger = nvs_log  # replace with SilentLogger() to silence check output in the test logs
LOREM_STRING = '''Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Nullam eget orci fringilla, cursus nisi sit amet, hendrerit tortor.
Vivamus lectus dolor, rhoncus eget metus id, convallis placerat quam.
Nulla facilisi.
In at aliquam nunc, in dictum augue.
Nullam dapibus ligula nec enim commodo lobortis.
Praesent facilisis ante nec magna various lobortis.
Phasellus sodales sed nisi vitae pulvinar.
Aliquam tempor quis sem et tempor.
Etiam interdum nunc quis justo pharetra, sed finibus arcu lacinia.
Suspendisse potenti.
Praesent et turpis ut justo accumsan pellentesque sed at leo.
Aenean consequat ligula ac mattis porta.
Nullam id justo a arcu tincidunt sodales.
Nunc rhoncus pretium nibh ut convallis.
Maecenas orci enim, tincidunt eget vestibulum eu, placerat non ante.
Proin sit amet felis tempor, ullamcorper sem sed, scelerisque nibh.
Aliquam sit amet semper leo, in fringilla nulla.
Vestibulum sit amet tortor tincidunt, laoreet risus eget, ullamcorper sapien.
Fusce non finibus nisl. Cras vitae dui nibh.
Sed fermentum ullamcorper various.
Integer sit amet elit sed nunc fringilla molestie nec nec diam.
Etiam et ornare tellus.
Donec tristique auctor urna, ac aliquam tellus sodales id.
Duis nec magna eget mi consequat gravida.
Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae;
Name imperdiet ante neque, nec viverra sem pellentesque vel.
Sed nec arcu non nisl tempor pretium.
Quisque facilisis auctor lobortis.
Pellentesque sed finibus sem, eu lacinia tellus.
Vivamus imperdiet non augue in tincidunt.
Sed aliquet tincidunt dignissim.
Name vehicula leo eu dolor pellentesque, ultrices tempus ex hendrerit.
'''
def get_entry_type_bin(entry_type_str: str) -> Optional[int]:
# Reverse `item_type` dict lookup
entry_type_bin: Optional[int] = next((key for key, value in nvs_const.item_type.items() if value == entry_type_str), None)  # default None, otherwise next() would raise StopIteration
if entry_type_bin is None:
logger.info(logger.yellow(f'Unknown entry type {entry_type_str}'))
return entry_type_bin
def create_entry_data_bytearray(namespace_index: int, entry_type: int, span: int, chunk_index: int, key: str, data: Any) -> bytearray:
key_bytearray = bytearray(key, 'ascii')
key_encoded = (key_bytearray + bytearray({0x00}) * (16 - len(key_bytearray)))[:16] # Pad key with null bytes
key_encoded[15] = 0x00 # Null-terminate the key
is_signed = 0x11 <= entry_type <= 0x18  # signed integer entry types fall in this range
entry_data: bytearray = bytearray({0xFF}) * nvs_const.entry_size # Empty entry
entry_data[0] = namespace_index
entry_data[1] = entry_type
entry_data[2] = span
entry_data[3] = chunk_index
# entry_data[4:8] # CRC32
entry_data[8:24] = key_encoded
entry_data[24:32] = data.to_bytes(8, byteorder='little', signed=is_signed)
raw_without_crc = entry_data[:4] + entry_data[8:32]
entry_data[4:8] = crc32(raw_without_crc, 0xFFFFFFFF).to_bytes(4, byteorder='little', signed=False)
return entry_data
@pytest.fixture
def generate_nvs() -> Callable:
def _execute_nvs_setup(nvs_setup_func: Callable, output: Optional[Path] = None) -> NVS_Partition:
nvs_file: Optional[Union[BytesIO, BufferedRandom]] = None
if output is None:
nvs_file = BytesIO()
else:
try:
nvs_file = open(output, 'wb+')
except OSError as e:
raise RuntimeError(f'Cannot open file {output}, error: {e}')
nvs_obj = nvs_setup_func(nvs_file)
nvs_partition_gen.nvs_close(nvs_obj)
nvs_file.seek(0)
nvs_parsed = NVS_Partition('test', bytearray(nvs_file.read()))
nvs_file.close()
return nvs_parsed
return _execute_nvs_setup
# Setup functions
def setup_ok_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'int32_test', 'data', 'i32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'uint32_test', 'data', 'u32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'int8_test', 'data', 'i8', str(100))
return nvs_obj
def setup_ok_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x5000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'short_string_key', 'data', 'string', 'Hello world!')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2)
nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!')
nvs_partition_gen.write_entry(nvs_obj, 'multi_blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
return nvs_obj
def setup_ok_mixed(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
prim_types = ['i8', 'u8', 'i16', 'u16', 'i32', 'u32']
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_singlepage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'etc', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2)
nvs_partition_gen.write_entry(nvs_obj, 'abcd', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
return nvs_obj
def setup_bad_mixed_same_key_different_page(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
prim_types = ['i8', 'u8', 'i16', 'u16', 'i32', 'u32']
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_singlepage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'etc', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2)
nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
# Should be on a different page already - start creating duplicates
for i in range(6):
data_type = prim_types[i % len(prim_types)]
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', data_type, str(i)) # Conflicting keys under "etc" namespace - 6 duplicates
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', 'abc') # Conflicting key for string - 7th duplicate
# Create new duplicates of storage namespace with an unsafe version of write_namespace function
nvs_obj.write_namespace_unsafe('storage') # Conflicting namespace - 8th duplicate (the function is only for testing)
nvs_partition_gen.write_entry(nvs_obj, 'storage2', 'namespace', '', '') # New namespace, ignored
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', 'abc') # Ignored: it is under the different "storage2" namespace
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', 'abc') # 3 conflicting keys under "storage2" namespace - 9th duplicate
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', 'def')
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', '123')
# This no longer (nvs generator version >= 0.1.5) creates a duplicate namespace entry, only changes
# the current used namespace index
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
return nvs_obj
def setup_bad_same_key_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'unique_key', 'data', 'i16', str(1234))
nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'i32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'u32', str(24))
nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'i8', str(-5))
nvs_partition_gen.write_entry(nvs_obj, 'another_same_key', 'data', 'u16', str(321))
nvs_partition_gen.write_entry(nvs_obj, 'another_same_key', 'data', 'u16', str(456))
return nvs_obj
def setup_bad_same_key_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'same_string_key', 'data', 'string', 'Hello')
nvs_partition_gen.write_entry(nvs_obj, 'same_string_key', 'data', 'string', 'world!')
nvs_partition_gen.write_entry(nvs_obj, 'unique_string_key', 'data', 'string', 'I am unique!')
return nvs_obj
def setup_bad_same_key_blob_index(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key_2', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin') # Duplicate key
return nvs_obj
def setup_read_only(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x1000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'int32_test', 'data', 'i32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'uint32_test', 'data', 'u32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'int8_test', 'data', 'i8', str(100))
nvs_partition_gen.write_entry(nvs_obj, 'short_string_key', 'data', 'string', 'Hello world!')
return nvs_obj
# Helper functions
def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]:
seen_written_entries_all: Dict[str, List[NVS_Entry]] = {}
for page in nvs.pages:
# page: NVS_Page
for entry in page.entries:
# entry: NVS_Entry
# Duplicate entry check (1) - same key, different index - find duplicates
seen_written_entries_all = nvs_check.identify_entry_duplicates(entry, seen_written_entries_all)
# Duplicate entry check (2) - same key, different index
duplicates: Dict[str, List[NVS_Entry]] = nvs_check.filter_entry_duplicates(seen_written_entries_all)
return duplicates
# Tests
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_partition_size(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
assert nvs_check.check_partition_size(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_empty_page_present(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
assert nvs_check.check_empty_page_present(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_empty_page_content__check_page_crc(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
for page in nvs.pages:
if page.header['status'] == 'Empty':
assert page.is_empty
assert nvs_check.check_empty_page_content(page, logger)
else:
assert not page.is_empty
assert nvs_check.check_page_crc(page, logger)
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_duplicates_ok(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 0 # No duplicates in any page
@pytest.mark.parametrize('setup_func', [setup_bad_same_key_primitive])
def test_check_duplicates_bad_same_key_primitive_type(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 2 # 2 different lists of duplicate keys
assert len(list(duplicates.values())[0]) == 3 # 3 entries with the same_key
assert len(list(duplicates.values())[1]) == 2 # 2 entries with the another_same_key
nvs_check.integrity_check(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_bad_same_key_variable_len])
def test_check_duplicates_bad_same_key_variable_len_type(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 1 # Only one duplicate key list
assert len(list(duplicates.values())[0]) == 2 # 2 entries with the same_string_key
nvs_check.integrity_check(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_bad_mixed_same_key_different_page])
def test_check_duplicates_bad_same_key_different_pages(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 9 # 9 duplicate keys in total (8 pairs of 2 duplicates + 1 triplet)
for i, value in enumerate(list(duplicates.values())):
if i < 8:
assert len(value) == 2 # i in range 0-7 -- pairs of 2 entries with the same key
else:
assert len(value) == 3 # i == 8 -- 3 entries with the lorem_string key
nvs_check.integrity_check(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_bad_same_key_blob_index])
def test_check_duplicates_bad_same_key_blob_index(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 1 # Only one duplicate key list - blob_index and blob_data share the same key (which is OK),
# however there are 2 duplicates of each blob_index and blob_data
assert len(list(duplicates.values())[0]) == 6 # 6 entries with the blob_key (2x blob_index, 4x blob_data)
nvs_check.integrity_check(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_read_only])
def test_check_read_only_partition(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
assert nvs.raw_data is not None
assert len(nvs.raw_data) == 0x1000
assert nvs_check.check_partition_size(nvs, logger, read_only=True)
assert not nvs_check.check_empty_page_present(nvs, logger)