docs(nvs): Document nvs_check.py functions

This commit is contained in:
Adam Múdry
2024-09-24 11:52:14 +02:00
parent 1eacc6c2a8
commit 9620508388

View File

@ -22,6 +22,22 @@ blob_chunks: List[NVS_Entry] = []
def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
""" Checks if the partition is large enough and has enough pages
"""
if len(nvs_partition.raw_data) / 0x1000 < 3:
nvs_log.info(
nvs_log.yellow(
'NVS Partition size must be at least 0x3000 (4kiB * 3 pages == 12kiB)!'
)
)
return False
if len(nvs_partition.raw_data) % 0x1000 != 0:
nvs_log.info(
nvs_log.yellow(
'NVS Partition size must be a multiple of 0x1000 (4kiB)!'
)
)
return False
if len(nvs_partition.pages) < 3:
nvs_log.info(
nvs_log.yellow(
@ -83,16 +99,22 @@ def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
return False
def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
def identify_entry_duplicates(entry: NVS_Entry, entry_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Identifies and logs written entries
Part 1 of duplicate entry check mechanism
"""
if entry.state == 'Written':
if entry.key in seen_written_entires:
seen_written_entires[entry.key].append(entry)
if entry.key in entry_dict:
entry_dict[entry.key].append(entry)
else:
seen_written_entires[entry.key] = [entry]
return seen_written_entires
entry_dict[entry.key] = [entry]
return entry_dict
def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, List[NVS_Entry]]:
"""Checks entries in the given page (entry state, children CRC32, entry type, span and gathers blobs and namespaces)
"""
seen_written_entires: Dict[str, List[NVS_Entry]] = {}
for entry in nvs_page.entries:
@ -203,6 +225,11 @@ def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, Lis
def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Takes a dictionary of entries (as written) and returns a new dictionary with "fake" duplicates,
where entries which have the same key but under different namespaces are filtered out
Use `filter_entry_duplicates()` to properly filter out all duplicates
"""
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
for key, duplicate_entries in duplicate_entries_dict.items():
seen_entries: List[NVS_Entry] = []
@ -237,6 +264,11 @@ def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS
def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Takes a dictionary of entries (as written) and returns a new dictionary with "fake" duplicates,
where entries related to blob index and blob data under the same namespace are filtered out
Use `filter_entry_duplicates()` to properly filter out all duplicates
"""
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
for key, duplicate_entries in duplicate_entries_dict.items():
seen_blob_index: List[NVS_Entry] = []
@ -298,9 +330,19 @@ def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_En
return new_duplicate_entries_dict
def filter_entry_duplicates(seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
duplicate_entries_list = {key: v for key, v in seen_written_entires.items() if len(v) > 1}
def filter_entry_duplicates(entries: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
"""Takes a dictionary of (seen written) entries and outputs a new dictionary with "fake" duplicates filtered out, keeping only real duplicates in
(i.e. duplicate keys under different namespaces and blob index and blob data having the same key under the same namespace are allowed
and should be filtered out)
Part 2 of duplicate entry check mechanism
"""
# Only keep seen written entries which have been observerd multiple times (duplicates)
duplicate_entries_list = {key: v for key, v in entries.items() if len(v) > 1}
# Filter out "fake" duplicates 1 (duplicate keys under different namespaces are allowed)
duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list)
# Filter out "fake" duplicates 2 (blob index and blob data are allowed to have the same key even in the same namespace)
duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1)
return duplicate_entries_list_2
@ -341,6 +383,8 @@ def print_entry_duplicates(duplicate_entries_list: Dict[str, List[NVS_Entry]], n
def assemble_blobs(nvs_log: NVS_Logger) -> None:
"""Assembles blob data from blob chunks
"""
for chunk in blob_chunks:
# chunk: NVS_Entry
parent = blobs.get(
@ -361,6 +405,8 @@ def assemble_blobs(nvs_log: NVS_Logger) -> None:
def check_blob_data(nvs_log: NVS_Logger) -> None:
"""Checks blob data for missing chunks or data
"""
for blob_key in blobs:
blob_index = blobs[blob_key][0]
blob_chunks = blobs[blob_key][1:]
@ -395,6 +441,8 @@ def check_blobs(nvs_log: NVS_Logger) -> None:
def check_namespaces(nvs_log: NVS_Logger) -> None:
"""Checks namespaces (entries using undefined namespace indexes, unused namespaces)
"""
# Undefined namespace index check
for used_ns in used_namespaces:
key = found_namespaces.pop(used_ns, None)
@ -415,6 +463,9 @@ def check_namespaces(nvs_log: NVS_Logger) -> None:
def reset_global_variables() -> None:
"""Global variables need to be cleared out before calling `integrity_check()` multiple times from a script
(e.g. when running tests) to avoid incorrect output
"""
global used_namespaces, found_namespaces, blobs, blob_chunks
used_namespaces = {}
found_namespaces = {}
@ -423,6 +474,8 @@ def reset_global_variables() -> None:
def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
"""Function for multi-stage integrity check of a NVS partition
"""
# Partition size check
check_partition_size(nvs_partition, nvs_log)
@ -431,18 +484,19 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
seen_written_entires_all: Dict[str, List[NVS_Entry]] = {}
# Loop through all pages in the partition
for page in nvs_partition.pages:
# page: NVS_Page
# Print page header
# Print a page header
if page.header['status'] == 'Empty':
# Check if page is truly empty
# Check if a page is truly empty
check_empty_page_content(page, nvs_log)
else:
# Check page header CRC32
# Check a page header CRC32
check_page_crc(page, nvs_log)
# Check all entries
# Check all entries in a page
seen_written_entires = check_page_entries(page, nvs_log)
# Collect all seen written entries