diff --git a/components/nvs_flash/nvs_partition_tool/nvs_check.py b/components/nvs_flash/nvs_partition_tool/nvs_check.py index b6a4da3aba..ca2cd4680a 100644 --- a/components/nvs_flash/nvs_partition_tool/nvs_check.py +++ b/components/nvs_flash/nvs_partition_tool/nvs_check.py @@ -22,6 +22,22 @@ blob_chunks: List[NVS_Entry] = [] def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool: + """ Checks if the partition is large enough and has enough pages + """ + if len(nvs_partition.raw_data) / 0x1000 < 3: + nvs_log.info( + nvs_log.yellow( + 'NVS Partition size must be at least 0x3000 (4kiB * 3 pages == 12kiB)!' + ) + ) + return False + if len(nvs_partition.raw_data) % 0x1000 != 0: + nvs_log.info( + nvs_log.yellow( + 'NVS Partition size must be a multiple of 0x1000 (4kiB)!' + ) + ) + return False if len(nvs_partition.pages) < 3: nvs_log.info( nvs_log.yellow( @@ -83,16 +99,22 @@ def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool: return False -def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: +def identify_entry_duplicates(entry: NVS_Entry, entry_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + """Identifies and logs written entries + + Part 1 of duplicate entry check mechanism + """ if entry.state == 'Written': - if entry.key in seen_written_entires: - seen_written_entires[entry.key].append(entry) + if entry.key in entry_dict: + entry_dict[entry.key].append(entry) else: - seen_written_entires[entry.key] = [entry] - return seen_written_entires + entry_dict[entry.key] = [entry] + return entry_dict def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, List[NVS_Entry]]: + """Checks entries in the given page (entry state, children CRC32, entry type, span and gathers blobs and namespaces) + """ seen_written_entires: Dict[str, List[NVS_Entry]] = {} for entry in nvs_page.entries: @@ -203,6 +225,11 @@ def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, Lis def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + """Takes a dictionary of entries (as written) and returns a new dictionary with "fake" duplicates, + where entries which have the same key but under different namespaces are filtered out + + Use `filter_entry_duplicates()` to properly filter out all duplicates + """ new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {} for key, duplicate_entries in duplicate_entries_dict.items(): seen_entries: List[NVS_Entry] = [] @@ -237,6 +264,11 @@ def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + """Takes a dictionary of entries (as written) and returns a new dictionary with "fake" duplicates, + where entries related to blob index and blob data under the same namespace are filtered out + + Use `filter_entry_duplicates()` to properly filter out all duplicates + """ new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {} for key, duplicate_entries in duplicate_entries_dict.items(): seen_blob_index: List[NVS_Entry] = [] @@ -298,9 +330,19 @@ def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_En return new_duplicate_entries_dict -def filter_entry_duplicates(seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: - duplicate_entries_list = {key: v for key, v in seen_written_entires.items() if len(v) > 1} +def filter_entry_duplicates(entries: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + """Takes a dictionary of (seen written) entries and outputs a new dictionary with "fake" duplicates filtered out, keeping only real duplicates in + + (i.e. duplicate keys under different namespaces and blob index and blob data having the same key under the same namespace are allowed + and should be filtered out) + + Part 2 of duplicate entry check mechanism + """ + # Only keep seen written entries which have been observerd multiple times (duplicates) + duplicate_entries_list = {key: v for key, v in entries.items() if len(v) > 1} + # Filter out "fake" duplicates 1 (duplicate keys under different namespaces are allowed) duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list) + # Filter out "fake" duplicates 2 (blob index and blob data are allowed to have the same key even in the same namespace) duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1) return duplicate_entries_list_2 @@ -341,6 +383,8 @@ def print_entry_duplicates(duplicate_entries_list: Dict[str, List[NVS_Entry]], n def assemble_blobs(nvs_log: NVS_Logger) -> None: + """Assembles blob data from blob chunks + """ for chunk in blob_chunks: # chunk: NVS_Entry parent = blobs.get( @@ -361,6 +405,8 @@ def assemble_blobs(nvs_log: NVS_Logger) -> None: def check_blob_data(nvs_log: NVS_Logger) -> None: + """Checks blob data for missing chunks or data + """ for blob_key in blobs: blob_index = blobs[blob_key][0] blob_chunks = blobs[blob_key][1:] @@ -395,6 +441,8 @@ def check_blobs(nvs_log: NVS_Logger) -> None: def check_namespaces(nvs_log: NVS_Logger) -> None: + """Checks namespaces (entries using undefined namespace indexes, unused namespaces) + """ # Undefined namespace index check for used_ns in used_namespaces: key = found_namespaces.pop(used_ns, None) @@ -415,6 +463,9 @@ def check_namespaces(nvs_log: NVS_Logger) -> None: def reset_global_variables() -> None: + """Global variables need to be cleared out before calling `integrity_check()` multiple times from a script + (e.g. when running tests) to avoid incorrect output + """ global used_namespaces, found_namespaces, blobs, blob_chunks used_namespaces = {} found_namespaces = {} @@ -423,6 +474,8 @@ def reset_global_variables() -> None: def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: + """Function for multi-stage integrity check of a NVS partition + """ # Partition size check check_partition_size(nvs_partition, nvs_log) @@ -431,18 +484,19 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: seen_written_entires_all: Dict[str, List[NVS_Entry]] = {} + # Loop through all pages in the partition for page in nvs_partition.pages: # page: NVS_Page - # Print page header + # Print a page header if page.header['status'] == 'Empty': - # Check if page is truly empty + # Check if a page is truly empty check_empty_page_content(page, nvs_log) else: - # Check page header CRC32 + # Check a page header CRC32 check_page_crc(page, nvs_log) - # Check all entries + # Check all entries in a page seen_written_entires = check_page_entries(page, nvs_log) # Collect all seen written entries