diff --git a/components/nvs_flash/nvs_partition_tool/nvs_check.py b/components/nvs_flash/nvs_partition_tool/nvs_check.py index fd38978e11..6051ee80a3 100644 --- a/components/nvs_flash/nvs_partition_tool/nvs_check.py +++ b/components/nvs_flash/nvs_partition_tool/nvs_check.py @@ -4,17 +4,21 @@ from typing import Dict, List from nvs_logger import NVS_Logger -from nvs_parser import NVS_Entry, NVS_Partition, nvs_const +from nvs_parser import nvs_const +from nvs_parser import NVS_Entry +from nvs_parser import NVS_Page +from nvs_parser import NVS_Partition -def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: - used_namespaces: Dict[int, None] = {} - found_namespaces: Dict[int, str] = {} - blobs: Dict = {} - blob_chunks: List[NVS_Entry] = [] - empty_entry = NVS_Entry(-1, bytearray(32), 'Erased') +EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased') - # Partition size check +used_namespaces: Dict[int, None] = {} +found_namespaces: Dict[int, str] = {} +blobs: Dict = {} +blob_chunks: List[NVS_Entry] = [] + + +def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: if len(nvs_partition.pages) < 3: nvs_log.info( nvs_log.yellow( @@ -22,7 +26,8 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: ) ) - # Free/empty page check + +def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages): nvs_log.info( nvs_log.red( @@ -32,175 +37,186 @@ at least one free page is required for proper function!''' ) nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n')) - for page in nvs_partition.pages: - # page: NVS_Page - # Print page header - if page.header['status'] == 'Empty': - nvs_log.info(nvs_log.cyan('Page Empty')) +def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: + nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}')) - # Check if page is truly empty - if page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size: + if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size: + nvs_log.info( + nvs_log.red( + 'The page is reported as Empty but its entry state bitmap is not empty!' + ) + ) + + if any([not e.is_empty for e in nvs_page.entries]): + nvs_log.info( + nvs_log.red('The page is reported as Empty but there are data written!') + ) + + +def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: + if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']: + nvs_log.info( + nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK' + ) + else: + nvs_log.info( + nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), + f'Original CRC32:', + nvs_log.red(f'{nvs_page.header["crc"]["original"]:x}'), + f'Generated CRC32:', + nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'), + ) + + +def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, list[NVS_Entry]]: + if entry.state == 'Written': + if entry.key in seen_written_entires: + seen_written_entires[entry.key].append(entry) + else: + seen_written_entires[entry.key] = [entry] + return seen_written_entires + + +def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, list[NVS_Entry]]: + seen_written_entires: Dict[str, list[NVS_Entry]] = {} + + for entry in nvs_page.entries: + # entry: NVS_Entry + + # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data + + # Variable length values themselves occupy whole 32 bytes (therefore metadata values are meaningless) + # and are stored in as entries inside string/blob data entry 'entry.children' list + + # Duplicate entry check (1) - same key, different index - find duplicates + seen_written_entires = identify_entry_duplicates(entry, seen_written_entires) + + # Entry state check - doesn't check variable length values (metadata such as state are meaningless as all 32 bytes are pure data) + if entry.is_empty: + if entry.state == 'Written': nvs_log.info( nvs_log.red( - 'The page is reported as Empty but its entry state bitmap is not empty!' + f' Entry #{entry.index:03d} is reported as Written but it is empty!' ) ) - if any([not e.is_empty for e in page.entries]): + continue + elif entry.state == 'Erased': nvs_log.info( - nvs_log.red('The page is reported as Empty but there are data written!') + nvs_log.yellow( + f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)' + ) ) - else: - # Check page header CRC32 - if page.header['crc']['original'] == page.header['crc']['computed']: + + if entry.state == 'Written': + # Entry CRC32 check + if ( + entry.metadata['crc']['original'] + != entry.metadata['crc']['computed'] + ): nvs_log.info( - nvs_log.cyan(f'Page no. {page.header["page_index"]}'), '\tCRC32: OK' + nvs_log.red( + f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}' + ), + f'Written:', + nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'), + f'Generated:', + nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'), ) + + # Entry children CRC32 check + if ( + entry.metadata['span'] > 1 + and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed']) + ): + nvs_log.info( + nvs_log.red( + f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!' + ), + f'Written:', + nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'), + f'Generated:', + nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'), + ) + + # Entry type check + if entry.metadata['type'] not in [ + nvs_const.item_type[key] for key in nvs_const.item_type + ]: + nvs_log.info( + nvs_log.yellow( + f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!' + ), + f'Type: {entry.metadata["type"]}', + ) + + # Span check + if ( + entry.index + entry.metadata['span'] - 1 + >= int(nvs_const.page_size / nvs_const.entry_size) - 2 + ): + nvs_log.info( + nvs_log.red( + f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!' + ) + ) + # Spanned entry state checks + elif entry.metadata['span'] > 1: + parent_state = entry.state + for kid in entry.children: + if parent_state != kid.state: + nvs_log.info( + nvs_log.yellow(' Inconsistent data state!'), + f'Entry #{entry.index:03d} {entry.key} state: {parent_state},', + f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}', + ) + + # Gather blobs & namespaces + if entry.metadata['type'] == 'blob_index': + blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [ + EMPTY_ENTRY + ] * entry.data['chunk_count'] + elif entry.metadata['type'] == 'blob_data': + blob_chunks.append(entry) + + if entry.metadata['namespace'] == 0: + found_namespaces[entry.data['value']] = entry.key else: - nvs_log.info( - nvs_log.cyan(f'Page no. {page.header["page_index"]}'), - f'Original CRC32:', - nvs_log.red(f'{page.header["crc"]["original"]:x}'), - f'Generated CRC32:', - nvs_log.green(f'{page.header["crc"]["computed"]:x}'), - ) + used_namespaces[entry.metadata['namespace']] = None - # Check all entries - seen_written_entires: Dict[str, list[NVS_Entry]] = {} - for entry in page.entries: - # entry: NVS_Entry + return seen_written_entires - # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data - # Variable length values themselves occupy whole 32 bytes (therefore metadata values are meaningless) - # and are stored in as entries inside string/blob data entry 'entry.children' list +def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> List[List[NVS_Entry]]: + duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1] + return duplicate_entries_list - # Duplicate entry check (1) - same key, different index - find duplicates - if entry.state == 'Written': - if entry.key in seen_written_entires: - seen_written_entires[entry.key].append(entry) - else: - seen_written_entires[entry.key] = [entry] - # Entry state check - doesn't check variable length values (metadata such as state are meaningless as all 32 bytes are pure data) - if entry.is_empty: - if entry.state == 'Written': - nvs_log.info( - nvs_log.red( - f' Entry #{entry.index:03d} is reported as Written but it is empty!' - ) - ) - continue - elif entry.state == 'Erased': - nvs_log.info( - nvs_log.yellow( - f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)' - ) - ) - - if entry.state == 'Written': - # Entry CRC32 check - if ( - entry.metadata['crc']['original'] - != entry.metadata['crc']['computed'] - ): - nvs_log.info( - nvs_log.red( - f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}' - ), - f'Written:', - nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'), - f'Generated:', - nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'), - ) - - # Entry children CRC32 check - if ( - entry.metadata['span'] > 1 - and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed']) - ): - nvs_log.info( - nvs_log.red( - f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!' - ), - f'Written:', - nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'), - f'Generated:', - nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'), - ) - - # Entry type check - if entry.metadata['type'] not in [ - nvs_const.item_type[key] for key in nvs_const.item_type - ]: - nvs_log.info( - nvs_log.yellow( - f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!' - ), - f'Type: {entry.metadata["type"]}', - ) - - # Span check - if ( - entry.index + entry.metadata['span'] - 1 - >= int(nvs_const.page_size / nvs_const.entry_size) - 2 - ): - nvs_log.info( - nvs_log.red( - f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!' - ) - ) - # Spanned entry state checks - elif entry.metadata['span'] > 1: - parent_state = entry.state - for kid in entry.children: - if parent_state != kid.state: - nvs_log.info( - nvs_log.yellow(' Inconsistent data state!'), - f'Entry #{entry.index:03d} {entry.key} state: {parent_state},', - f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}', - ) - - # Gather blobs & namespaces - if entry.metadata['type'] == 'blob_index': - blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [ - empty_entry - ] * entry.data['chunk_count'] - elif entry.metadata['type'] == 'blob_data': - blob_chunks.append(entry) - - if entry.metadata['namespace'] == 0: - found_namespaces[entry.data['value']] = entry.key - else: - used_namespaces[entry.metadata['namespace']] = None - - # Duplicate entry check (2) - same key, different index - print duplicates - duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1] - for duplicate_entries in duplicate_entries_list: - # duplicate_entries: list[NVS_Entry] +def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: List[List[NVS_Entry]], nvs_log: NVS_Logger) -> None: + for duplicate_entries in duplicate_entries_list: + # duplicate_entries: list[NVS_Entry] + nvs_log.info( + nvs_log.red( + f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]} +with status {page.header["status"]} is used by the following entries:''' + ) + ) + for entry in duplicate_entries: nvs_log.info( nvs_log.red( - f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]} -with status {page.header["status"]} is used by the following entries:''' + f'Entry #{entry.index:03d} {entry.key} is a duplicate!' ) ) - for entry in duplicate_entries: - nvs_log.info( - nvs_log.red( - f'Entry #{entry.index:03d} {entry.key} is a duplicate!' - ) - ) - nvs_log.info() - # Blob checks - # Assemble blobs +def assemble_blobs(nvs_log: NVS_Logger) -> None: for chunk in blob_chunks: + # chunk: NVS_Entry parent = blobs.get( - f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry] + f'{chunk.metadata["namespace"]:03d}{chunk.key}', [EMPTY_ENTRY] )[0] # Blob chunk without blob index check - if parent is empty_entry: + if parent is EMPTY_ENTRY: nvs_log.info( nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'), f'Namespace index: {chunk.metadata["namespace"]:03d}', @@ -212,15 +228,19 @@ with status {page.header["status"]} is used by the following entries:''' chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start'] blobs[blob_key][chunk_index + 1] = chunk - # Blob data check + # return blobs + + +def check_blob_data(nvs_log: NVS_Logger) -> None: for blob_key in blobs: blob_index = blobs[blob_key][0] blob_chunks = blobs[blob_key][1:] blob_size = blob_index.data['size'] for i, chunk in enumerate(blob_chunks): + # chunk: NVS_Entry # Blob missing chunk check - if chunk is empty_entry: + if chunk is EMPTY_ENTRY: nvs_log.info( nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'), f'Namespace index: {blob_index.metadata["namespace"]:03d}', @@ -237,7 +257,15 @@ with status {page.header["status"]} is used by the following entries:''' f'Namespace index: {blob_index.metadata["namespace"]:03d}', ) - # Namespace checks + +def check_blobs(nvs_log: NVS_Logger) -> None: + # Assemble blobs + assemble_blobs(nvs_log) + # Blob data check + check_blob_data(nvs_log) + + +def check_namespaces(nvs_log: NVS_Logger) -> None: # Undefined namespace index check for used_ns in used_namespaces: key = found_namespaces.pop(used_ns, '') @@ -255,3 +283,38 @@ with status {page.header["status"]} is used by the following entries:''' f'Namespace index: {unused_ns:03d}', f'[{found_namespaces[unused_ns]}]', ) + + +def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: + # Partition size check + check_partition_size(nvs_partition, nvs_log) + + # Free/empty page check + check_empty_page_present(nvs_partition, nvs_log) + + for page in nvs_partition.pages: + # page: NVS_Page + + # Print page header + if page.header['status'] == 'Empty': + # Check if page is truly empty + check_empty_page_content(page, nvs_log) + else: + # Check page header CRC32 + check_page_crc(page, nvs_log) + + # Check all entries + seen_written_entires = check_page_entries(page, nvs_log) + + # Duplicate entry check (2) - same key, different index - print duplicates + duplicates = filter_entry_duplicates(seen_written_entires) + # Print duplicate entries + print_entry_duplicates(page, duplicates, nvs_log) + + nvs_log.info() # Empty line + + # Blob checks + check_blobs(nvs_log) + + # Namespace checks + check_namespaces(nvs_log)