mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-07 06:34:34 +02:00
Merge branch 'feature/nvs_tool_refinement' into 'master'
nvs: Partition tool refinement Closes IDF-6061 See merge request espressif/esp-idf!20683
This commit is contained in:
@@ -1,780 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
import argparse
|
||||
import sys
|
||||
import traceback
|
||||
from typing import Any, Dict, List
|
||||
from zlib import crc32
|
||||
|
||||
|
||||
class Logger:
|
||||
ansi = {
|
||||
'red': '\033[31m',
|
||||
'green': '\033[32m',
|
||||
'yellow': '\033[33m',
|
||||
'blue': '\033[34m',
|
||||
'cyan': '\033[36m',
|
||||
'bold': '\033[1m',
|
||||
'clear': '\033[0m'
|
||||
}
|
||||
|
||||
def __init__(self, *, color:str='auto'):
|
||||
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
|
||||
|
||||
def set_color(self, color:str) -> None:
|
||||
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
|
||||
|
||||
def info(self, *args, **kwargs) -> None: # type: ignore
|
||||
kwargs['file'] = kwargs.get('file', sys.stdout) # Set default output to be stdout, but can be overwritten
|
||||
print(*args, **kwargs)
|
||||
|
||||
def error(self, *args, **kwargs) -> None: # type: ignore
|
||||
kwargs['file'] = kwargs.get('file', sys.stderr) # Set default output to be stderr, but can be overwritten
|
||||
print(*args, **kwargs)
|
||||
|
||||
def red(self, text:str) -> str:
|
||||
if self.color:
|
||||
return Logger.ansi['red'] + text + Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def green(self, text:str) -> str:
|
||||
if self.color:
|
||||
return Logger.ansi['green'] + text + Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def yellow(self, text:str) -> str:
|
||||
if self.color:
|
||||
return Logger.ansi['yellow'] + text + Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def blue(self, text:str) -> str:
|
||||
if self.color:
|
||||
return Logger.ansi['blue'] + text + Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def cyan(self, text:str) -> str:
|
||||
if self.color:
|
||||
return Logger.ansi['cyan'] + text + Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def bold(self, text:str) -> str:
|
||||
if self.color:
|
||||
return Logger.ansi['bold'] + text + Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
|
||||
# Constants
|
||||
class Constants:
|
||||
class ConstantError(AttributeError):
|
||||
pass
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.page_size = 4096
|
||||
self.entry_size = 32
|
||||
self.item_type = {
|
||||
0x01: 'uint8_t',
|
||||
0x11: 'int8_t',
|
||||
0x02: 'uint16_t',
|
||||
0x12: 'int16_t',
|
||||
0x04: 'uint32_t',
|
||||
0x14: 'int32_t',
|
||||
0x08: 'uint64_t',
|
||||
0x18: 'int64_t',
|
||||
0x21: 'string',
|
||||
0x41: 'blob',
|
||||
0x42: 'blob_data',
|
||||
0x48: 'blob_index',
|
||||
}
|
||||
self.page_status = {
|
||||
0xffffffff: 'Empty',
|
||||
0xfffffffe: 'Active',
|
||||
0xfffffffc: 'Full',
|
||||
0xfffffff8: 'Freeing',
|
||||
}
|
||||
self.entry_status = {
|
||||
0b11: 'Empty',
|
||||
0b10: 'Written',
|
||||
0b00: 'Erased',
|
||||
}
|
||||
|
||||
def __setattr__(self, key:str, val:Any) -> None:
|
||||
if self.__dict__.get(key, None) is None:
|
||||
self.__dict__[key] = val
|
||||
else:
|
||||
raise Constants.ConstantError('Cannot change a constant!')
|
||||
|
||||
|
||||
const = Constants()
|
||||
log = Logger()
|
||||
|
||||
|
||||
class NotAlignedError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class NVS_Partition:
|
||||
def __init__(self, name:str, raw_data:bytearray):
|
||||
if len(raw_data) % const.page_size != 0:
|
||||
raise NotAlignedError(f'Given partition data is not aligned to page size ({len(raw_data)} % {const.page_size} = {len(raw_data)%const.page_size})')
|
||||
|
||||
# Divide partition into pages
|
||||
self.name = name
|
||||
self.pages = []
|
||||
for i in range(0, len(raw_data), const.page_size):
|
||||
self.pages.append(NVS_Page(raw_data[i:i + const.page_size], i))
|
||||
|
||||
|
||||
class NVS_Page:
|
||||
def __init__(self, page_data:bytearray, address:int):
|
||||
if len(page_data) != const.page_size:
|
||||
raise NotAlignedError(f'Size of given page does not match page size ({len(page_data)} != {const.page_size})')
|
||||
|
||||
# Initialize class
|
||||
self.is_empty = page_data[0:const.entry_size] == bytearray({0xff}) * const.entry_size
|
||||
self.start_address = address
|
||||
self.raw_header = page_data[0:const.entry_size]
|
||||
self.raw_entry_state_bitmap = page_data[const.entry_size:2 * const.entry_size]
|
||||
self.entries = []
|
||||
|
||||
# Load header
|
||||
self.header:Dict[str, Any] = {
|
||||
'status': const.page_status.get(int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'),
|
||||
'page_index': int.from_bytes(page_data[4:8], byteorder='little'),
|
||||
'version': 256 - page_data[8],
|
||||
'crc':{
|
||||
'original': int.from_bytes(page_data[28:32], byteorder='little'),
|
||||
'computed': crc32(page_data[4:28], 0xFFFFFFFF)
|
||||
}
|
||||
}
|
||||
|
||||
# Load entry state bitmap
|
||||
entry_states = []
|
||||
for c in self.raw_entry_state_bitmap:
|
||||
for index in range(0, 8, 2):
|
||||
entry_states.append(const.entry_status.get((c >> index) & 3, 'Invalid'))
|
||||
entry_states = entry_states[:-2]
|
||||
|
||||
# Load entries
|
||||
i = 2
|
||||
while i < int(const.page_size / const.entry_size): # Loop through every entry
|
||||
span = page_data[(i * const.entry_size) + 2]
|
||||
if span in [0xff, 0]: # 'Default' span length to prevent span overflow
|
||||
span = 1
|
||||
|
||||
# Load an entry
|
||||
entry = NVS_Entry(i - 2, page_data[i * const.entry_size:(i + 1) * const.entry_size], entry_states[i - 2])
|
||||
self.entries.append(entry)
|
||||
|
||||
# Load all children entries
|
||||
if span != 1:
|
||||
for span_idx in range(1, span):
|
||||
page_addr = i + span_idx
|
||||
entry_idx = page_addr - 2
|
||||
if page_addr * const.entry_size >= const.page_size:
|
||||
break
|
||||
child_entry = NVS_Entry(entry_idx, page_data[page_addr * const.entry_size:(page_addr + 1) * const.entry_size], entry_states[entry_idx])
|
||||
entry.child_assign(child_entry)
|
||||
entry.compute_crc()
|
||||
i += span
|
||||
|
||||
|
||||
class NVS_Entry:
|
||||
def __init__(self, index:int, entry_data:bytearray, entry_state:str):
|
||||
if len(entry_data) != const.entry_size:
|
||||
raise NotAlignedError(f'Given entry is not aligned to entry size ({len(entry_data)} % {const.entry_size} = {len(entry_data)%const.entry_size})')
|
||||
|
||||
def item_convert(i_type:int, data:bytearray) -> Dict:
|
||||
byte_size_mask = 0x0f
|
||||
number_sign_mask = 0xf0
|
||||
fixed_entry_length_threshold = 0x20 # Fixed length entry type number is always smaller than this
|
||||
if i_type in const.item_type:
|
||||
# Deal with non variable length entries
|
||||
if i_type < fixed_entry_length_threshold:
|
||||
size = i_type & byte_size_mask
|
||||
num = int.from_bytes(data[:size], byteorder='little', signed=bool(i_type & number_sign_mask))
|
||||
return {'value':num}
|
||||
|
||||
# Deal with variable length entries
|
||||
if const.item_type[i_type] in ['string', 'blob_data', 'blob']:
|
||||
size = int.from_bytes(data[:2], byteorder='little')
|
||||
crc = int.from_bytes(data[4:8], byteorder='little')
|
||||
return {'value': [size, crc],
|
||||
'size': size,
|
||||
'crc': crc}
|
||||
if const.item_type[i_type] == 'blob_index':
|
||||
size = int.from_bytes(data[:4], byteorder='little')
|
||||
chunk_count = data[4]
|
||||
chunk_start = data[5]
|
||||
return {'value': [size, chunk_count, chunk_start],
|
||||
'size': size,
|
||||
'chunk_count': chunk_count,
|
||||
'chunk_start': chunk_start}
|
||||
return {'value': log.red('Cannot parse')}
|
||||
|
||||
def key_decode(data:bytearray) -> str:
|
||||
decoded = ''
|
||||
for n in data:
|
||||
char = chr(n)
|
||||
if char.isprintable():
|
||||
decoded += char
|
||||
return decoded
|
||||
|
||||
self.raw = entry_data
|
||||
self.state = entry_state
|
||||
self.is_empty = self.raw == bytearray({0xff}) * const.entry_size
|
||||
self.index = index
|
||||
|
||||
namespace = self.raw[0]
|
||||
entry_type = self.raw[1]
|
||||
span = self.raw[2]
|
||||
chunk_index = self.raw[3]
|
||||
crc = self.raw[4:8]
|
||||
key = self.raw[8:24]
|
||||
data = self.raw[24:32]
|
||||
raw_without_crc = self.raw[:4] + self.raw[8:32]
|
||||
self.metadata: Dict[str, Any] = {
|
||||
'namespace': namespace,
|
||||
'type': const.item_type.get(entry_type, f'0x{entry_type:02x}'),
|
||||
'span': span,
|
||||
'chunk_index': chunk_index,
|
||||
'crc':{
|
||||
'original': int.from_bytes(crc, byteorder='little'),
|
||||
'computed': crc32(raw_without_crc, 0xFFFFFFFF),
|
||||
'data_original': int.from_bytes(data[-4:], byteorder='little'),
|
||||
'data_computed': 0
|
||||
}
|
||||
}
|
||||
self.children: List['NVS_Entry'] = []
|
||||
self.key = key_decode(key)
|
||||
self.data = item_convert(entry_type, data)
|
||||
|
||||
def dump_raw(self) -> str:
|
||||
hex_bytes = ''
|
||||
decoded = ''
|
||||
for i, c in enumerate(self.raw):
|
||||
middle_index = int(len(self.raw) / 2)
|
||||
if i == middle_index: # Add a space in the middle
|
||||
hex_bytes += ' '
|
||||
decoded += ' '
|
||||
hex_bytes += f'{c:02x} '
|
||||
decoded += chr(c) if chr(c).isprintable() else '.'
|
||||
return hex_bytes + ' ' + decoded
|
||||
|
||||
def child_assign(self, entry:'NVS_Entry') -> None:
|
||||
if not isinstance(entry, type(self)):
|
||||
raise ValueError('You can assign only NVS_Entry')
|
||||
self.children.append(entry)
|
||||
|
||||
def compute_crc(self) -> None:
|
||||
if self.metadata['span'] == 1:
|
||||
return
|
||||
|
||||
# Merge entries into one buffer
|
||||
children_data = bytearray()
|
||||
for entry in self.children:
|
||||
children_data += entry.raw
|
||||
children_data = children_data[:self.data['size']] # Discard padding
|
||||
|
||||
self.metadata['crc']['data_computed'] = crc32(children_data, 0xFFFFFFFF)
|
||||
|
||||
|
||||
def storage_stats(nvs_partition:NVS_Partition) -> None:
|
||||
global_stats = {
|
||||
'written_entries': 0,
|
||||
'free_entries': 0,
|
||||
'erased_entries': 0,
|
||||
'invalid_entries': 0
|
||||
}
|
||||
|
||||
for page in nvs_partition.pages:
|
||||
written_e = 0
|
||||
free_e = 0
|
||||
erased_e = 0
|
||||
invalid_e = 0
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written':
|
||||
written_e += 1
|
||||
elif entry.state == 'Empty':
|
||||
free_e += 1
|
||||
elif entry.state == 'Erased':
|
||||
erased_e += 1
|
||||
else:
|
||||
invalid_e += 1
|
||||
|
||||
log.info(log.bold(f'Page {page.header["status"]}'))
|
||||
log.info(' Found entries:')
|
||||
log.info(f' Written: {written_e: 5d}')
|
||||
log.info(f' Erased: {erased_e: 5d}')
|
||||
log.info(f' Empty: {free_e: 5d}')
|
||||
log.info(f' Invalid: {invalid_e: 5d}')
|
||||
log.info(f' Total: {written_e + free_e + erased_e + invalid_e: 5d}')
|
||||
log.info()
|
||||
global_stats['written_entries'] += written_e
|
||||
global_stats['erased_entries'] += erased_e
|
||||
global_stats['free_entries'] += free_e
|
||||
global_stats['invalid_entries'] += invalid_e
|
||||
|
||||
log.info(log.bold('Global'))
|
||||
log.info(' Config:')
|
||||
log.info(f' Page size: {const.page_size: 5d}')
|
||||
log.info(f' Entry size: {const.entry_size: 5d}')
|
||||
log.info(f' Total pages: {len(nvs_partition.pages): 5d}')
|
||||
log.info(' Entries:')
|
||||
log.info(f' Written: {global_stats["written_entries"]: 5d}')
|
||||
log.info(f' Erased: {global_stats["erased_entries"]: 5d}')
|
||||
log.info(f' Empty: {global_stats["free_entries"]: 5d}')
|
||||
log.info(f' Invalid: {global_stats["invalid_entries"]: 5d}')
|
||||
log.info(f' Total: {sum([global_stats[key] for key in global_stats]): 5d}')
|
||||
log.info()
|
||||
|
||||
|
||||
def dump_everything(nvs_partition:NVS_Partition, written_only:bool=False) -> None:
|
||||
for page in nvs_partition.pages:
|
||||
# Print page header
|
||||
if page.is_empty:
|
||||
log.info(log.bold(f'Page Empty, Page address: 0x{page.start_address:x}'))
|
||||
else:
|
||||
if page.header['crc']['original'] == page.header['crc']['computed']: # Color CRC32
|
||||
crc = log.green(f'{page.header["crc"]["original"]: >8x}')
|
||||
else:
|
||||
crc = log.red(f'{page.header["crc"]["original"]: >8x}')
|
||||
log.info(log.bold(f'Page no. {page.header["page_index"]}'
|
||||
+ f', Status: {page.header["status"]}'
|
||||
+ f', Version: {page.header["version"]}'
|
||||
+ f', CRC32: {crc}')
|
||||
+ log.bold(f', Page address: 0x{page.start_address:x}'))
|
||||
|
||||
log.info(log.bold(' Entry state bitmap: '), end='')
|
||||
for x in page.raw_entry_state_bitmap:
|
||||
log.info(f'{x:02x} ', end='')
|
||||
log.info()
|
||||
|
||||
# Dump entries
|
||||
empty_entries = []
|
||||
for entry in page.entries:
|
||||
# Skip non-written entries if needed
|
||||
if written_only and not entry.state == 'Written':
|
||||
continue
|
||||
|
||||
# Compress all empty entries
|
||||
if entry.state == 'Empty' and entry.is_empty: # Gather all subsequent empty entries
|
||||
empty_entries.append(entry)
|
||||
continue
|
||||
else:
|
||||
# Print the empty entries
|
||||
if len(empty_entries) >= 3: # There is enough entries to compress
|
||||
log.info(log.bold(f' {empty_entries[0].index:03d}.'), 'Empty')
|
||||
log.info(log.bold(' ...'))
|
||||
log.info(log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty')
|
||||
else: # No need for compression
|
||||
for e in empty_entries:
|
||||
log.info(log.bold(f' {e.index:03d}.'), 'Empty')
|
||||
empty_entries.clear()
|
||||
|
||||
# Dump a single entry
|
||||
status = entry.state
|
||||
if status == 'Written':
|
||||
status = log.green(f'{status: <7}')
|
||||
elif status == 'Erased':
|
||||
status = log.red(f'{status: <7}')
|
||||
|
||||
crc = ''
|
||||
if entry.metadata['crc']['original'] == entry.metadata['crc']['computed']: # Color CRC32
|
||||
crc = log.green(f'{entry.metadata["crc"]["original"]: >8x}')
|
||||
else:
|
||||
crc = log.red(f'{entry.metadata["crc"]["original"]: >8x}')
|
||||
|
||||
log.info(log.bold(f' {entry.index:03d}.')
|
||||
+ ' ' + status
|
||||
+ f', Namespace Index: {entry.metadata["namespace"]:03d}'
|
||||
+ f', Type: {entry.metadata["type"]:<10}'
|
||||
+ f', Span: {entry.metadata["span"]:03d}'
|
||||
+ f', Chunk Index: {entry.metadata["chunk_index"]:03d}'
|
||||
+ f', CRC32: {crc}'
|
||||
+ f' | {entry.key} : ', end='')
|
||||
|
||||
if entry.metadata['type'] not in ['string', 'blob_data', 'blob_index', 'blob']: # Entry is non-variable length
|
||||
log.info(entry.data['value'])
|
||||
else:
|
||||
if entry.metadata['type'] == 'blob_index':
|
||||
log.info(f'Size={entry.data["size"]}'
|
||||
+ f', ChunkCount={entry.data["chunk_count"]}'
|
||||
+ f', ChunkStart={entry.data["chunk_start"]}')
|
||||
else:
|
||||
if entry.metadata['crc']['data_original'] == entry.metadata['crc']['data_computed']: # Color CRC32
|
||||
crc = log.green(f'{entry.metadata["crc"]["data_original"]:x}')
|
||||
else:
|
||||
crc = log.red(f'{entry.metadata["crc"]["data_original"]:x}')
|
||||
log.info(f'Size={entry.data["size"]}, CRC32={crc}')
|
||||
|
||||
# Dump all children entries
|
||||
if entry.metadata['span'] != 1:
|
||||
for i, data in enumerate(entry.children):
|
||||
log.info(f'{"": >6}0x{(i*const.entry_size):03x} {data.dump_raw()}')
|
||||
|
||||
# Dump trailing empty entries
|
||||
if len(empty_entries) >= 3:
|
||||
log.info(log.bold(f' {empty_entries[0].index:03d}.'), 'Empty')
|
||||
log.info(log.bold(' ...'))
|
||||
log.info(log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty')
|
||||
else:
|
||||
for e in empty_entries:
|
||||
log.info(log.bold(f' {e.index:03d}.'), 'Empty')
|
||||
empty_entries.clear()
|
||||
log.info()
|
||||
|
||||
|
||||
def dump_written_entries(nvs_partition:NVS_Partition) -> None:
|
||||
dump_everything(nvs_partition, True)
|
||||
|
||||
|
||||
def list_namespaces(nvs_partition:NVS_Partition) -> None:
|
||||
# Gather namespaces
|
||||
ns = {}
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
|
||||
ns[entry.data['value']] = entry.key
|
||||
|
||||
# Print found namespaces
|
||||
log.info(log.bold(f'Index : Namespace'))
|
||||
for ns_index in sorted(ns):
|
||||
log.info(f' {ns_index:03d} :', log.cyan(ns[ns_index]))
|
||||
|
||||
|
||||
def dump_key_value_pairs(nvs_partition:NVS_Partition) -> None:
|
||||
# Get namespace list
|
||||
ns = {}
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
|
||||
ns[entry.data['value']] = entry.key
|
||||
|
||||
# Print key-value pairs
|
||||
for page in nvs_partition.pages:
|
||||
# Print page header
|
||||
if page.is_empty:
|
||||
log.info(log.bold('Page Empty'))
|
||||
else:
|
||||
log.info(log.bold(f'Page no. {page.header["page_index"]}'
|
||||
+ f', Status: {page.header["status"]}'))
|
||||
|
||||
# Print entries
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written' and entry.metadata['namespace'] != 0: # Ignore non-written entries
|
||||
chunk_index = ''
|
||||
data = ''
|
||||
if entry.metadata['type'] not in ['string', 'blob_data', 'blob_index', 'blob']: # Non-variable length entry
|
||||
data = entry.data['value']
|
||||
elif entry.metadata['type'] == 'blob_index':
|
||||
continue
|
||||
else: # Variable length entries
|
||||
tmp = b''
|
||||
for e in entry.children: # Merge all children entries
|
||||
tmp += bytes(e.raw)
|
||||
tmp = tmp[:entry.data['size']] # Discard padding
|
||||
if entry.metadata['type'] == 'blob_data':
|
||||
if entry.metadata['chunk_index'] >= 128: # Get real chunk index
|
||||
chunk_index = f'[{entry.metadata["chunk_index"] - 128}]'
|
||||
else:
|
||||
chunk_index = f'[{entry.metadata["chunk_index"]}]'
|
||||
data = str(tmp)
|
||||
|
||||
if entry.metadata['namespace'] not in ns:
|
||||
continue
|
||||
else:
|
||||
log.info(' '
|
||||
+ log.cyan(ns[entry.metadata['namespace']])
|
||||
+ ':'
|
||||
+ log.yellow(entry.key)
|
||||
+ f'{chunk_index} = {data}')
|
||||
log.info()
|
||||
|
||||
|
||||
def dump_written_blobs(nvs_partition:NVS_Partition) -> None:
|
||||
blobs: Dict = {}
|
||||
strings: List[NVS_Entry] = []
|
||||
legacy_blobs: List[NVS_Entry] = []
|
||||
ns = {}
|
||||
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
|
||||
|
||||
# Gather namespaces, blob indexes and legacy blobs
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written':
|
||||
if entry.metadata['type'] == 'blob_index':
|
||||
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [empty_entry] * entry.data['chunk_count']
|
||||
elif entry.metadata['type'] == 'blob':
|
||||
legacy_blobs.append(entry)
|
||||
elif entry.metadata['type'] == 'string':
|
||||
strings.append(entry)
|
||||
elif entry.metadata['namespace'] == 0:
|
||||
ns[entry.data['value']] = entry.key
|
||||
|
||||
# Dump blobs
|
||||
for key in blobs:
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
# Gather all blob chunks
|
||||
if entry.state == 'Written' \
|
||||
and entry.metadata['type'] != 'blob_index' \
|
||||
and entry.metadata['namespace'] == blobs[key][0].metadata['namespace'] \
|
||||
and entry.key == blobs[key][0].key:
|
||||
blobs[key][1 + entry.metadata['chunk_index'] - blobs[key][0].data['chunk_start']] = entry
|
||||
|
||||
blob_index = blobs[key][0]
|
||||
blob_chunks = blobs[key][1:]
|
||||
|
||||
# Print blob info
|
||||
log.info(log.cyan(ns.get(blob_index.metadata['namespace'], blob_index.metadata['namespace']))
|
||||
+ ':'
|
||||
+ log.yellow(blob_index.key)
|
||||
+ ' - '
|
||||
+ f'Type: Blob (Version 2), '
|
||||
+ f'Size: {blob_index.data["size"]}')
|
||||
|
||||
# Print blob data
|
||||
raw_entries = []
|
||||
for kid in blob_chunks: # Gather all chunk entries
|
||||
if kid is empty_entry:
|
||||
raw_entries += [empty_entry]
|
||||
else:
|
||||
raw_entries += kid.children
|
||||
|
||||
for i, entry in enumerate(raw_entries):
|
||||
if entry is empty_entry:
|
||||
log.info(log.yellow(f' {"":->63} Missing data {"":-<64}'))
|
||||
else:
|
||||
log.info(f' 0x{(i * const.entry_size):05x} {entry.dump_raw()}')
|
||||
log.info()
|
||||
|
||||
# Dump strings
|
||||
for string in strings:
|
||||
log.info(log.cyan(ns.get(string.metadata['namespace'], string.metadata['namespace']))
|
||||
+ ':'
|
||||
+ log.yellow(string.key)
|
||||
+ ' - '
|
||||
+ 'Type: String, '
|
||||
+ f'Size: {string.data["size"]}')
|
||||
for i, entry in enumerate(string.children):
|
||||
log.info(f' 0x{(i * const.entry_size):05x} {entry.dump_raw()}')
|
||||
log.info()
|
||||
|
||||
# Dump legacy blobs
|
||||
for blob in legacy_blobs:
|
||||
log.info(log.cyan(ns.get(blob.metadata['namespace'], blob.metadata['namespace']))
|
||||
+ ':'
|
||||
+ log.yellow(blob.key)
|
||||
+ ' - '
|
||||
+ 'Type: Blob (Version 1), '
|
||||
+ f'Size: {blob.data["size"]}')
|
||||
for i, entry in enumerate(blob.children):
|
||||
log.info(f' 0x{(i * const.entry_size):05x} {entry.dump_raw()}')
|
||||
log.info()
|
||||
|
||||
|
||||
def integrity_check(nvs_partition:NVS_Partition) -> None:
|
||||
used_namespaces: Dict[int, None] = {}
|
||||
found_namespaces: Dict[int, str] = {}
|
||||
blobs:Dict = {}
|
||||
blob_chunks: List[NVS_Entry] = []
|
||||
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
|
||||
|
||||
# Partition size
|
||||
if len(nvs_partition.pages) < 3:
|
||||
log.info(log.yellow('Partition has to have at least 3 pages to function properly!'))
|
||||
|
||||
for page in nvs_partition.pages:
|
||||
# Print page header
|
||||
if page.header['status'] == 'Empty':
|
||||
log.info(log.cyan('Page Empty'))
|
||||
|
||||
# Check if page is truly empty
|
||||
if page.raw_entry_state_bitmap != bytearray({0xff}) * const.entry_size:
|
||||
log.info(log.red('Page is reported as empty but entry state bitmap is not empty!'))
|
||||
if any([not e.is_empty for e in page.entries]):
|
||||
log.info(log.red('Page is reported as emtpy but there is written data!'))
|
||||
else:
|
||||
# Check page header CRC32
|
||||
if page.header['crc']['original'] == page.header['crc']['computed']:
|
||||
log.info(log.cyan(f'Page no. {page.header["page_index"]}'))
|
||||
else:
|
||||
log.info(log.cyan(f'Page no. {page.header["page_index"]}'),
|
||||
f'Written CRC32:',
|
||||
log.red(f'{page.header["crc"]["original"]:x}'),
|
||||
f'Generated CRC32:',
|
||||
log.green(f'{page.header["crc"]["computed"]:x}'))
|
||||
|
||||
# Check all entries
|
||||
for entry in page.entries:
|
||||
# Entry state check
|
||||
if entry.is_empty:
|
||||
if entry.state == 'Written':
|
||||
log.info(log.red(f' Entry #{entry.index:03d} is reported as written but is empty!'))
|
||||
continue
|
||||
elif entry.state == 'Erased':
|
||||
log.info(log.yellow(f' Entry #{entry.index:03d} is reported as erased but is empty!'))
|
||||
|
||||
if entry.state == 'Written':
|
||||
# Entry CRC32 check
|
||||
if entry.metadata['crc']['original'] != entry.metadata['crc']['computed']:
|
||||
log.info(log.red(f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'),
|
||||
f'Written:',
|
||||
log.red(f'{entry.metadata["crc"]["original"]:x}'),
|
||||
f'Generated:',
|
||||
log.green(f'{entry.metadata["crc"]["computed"]:x}'))
|
||||
|
||||
# Entry children CRC32 check
|
||||
if entry.metadata['span'] > 1 and entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed']:
|
||||
log.info(log.red(f' Entry #{entry.index:03d} {entry.key} data has wrong CRC32!'),
|
||||
f'Written:',
|
||||
log.red(f'{entry.metadata["crc"]["original"]:x}'),
|
||||
f'Generated:',
|
||||
log.green(f'{entry.metadata["crc"]["computed"]:x}'))
|
||||
|
||||
# Entry type check
|
||||
if entry.metadata['type'] not in [const.item_type[key] for key in const.item_type]:
|
||||
log.info(log.yellow(f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'),
|
||||
f'Type: {entry.metadata["type"]}')
|
||||
|
||||
# Span check
|
||||
if entry.index + entry.metadata['span'] - 1 >= int(const.page_size / const.entry_size) - 2:
|
||||
log.info(log.red(f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'))
|
||||
# Spanned entry state checks
|
||||
elif entry.metadata['span'] > 1:
|
||||
parent_state = entry.state
|
||||
for kid in entry.children:
|
||||
if parent_state != kid.state:
|
||||
log.info(log.yellow(' Inconsistent data state!'),
|
||||
f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
|
||||
f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}')
|
||||
|
||||
# Gather blobs & namespaces
|
||||
if entry.metadata['type'] == 'blob_index':
|
||||
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [empty_entry] * entry.data['chunk_count']
|
||||
elif entry.metadata['type'] == 'blob_data':
|
||||
blob_chunks.append(entry)
|
||||
|
||||
if entry.metadata['namespace'] == 0:
|
||||
found_namespaces[entry.data['value']] = entry.key
|
||||
else:
|
||||
used_namespaces[entry.metadata['namespace']] = None
|
||||
log.info()
|
||||
|
||||
# Assemble blobs
|
||||
for chunk in blob_chunks:
|
||||
parent = blobs.get(f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry])[0]
|
||||
if parent is empty_entry:
|
||||
log.info(log.red(f'Blob {chunk.key} chunk has no blob index!'),
|
||||
f'Namespace index: {chunk.metadata["namespace"]:03d}',
|
||||
f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
|
||||
f'Chunk Index: {chunk.metadata["chunk_index"]:03d}')
|
||||
else:
|
||||
blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
|
||||
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
|
||||
blobs[blob_key][chunk_index + 1] = chunk
|
||||
|
||||
# Check all blobs
|
||||
for blob_key in blobs:
|
||||
blob_index = blobs[blob_key][0]
|
||||
blob_chunks = blobs[blob_key][1:]
|
||||
blob_size = blob_index.data['size']
|
||||
|
||||
# Blob data check
|
||||
for i, chunk in enumerate(blob_chunks):
|
||||
if chunk is empty_entry:
|
||||
log.info(log.red(f'Blob {blob_index.key} is missing a chunk!'),
|
||||
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
|
||||
f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
|
||||
f'Chunk Index: {i:03d}')
|
||||
else:
|
||||
blob_size -= len(chunk.children) * const.entry_size
|
||||
|
||||
if blob_size > 0:
|
||||
log.info(log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
|
||||
f'Namespace index: {blob_index.metadata["namespace"]:03d}')
|
||||
|
||||
# Namespaces
|
||||
for used_ns in used_namespaces:
|
||||
key = found_namespaces.pop(used_ns, '')
|
||||
if key == '':
|
||||
log.info(log.red('Undefined namespace index!'),
|
||||
f'Namespace index: {used_ns:03d}',
|
||||
f'[undefined]')
|
||||
|
||||
for unused_ns in found_namespaces:
|
||||
log.info(log.yellow('Found unused namespace.'),
|
||||
f'Namespace index: {unused_ns:03d}',
|
||||
f'[{found_namespaces[unused_ns]}]')
|
||||
|
||||
|
||||
def program_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description='Parse NVS partition')
|
||||
parser.add_argument('file', help='Path to dumped NVS partition')
|
||||
parser.add_argument('-i', '--integrity-check', action='store_true',
|
||||
help='Check partition for potential errors')
|
||||
tmp = {'all': 'Print everything',
|
||||
'written': 'Print only currently written entries',
|
||||
'minimal': 'Print only namespace:key=value pairs',
|
||||
'namespaces': 'List all written namespaces',
|
||||
'blobs': 'Print all blobs and strings',
|
||||
'storage_info': 'Print storage related information (free/used entries, etc)',
|
||||
'none': 'Do not print anything (if you only want to do integrity check)'}
|
||||
parser.add_argument('-d', '--dump', choices=tmp, default='all', metavar='type',
|
||||
help=f"type={{{str(list(tmp.keys()))[1:-1]}}} : {' ; '.join([f'{opt} - {tmp[opt]}' for opt in tmp])}")
|
||||
parser.add_argument('--color', choices=['never', 'auto', 'always'], default='auto', help='Enable color (ANSI)')
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = program_args()
|
||||
|
||||
if const.entry_size != 32:
|
||||
raise ValueError(f'Entry size is not 32 B! This is currently non negotiable.')
|
||||
|
||||
log.set_color(args.color)
|
||||
|
||||
try:
|
||||
with open(args.file, 'rb') as f:
|
||||
partition = f.read()
|
||||
except IndexError:
|
||||
log.error('No file given')
|
||||
raise
|
||||
except FileNotFoundError:
|
||||
log.error('Bad filename')
|
||||
raise
|
||||
|
||||
nvs = NVS_Partition(args.file.split('/')[-1], bytearray(partition))
|
||||
|
||||
def noop(_:NVS_Partition) -> None:
|
||||
pass
|
||||
|
||||
def not_implemented(_:NVS_Partition) -> None:
|
||||
raise RuntimeError(f'{args.dump} is not implemented')
|
||||
|
||||
cmds = {
|
||||
'all': dump_everything,
|
||||
'written': dump_written_entries,
|
||||
'minimal': dump_key_value_pairs,
|
||||
'namespaces': list_namespaces,
|
||||
'blobs': dump_written_blobs,
|
||||
'storage_info': storage_stats,
|
||||
'none': noop,
|
||||
}
|
||||
cmds.get(args.dump, not_implemented)(nvs) # type: ignore
|
||||
|
||||
if args.integrity_check:
|
||||
log.info()
|
||||
integrity_check(nvs)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except ValueError:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except Constants.ConstantError:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
sys.exit(1)
|
@@ -6,7 +6,7 @@ NVS Partition Parser Utility
|
||||
Introduction
|
||||
------------
|
||||
|
||||
The utility :component_file:`nvs_flash/nvs_partition_parser/nvs_read.py` loads and parses an NVS storage partition for easier debugging and data extraction. The utility also features integrity check which scans the partition for potential errors.
|
||||
The utility :component_file:`nvs_flash/nvs_partition_parser/nvs_tool.py` loads and parses an NVS storage partition for easier debugging and data extraction. The utility also features integrity check which scans the partition for potential errors. Data blobs are encoded in `base64` format.
|
||||
|
||||
Encrypted Partitions
|
||||
--------------------
|
||||
@@ -16,7 +16,11 @@ This utility does not support decryption. To decrypt the NVS partition, please u
|
||||
Usage
|
||||
-----
|
||||
|
||||
The utility provides six different output styles with `-d` or `--dump` option:
|
||||
There are two output format styles available with the `-f` or `--format` option:
|
||||
- `json` - All of the output is printed as a JSON.
|
||||
- `text` - The output is printed as a human-readable text with different selectable output styles mentioned below.
|
||||
|
||||
For the `text` output format, the utility provides six different output styles with the `-d` or `--dump` option:
|
||||
- `all` (default) - Prints all entries with metadata.
|
||||
- `written` - Prints only written entries with metadata.
|
||||
- `minimal` - Prints written `namespace:key = value` pairs.
|
||||
@@ -26,4 +30,4 @@ The utility provides six different output styles with `-d` or `--dump` option:
|
||||
|
||||
.. note:: There is also a `none` option which will not print anything. This can be used with the integrity check option if the NVS partition contents are irrelevant.
|
||||
|
||||
The utility also provides an integrity check feature via the `-i` or `--integrity-check` option. This feature scans through the entire partition and prints potential errors. It can be used with the `-d none` option which will print only the potential errors.
|
||||
The utility also provides an integrity check feature via the `-i` or `--integrity-check` option (available only with the `text` format as it would invalidate the `json` output). This feature scans through the entire partition and prints potential errors. It can be used with the `-d none` option which will print only the potential errors.
|
@@ -6,7 +6,7 @@ NVS 分区解析程序
|
||||
介绍
|
||||
-------
|
||||
|
||||
NVS 分区解析程序 :component_file:`nvs_flash/nvs_partition_parser/nvs_read.py` 加载并解析 NVS 存储分区,以便于调试和数据提取。该程序还支持完整性检查功能,可扫描分区中可能存在的错误。
|
||||
NVS 分区解析程序 :component_file:`nvs_flash/nvs_partition_parser/nvs_read.py` 加载并解析 NVS 存储分区,以便于调试和数据提取。该程序还支持完整性检查功能,可扫描分区中可能存在的错误。Blob 数据以 `base64` 格式进行编码。
|
||||
|
||||
加密分区
|
||||
-----------
|
||||
@@ -16,7 +16,11 @@ NVS 分区解析程序 :component_file:`nvs_flash/nvs_partition_parser/nvs_read.
|
||||
使用方法
|
||||
-----------
|
||||
|
||||
该程序通过 `-d` 或 `-dump` 选项提供了六种不同的输出方式:
|
||||
该程序提供了 `-f` 或 `-format` 选项,对应两种不同的输出格式:
|
||||
- `json` - 所有输出均以 JSON 格式打印。
|
||||
- `text` - 输出以可读文本的格式打印,有以下输出格式可选。
|
||||
|
||||
针对 `text` 输出格式,该程序提供了 `-d` 或 `-dump` 选项,包含六种不同的输出方式:
|
||||
- `all` (默认) - 打印所有带有元数据的条目。
|
||||
- `written` - 只打印带有元数据的写入条目。
|
||||
- `minimal` - 打印写入的 `namespace:key = value` 对。
|
||||
@@ -24,6 +28,6 @@ NVS 分区解析程序 :component_file:`nvs_flash/nvs_partition_parser/nvs_read.
|
||||
- `blobs` - 打印所有 blob 和字符串(若 blob 和字符串是以分块的形式,则对其进行重组)。
|
||||
- `storage_info` - 打印每一页面的条目状态计数。
|
||||
|
||||
.. 注意:: 该程序还提供 `none` 选项,该选项不会打印任何东西。如果 NVS 分区的内容并不相关,可以将该选项和完整性检查选项一起使用。
|
||||
.. 注意:: 该程序还提供 `none` 选项,该选项不会打印任何内容。如果 NVS 分区的内容并不相关,可以将该选项和完整性检查选项一起使用。
|
||||
|
||||
该程序支持完整性检查功能,选择选项 `-i` 或 `--integrity-check` 即可运行。此功能可扫描整个分区,并打印出可能存在的错误。当此功能和 `-d none` 一起使用时,可只打印可能存在的错误。
|
||||
该程序支持完整性检查功能,选择选项 `-i` 或 `--integrity-check` 即可运行(该选项会导致 `json` 输出格式无效,因此只适用于 `text` 格式)。此功能可扫描整个分区,并打印出可能存在的错误。当此功能和 `-d none` 一起使用时,可只打印可能存在的错误。
|
665
components/nvs_flash/nvs_partition_tool/nvs_logger.py
Normal file
665
components/nvs_flash/nvs_partition_tool/nvs_logger.py
Normal file
@@ -0,0 +1,665 @@
|
||||
#!/usr/bin/env python3
|
||||
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
import binascii
|
||||
import json
|
||||
import sys
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
from nvs_parser import NVS_Entry, NVS_Partition, nvs_const
|
||||
|
||||
|
||||
class NVS_Logger:
|
||||
ansi = {
|
||||
'red': '\033[31m',
|
||||
'green': '\033[32m',
|
||||
'yellow': '\033[33m',
|
||||
'blue': '\033[34m',
|
||||
'cyan': '\033[36m',
|
||||
'bold': '\033[1m',
|
||||
'clear': '\033[0m',
|
||||
}
|
||||
|
||||
def __init__(self, *, color: str = 'auto', out_format: str = 'text'):
|
||||
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
|
||||
self.output_format = out_format
|
||||
|
||||
def set_color(self, color: str) -> None:
|
||||
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
|
||||
|
||||
def set_format(self, out_format: str) -> None:
|
||||
self.output_format = out_format
|
||||
|
||||
def info(self, *args, **kwargs) -> None: # type: ignore
|
||||
kwargs['file'] = kwargs.get(
|
||||
'file', sys.stdout
|
||||
) # Set default output to be stdout, but can be overwritten
|
||||
print(*args, **kwargs)
|
||||
|
||||
def error(self, *args, **kwargs) -> None: # type: ignore
|
||||
kwargs['file'] = kwargs.get(
|
||||
'file', sys.stderr
|
||||
) # Set default output to be stderr, but can be overwritten
|
||||
print(*args, **kwargs)
|
||||
|
||||
def red(self, text: str) -> str:
|
||||
if self.color:
|
||||
return NVS_Logger.ansi['red'] + text + NVS_Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def green(self, text: str) -> str:
|
||||
if self.color:
|
||||
return NVS_Logger.ansi['green'] + text + NVS_Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def yellow(self, text: str) -> str:
|
||||
if self.color:
|
||||
return NVS_Logger.ansi['yellow'] + text + NVS_Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def blue(self, text: str) -> str:
|
||||
if self.color:
|
||||
return NVS_Logger.ansi['blue'] + text + NVS_Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def cyan(self, text: str) -> str:
|
||||
if self.color:
|
||||
return NVS_Logger.ansi['cyan'] + text + NVS_Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
def bold(self, text: str) -> str:
|
||||
if self.color:
|
||||
return NVS_Logger.ansi['bold'] + text + NVS_Logger.ansi['clear']
|
||||
return text
|
||||
|
||||
|
||||
nvs_log = NVS_Logger()
|
||||
|
||||
|
||||
def storage_stats(nvs_partition: NVS_Partition) -> None:
|
||||
global_stats = {
|
||||
'written_entries': 0,
|
||||
'free_entries': 0,
|
||||
'erased_entries': 0,
|
||||
'invalid_entries': 0,
|
||||
}
|
||||
|
||||
for page in nvs_partition.pages:
|
||||
written_e = 0
|
||||
free_e = 0
|
||||
erased_e = 0
|
||||
invalid_e = 0
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written':
|
||||
written_e += 1
|
||||
elif entry.state == 'Empty':
|
||||
free_e += 1
|
||||
elif entry.state == 'Erased':
|
||||
erased_e += 1
|
||||
else:
|
||||
invalid_e += 1
|
||||
|
||||
nvs_log.info(nvs_log.bold(f'Page {page.header["status"]}'))
|
||||
nvs_log.info(' Found entries:')
|
||||
nvs_log.info(f' Written: {written_e: 5d}')
|
||||
nvs_log.info(f' Erased: {erased_e: 5d}')
|
||||
nvs_log.info(f' Empty: {free_e: 5d}')
|
||||
nvs_log.info(f' Invalid: {invalid_e: 5d}')
|
||||
nvs_log.info(f' Total: {written_e + free_e + erased_e + invalid_e: 5d}')
|
||||
nvs_log.info()
|
||||
global_stats['written_entries'] += written_e
|
||||
global_stats['erased_entries'] += erased_e
|
||||
global_stats['free_entries'] += free_e
|
||||
global_stats['invalid_entries'] += invalid_e
|
||||
|
||||
nvs_log.info(nvs_log.bold('Global'))
|
||||
nvs_log.info(' Config:')
|
||||
nvs_log.info(f' Page size: {nvs_const.page_size: 5d}')
|
||||
nvs_log.info(f' Entry size: {nvs_const.entry_size: 5d}')
|
||||
nvs_log.info(f' Total pages: {len(nvs_partition.pages): 5d}')
|
||||
nvs_log.info(' Entries:')
|
||||
nvs_log.info(f' Written: {global_stats["written_entries"]: 5d}')
|
||||
nvs_log.info(f' Erased: {global_stats["erased_entries"]: 5d}')
|
||||
nvs_log.info(f' Empty: {global_stats["free_entries"]: 5d}')
|
||||
nvs_log.info(f' Invalid: {global_stats["invalid_entries"]: 5d}')
|
||||
nvs_log.info(f' Total: {sum([global_stats[key] for key in global_stats]): 5d}')
|
||||
nvs_log.info()
|
||||
|
||||
|
||||
def dump_everything(nvs_partition: NVS_Partition, written_only: bool = False) -> None:
|
||||
for page in nvs_partition.pages:
|
||||
# Print page header
|
||||
if page.is_empty:
|
||||
nvs_log.info(
|
||||
nvs_log.bold(f'Page Empty, Page address: 0x{page.start_address:x}')
|
||||
)
|
||||
else:
|
||||
if (
|
||||
page.header['crc']['original'] == page.header['crc']['computed']
|
||||
): # Color CRC32
|
||||
crc = nvs_log.green(f'{page.header["crc"]["original"]: >8x}')
|
||||
else:
|
||||
crc = nvs_log.red(f'{page.header["crc"]["original"]: >8x}')
|
||||
nvs_log.info(
|
||||
nvs_log.bold(
|
||||
f'Page no. {page.header["page_index"]}'
|
||||
+ f', Status: {page.header["status"]}'
|
||||
+ f', Version: {page.header["version"]}'
|
||||
+ f', CRC32: {crc}'
|
||||
)
|
||||
+ nvs_log.bold(f', Page address: 0x{page.start_address:x}')
|
||||
)
|
||||
|
||||
nvs_log.info(nvs_log.bold(' Entry state bitmap: '), end='')
|
||||
for x in page.raw_entry_state_bitmap:
|
||||
nvs_log.info(f'{x:02x} ', end='')
|
||||
nvs_log.info()
|
||||
|
||||
# Dump entries
|
||||
empty_entries = []
|
||||
for entry in page.entries:
|
||||
# Skip non-written entries if needed
|
||||
if written_only and not entry.state == 'Written':
|
||||
continue
|
||||
|
||||
# Compress all empty entries
|
||||
if (
|
||||
entry.state == 'Empty' and entry.is_empty
|
||||
): # Gather all subsequent empty entries
|
||||
empty_entries.append(entry)
|
||||
continue
|
||||
else:
|
||||
# Print the empty entries
|
||||
if len(empty_entries) >= 3: # There is enough entries to compress
|
||||
nvs_log.info(
|
||||
nvs_log.bold(f' {empty_entries[0].index:03d}.'), 'Empty'
|
||||
)
|
||||
nvs_log.info(nvs_log.bold(' ...'))
|
||||
nvs_log.info(
|
||||
nvs_log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty'
|
||||
)
|
||||
else: # No need for compression
|
||||
for e in empty_entries:
|
||||
nvs_log.info(nvs_log.bold(f' {e.index:03d}.'), 'Empty')
|
||||
empty_entries.clear()
|
||||
|
||||
# Dump a single entry
|
||||
status = entry.state
|
||||
if status == 'Written':
|
||||
status = nvs_log.green(f'{status: <7}')
|
||||
elif status == 'Erased':
|
||||
status = nvs_log.red(f'{status: <7}')
|
||||
|
||||
crc = ''
|
||||
if (
|
||||
entry.metadata['crc']['original'] == entry.metadata['crc']['computed']
|
||||
): # Color CRC32
|
||||
crc = nvs_log.green(f'{entry.metadata["crc"]["original"]: >8x}')
|
||||
else:
|
||||
crc = nvs_log.red(f'{entry.metadata["crc"]["original"]: >8x}')
|
||||
|
||||
nvs_log.info(
|
||||
nvs_log.bold(f' {entry.index:03d}.')
|
||||
+ ' '
|
||||
+ status
|
||||
+ f', Namespace Index: {entry.metadata["namespace"]:03d}'
|
||||
+ f', Type: {entry.metadata["type"]:<10}'
|
||||
+ f', Span: {entry.metadata["span"]:03d}'
|
||||
+ f', Chunk Index: {entry.metadata["chunk_index"]:03d}'
|
||||
+ f', CRC32: {crc}'
|
||||
+ f' | {entry.key} : ',
|
||||
end='',
|
||||
)
|
||||
|
||||
if entry.metadata['type'] not in [
|
||||
'string',
|
||||
'blob_data',
|
||||
'blob_index',
|
||||
'blob',
|
||||
]: # Entry is non-variable length
|
||||
nvs_log.info(entry.data['value'])
|
||||
else:
|
||||
if entry.metadata['type'] == 'blob_index':
|
||||
nvs_log.info(
|
||||
f'Size={entry.data["size"]}'
|
||||
+ f', ChunkCount={entry.data["chunk_count"]}'
|
||||
+ f', ChunkStart={entry.data["chunk_start"]}'
|
||||
)
|
||||
else:
|
||||
if (
|
||||
entry.metadata['crc']['data_original']
|
||||
== entry.metadata['crc']['data_computed']
|
||||
): # Color CRC32
|
||||
crc = nvs_log.green(
|
||||
f'{entry.metadata["crc"]["data_original"]:x}'
|
||||
)
|
||||
else:
|
||||
crc = nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}')
|
||||
nvs_log.info(f'Size={entry.data["size"]}, CRC32={crc}')
|
||||
|
||||
# Dump all children entries
|
||||
if entry.metadata['span'] != 1:
|
||||
for i, data in enumerate(entry.children):
|
||||
nvs_log.info(
|
||||
f'{"": >6}0x{(i*nvs_const.entry_size):03x} {data.dump_raw()}'
|
||||
)
|
||||
|
||||
# Dump trailing empty entries
|
||||
if len(empty_entries) >= 3:
|
||||
nvs_log.info(nvs_log.bold(f' {empty_entries[0].index:03d}.'), 'Empty')
|
||||
nvs_log.info(nvs_log.bold(' ...'))
|
||||
nvs_log.info(nvs_log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty')
|
||||
else:
|
||||
for e in empty_entries:
|
||||
nvs_log.info(nvs_log.bold(f' {e.index:03d}.'), 'Empty')
|
||||
empty_entries.clear()
|
||||
nvs_log.info()
|
||||
|
||||
|
||||
def dump_written_entries(nvs_partition: NVS_Partition) -> None:
|
||||
dump_everything(nvs_partition, True)
|
||||
|
||||
|
||||
def list_namespaces(nvs_partition: NVS_Partition) -> None:
|
||||
# Gather namespaces
|
||||
ns = {}
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
|
||||
ns[entry.data['value']] = entry.key
|
||||
|
||||
# Print found namespaces
|
||||
nvs_log.info(nvs_log.bold(f'Index : Namespace'))
|
||||
for ns_index in sorted(ns):
|
||||
nvs_log.info(f' {ns_index:03d} :', nvs_log.cyan(ns[ns_index]))
|
||||
|
||||
|
||||
def dump_key_value_pairs(nvs_partition: NVS_Partition) -> None:
|
||||
# Get namespace list
|
||||
ns = {}
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
|
||||
ns[entry.data['value']] = entry.key
|
||||
|
||||
# Print key-value pairs
|
||||
for page in nvs_partition.pages:
|
||||
# Print page header
|
||||
if page.is_empty:
|
||||
nvs_log.info(nvs_log.bold('Page Empty'))
|
||||
else:
|
||||
nvs_log.info(
|
||||
nvs_log.bold(
|
||||
f'Page no. {page.header["page_index"]}'
|
||||
+ f', Status: {page.header["status"]}'
|
||||
)
|
||||
)
|
||||
|
||||
# Print entries
|
||||
for entry in page.entries:
|
||||
if (
|
||||
entry.state == 'Written' and entry.metadata['namespace'] != 0
|
||||
): # Ignore non-written entries
|
||||
chunk_index = ''
|
||||
data = ''
|
||||
if entry.metadata['type'] not in [
|
||||
'string',
|
||||
'blob_data',
|
||||
'blob_index',
|
||||
'blob',
|
||||
]: # Non-variable length entry
|
||||
data = entry.data['value']
|
||||
elif entry.metadata['type'] == 'blob_index':
|
||||
continue
|
||||
else: # Variable length entries
|
||||
tmp = b''
|
||||
for e in entry.children: # Merge all children entries
|
||||
tmp += bytes(e.raw)
|
||||
tmp = tmp[: entry.data['size']] # Discard padding
|
||||
if entry.metadata['type'] == 'blob_data':
|
||||
if entry.metadata['chunk_index'] >= 128: # Get real chunk index
|
||||
chunk_index = f'[{entry.metadata["chunk_index"] - 128}]'
|
||||
else:
|
||||
chunk_index = f'[{entry.metadata["chunk_index"]}]'
|
||||
data = str(tmp)
|
||||
|
||||
if entry.metadata['namespace'] not in ns:
|
||||
continue
|
||||
else:
|
||||
nvs_log.info(
|
||||
' '
|
||||
+ nvs_log.cyan(ns[entry.metadata['namespace']])
|
||||
+ ':'
|
||||
+ nvs_log.yellow(entry.key)
|
||||
+ f'{chunk_index} = {data}'
|
||||
)
|
||||
nvs_log.info()
|
||||
|
||||
|
||||
def dump_written_blobs(nvs_partition: NVS_Partition) -> None:
|
||||
blobs: Dict = {}
|
||||
strings: List[NVS_Entry] = []
|
||||
legacy_blobs: List[NVS_Entry] = []
|
||||
ns = {}
|
||||
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
|
||||
|
||||
# Gather namespaces, blob indexes and legacy blobs
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
if entry.state == 'Written':
|
||||
if entry.metadata['type'] == 'blob_index':
|
||||
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
|
||||
empty_entry
|
||||
] * entry.data['chunk_count']
|
||||
elif entry.metadata['type'] == 'blob':
|
||||
legacy_blobs.append(entry)
|
||||
elif entry.metadata['type'] == 'string':
|
||||
strings.append(entry)
|
||||
elif entry.metadata['namespace'] == 0:
|
||||
ns[entry.data['value']] = entry.key
|
||||
|
||||
# Dump blobs
|
||||
for key in blobs:
|
||||
for page in nvs_partition.pages:
|
||||
for entry in page.entries:
|
||||
# Gather all blob chunks
|
||||
if (
|
||||
entry.state == 'Written'
|
||||
and entry.metadata['type'] != 'blob_index'
|
||||
and entry.metadata['namespace']
|
||||
== blobs[key][0].metadata['namespace']
|
||||
and entry.key == blobs[key][0].key
|
||||
):
|
||||
blobs[key][
|
||||
1
|
||||
+ entry.metadata['chunk_index']
|
||||
- blobs[key][0].data['chunk_start']
|
||||
] = entry
|
||||
|
||||
blob_index = blobs[key][0]
|
||||
blob_chunks = blobs[key][1:]
|
||||
|
||||
# Print blob info
|
||||
nvs_log.info(
|
||||
nvs_log.cyan(
|
||||
ns.get(
|
||||
blob_index.metadata['namespace'], blob_index.metadata['namespace']
|
||||
)
|
||||
)
|
||||
+ ':'
|
||||
+ nvs_log.yellow(blob_index.key)
|
||||
+ ' - '
|
||||
+ f'Type: Blob (Version 2), '
|
||||
+ f'Size: {blob_index.data["size"]}'
|
||||
)
|
||||
|
||||
# Print blob data
|
||||
raw_entries = []
|
||||
for kid in blob_chunks: # Gather all chunk entries
|
||||
if kid is empty_entry:
|
||||
raw_entries += [empty_entry]
|
||||
else:
|
||||
raw_entries += kid.children
|
||||
|
||||
for i, entry in enumerate(raw_entries):
|
||||
if entry is empty_entry:
|
||||
nvs_log.info(nvs_log.yellow(f' {"":->63} Missing data {"":-<64}'))
|
||||
else:
|
||||
nvs_log.info(
|
||||
f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}'
|
||||
)
|
||||
nvs_log.info()
|
||||
|
||||
# Dump strings
|
||||
for string in strings:
|
||||
nvs_log.info(
|
||||
nvs_log.cyan(
|
||||
ns.get(string.metadata['namespace'], string.metadata['namespace'])
|
||||
)
|
||||
+ ':'
|
||||
+ nvs_log.yellow(string.key)
|
||||
+ ' - '
|
||||
+ 'Type: String, '
|
||||
+ f'Size: {string.data["size"]}'
|
||||
)
|
||||
for i, entry in enumerate(string.children):
|
||||
nvs_log.info(f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}')
|
||||
nvs_log.info()
|
||||
|
||||
# Dump legacy blobs
|
||||
for blob in legacy_blobs:
|
||||
nvs_log.info(
|
||||
nvs_log.cyan(ns.get(blob.metadata['namespace'], blob.metadata['namespace']))
|
||||
+ ':'
|
||||
+ nvs_log.yellow(blob.key)
|
||||
+ ' - '
|
||||
+ 'Type: Blob (Version 1), '
|
||||
+ f'Size: {blob.data["size"]}'
|
||||
)
|
||||
for i, entry in enumerate(blob.children):
|
||||
nvs_log.info(f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}')
|
||||
nvs_log.info()
|
||||
|
||||
|
||||
def integrity_check(nvs_partition: NVS_Partition) -> None:
|
||||
used_namespaces: Dict[int, None] = {}
|
||||
found_namespaces: Dict[int, str] = {}
|
||||
blobs: Dict = {}
|
||||
blob_chunks: List[NVS_Entry] = []
|
||||
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
|
||||
|
||||
# Partition size
|
||||
if len(nvs_partition.pages) < 3:
|
||||
nvs_log.info(
|
||||
nvs_log.yellow(
|
||||
'Partition has to have at least 3 pages to function properly!'
|
||||
)
|
||||
)
|
||||
|
||||
# Check if there's any free/empty page
|
||||
if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
|
||||
nvs_log.info(
|
||||
nvs_log.red(
|
||||
'There are no free (empty) pages in the partition, there needs to be at least one free page!'
|
||||
)
|
||||
)
|
||||
nvs_log.info(nvs_log.red('Has the NVS partition been truncated?\n'))
|
||||
|
||||
for page in nvs_partition.pages:
|
||||
# Print page header
|
||||
if page.header['status'] == 'Empty':
|
||||
nvs_log.info(nvs_log.cyan('Page Empty'))
|
||||
|
||||
# Check if page is truly empty
|
||||
if page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
|
||||
nvs_log.info(
|
||||
nvs_log.red(
|
||||
'Page is reported as empty but entry state bitmap is not empty!'
|
||||
)
|
||||
)
|
||||
if any([not e.is_empty for e in page.entries]):
|
||||
nvs_log.info(
|
||||
nvs_log.red('Page is reported as empty but there are data written!')
|
||||
)
|
||||
else:
|
||||
# Check page header CRC32
|
||||
if page.header['crc']['original'] == page.header['crc']['computed']:
|
||||
nvs_log.info(
|
||||
nvs_log.cyan(f'Page no. {page.header["page_index"]}'), '\tCRC32: OK'
|
||||
)
|
||||
else:
|
||||
nvs_log.info(
|
||||
nvs_log.cyan(f'Page no. {page.header["page_index"]}'),
|
||||
f'Original CRC32:',
|
||||
nvs_log.red(f'{page.header["crc"]["original"]:x}'),
|
||||
f'Generated CRC32:',
|
||||
nvs_log.green(f'{page.header["crc"]["computed"]:x}'),
|
||||
)
|
||||
|
||||
# Check all entries
|
||||
for entry in page.entries:
|
||||
# Entry state check
|
||||
if entry.is_empty:
|
||||
if entry.state == 'Written':
|
||||
nvs_log.info(
|
||||
nvs_log.red(
|
||||
f' Entry #{entry.index:03d} is reported as written but is empty!'
|
||||
)
|
||||
)
|
||||
continue
|
||||
elif entry.state == 'Erased':
|
||||
nvs_log.info(
|
||||
nvs_log.yellow(
|
||||
f' Entry #{entry.index:03d} is reported as erased but is empty!'
|
||||
)
|
||||
)
|
||||
|
||||
if entry.state == 'Written':
|
||||
# Entry CRC32 check
|
||||
if (
|
||||
entry.metadata['crc']['original']
|
||||
!= entry.metadata['crc']['computed']
|
||||
):
|
||||
nvs_log.info(
|
||||
nvs_log.red(
|
||||
f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'
|
||||
),
|
||||
f'Written:',
|
||||
nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
|
||||
f'Generated:',
|
||||
nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
|
||||
)
|
||||
|
||||
# Entry children CRC32 check
|
||||
if (
|
||||
entry.metadata['span'] > 1
|
||||
and entry.metadata['crc']['data_original']
|
||||
!= entry.metadata['crc']['data_computed']
|
||||
):
|
||||
nvs_log.info(
|
||||
nvs_log.red(
|
||||
f' Entry #{entry.index:03d} {entry.key} data has wrong CRC32!'
|
||||
),
|
||||
f'Written:',
|
||||
nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
|
||||
f'Generated:',
|
||||
nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
|
||||
)
|
||||
|
||||
# Entry type check
|
||||
if entry.metadata['type'] not in [
|
||||
nvs_const.item_type[key] for key in nvs_const.item_type
|
||||
]:
|
||||
nvs_log.info(
|
||||
nvs_log.yellow(
|
||||
f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'
|
||||
),
|
||||
f'Type: {entry.metadata["type"]}',
|
||||
)
|
||||
|
||||
# Span check
|
||||
if (
|
||||
entry.index + entry.metadata['span'] - 1
|
||||
>= int(nvs_const.page_size / nvs_const.entry_size) - 2
|
||||
):
|
||||
nvs_log.info(
|
||||
nvs_log.red(
|
||||
f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'
|
||||
)
|
||||
)
|
||||
# Spanned entry state checks
|
||||
elif entry.metadata['span'] > 1:
|
||||
parent_state = entry.state
|
||||
for kid in entry.children:
|
||||
if parent_state != kid.state:
|
||||
nvs_log.info(
|
||||
nvs_log.yellow(' Inconsistent data state!'),
|
||||
f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
|
||||
f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}',
|
||||
)
|
||||
|
||||
# Gather blobs & namespaces
|
||||
if entry.metadata['type'] == 'blob_index':
|
||||
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
|
||||
empty_entry
|
||||
] * entry.data['chunk_count']
|
||||
elif entry.metadata['type'] == 'blob_data':
|
||||
blob_chunks.append(entry)
|
||||
|
||||
if entry.metadata['namespace'] == 0:
|
||||
found_namespaces[entry.data['value']] = entry.key
|
||||
else:
|
||||
used_namespaces[entry.metadata['namespace']] = None
|
||||
nvs_log.info()
|
||||
|
||||
# Assemble blobs
|
||||
for chunk in blob_chunks:
|
||||
parent = blobs.get(
|
||||
f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry]
|
||||
)[0]
|
||||
if parent is empty_entry:
|
||||
nvs_log.info(
|
||||
nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
|
||||
f'Namespace index: {chunk.metadata["namespace"]:03d}',
|
||||
f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
|
||||
f'Chunk Index: {chunk.metadata["chunk_index"]:03d}',
|
||||
)
|
||||
else:
|
||||
blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
|
||||
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
|
||||
blobs[blob_key][chunk_index + 1] = chunk
|
||||
|
||||
# Check all blobs
|
||||
for blob_key in blobs:
|
||||
blob_index = blobs[blob_key][0]
|
||||
blob_chunks = blobs[blob_key][1:]
|
||||
blob_size = blob_index.data['size']
|
||||
|
||||
# Blob data check
|
||||
for i, chunk in enumerate(blob_chunks):
|
||||
if chunk is empty_entry:
|
||||
nvs_log.info(
|
||||
nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
|
||||
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
|
||||
f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
|
||||
f'Chunk Index: {i:03d}',
|
||||
)
|
||||
else:
|
||||
blob_size -= len(chunk.children) * nvs_const.entry_size
|
||||
|
||||
if blob_size > 0:
|
||||
nvs_log.info(
|
||||
nvs_log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
|
||||
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
|
||||
)
|
||||
|
||||
# Namespaces
|
||||
for used_ns in used_namespaces:
|
||||
key = found_namespaces.pop(used_ns, '')
|
||||
if key == '':
|
||||
nvs_log.info(
|
||||
nvs_log.red('Undefined namespace index!'),
|
||||
f'Namespace index: {used_ns:03d}',
|
||||
f'[undefined]',
|
||||
)
|
||||
|
||||
for unused_ns in found_namespaces:
|
||||
nvs_log.info(
|
||||
nvs_log.yellow('Found unused namespace.'),
|
||||
f'Namespace index: {unused_ns:03d}',
|
||||
f'[{found_namespaces[unused_ns]}]',
|
||||
)
|
||||
|
||||
|
||||
def print_json(nvs: NVS_Partition) -> None:
|
||||
class NVSEncoder(json.JSONEncoder):
|
||||
def default(self, obj: Any) -> Union[Any, Dict[str, Any], str]:
|
||||
if hasattr(obj, 'toJSON'):
|
||||
return obj.toJSON()
|
||||
if isinstance(obj, bytearray):
|
||||
return binascii.b2a_base64(obj, newline=False).decode(
|
||||
'ascii'
|
||||
) # Binary to Base64 ASCII representation
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
|
||||
print(json.dumps(nvs.toJSON(), cls=NVSEncoder, indent=2))
|
285
components/nvs_flash/nvs_partition_tool/nvs_parser.py
Normal file
285
components/nvs_flash/nvs_partition_tool/nvs_parser.py
Normal file
@@ -0,0 +1,285 @@
|
||||
#!/usr/bin/env python3
|
||||
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
from typing import Any, Dict, List, Optional
|
||||
from zlib import crc32
|
||||
|
||||
|
||||
# Constants
|
||||
class NVS_Constants:
|
||||
class ConstantError(AttributeError):
|
||||
pass
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.page_size = 4096
|
||||
self.entry_size = 32
|
||||
self.item_type = {
|
||||
0x01: 'uint8_t',
|
||||
0x11: 'int8_t',
|
||||
0x02: 'uint16_t',
|
||||
0x12: 'int16_t',
|
||||
0x04: 'uint32_t',
|
||||
0x14: 'int32_t',
|
||||
0x08: 'uint64_t',
|
||||
0x18: 'int64_t',
|
||||
0x21: 'string',
|
||||
0x41: 'blob',
|
||||
0x42: 'blob_data',
|
||||
0x48: 'blob_index',
|
||||
}
|
||||
self.page_status = {
|
||||
0xFFFFFFFF: 'Empty',
|
||||
0xFFFFFFFE: 'Active',
|
||||
0xFFFFFFFC: 'Full',
|
||||
0xFFFFFFF8: 'Erasing',
|
||||
0x00000000: 'Corrupted',
|
||||
}
|
||||
self.entry_status = {
|
||||
0b11: 'Empty',
|
||||
0b10: 'Written',
|
||||
0b00: 'Erased',
|
||||
}
|
||||
|
||||
def __setattr__(self, key: str, val: Any) -> None:
|
||||
if self.__dict__.get(key, None) is None:
|
||||
self.__dict__[key] = val
|
||||
else:
|
||||
raise NVS_Constants.ConstantError('Cannot change a constant!')
|
||||
|
||||
|
||||
nvs_const = NVS_Constants()
|
||||
|
||||
|
||||
class NotAlignedError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class NVS_Partition:
|
||||
def __init__(self, name: str, raw_data: bytearray):
|
||||
if len(raw_data) % nvs_const.page_size != 0:
|
||||
raise NotAlignedError(
|
||||
f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})'
|
||||
)
|
||||
|
||||
# Divide partition into pages
|
||||
self.name = name
|
||||
self.pages = []
|
||||
for i in range(0, len(raw_data), nvs_const.page_size):
|
||||
self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i))
|
||||
|
||||
def toJSON(self) -> Dict[str, Any]:
|
||||
return dict(name=self.name, pages=self.pages)
|
||||
|
||||
|
||||
class NVS_Page:
|
||||
def __init__(self, page_data: bytearray, address: int):
|
||||
if len(page_data) != nvs_const.page_size:
|
||||
raise NotAlignedError(
|
||||
f'Size of given page does not match page size ({len(page_data)} != {nvs_const.page_size})'
|
||||
)
|
||||
|
||||
# Initialize class
|
||||
self.is_empty = (
|
||||
page_data[0: nvs_const.entry_size]
|
||||
== bytearray({0xFF}) * nvs_const.entry_size
|
||||
)
|
||||
self.start_address = address
|
||||
self.raw_header = page_data[0: nvs_const.entry_size]
|
||||
self.raw_entry_state_bitmap = page_data[
|
||||
nvs_const.entry_size: 2 * nvs_const.entry_size
|
||||
]
|
||||
self.entries = []
|
||||
|
||||
# Load header
|
||||
self.header: Dict[str, Any] = {
|
||||
'status': nvs_const.page_status.get(
|
||||
int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'
|
||||
),
|
||||
'page_index': int.from_bytes(page_data[4:8], byteorder='little'),
|
||||
'version': 256 - page_data[8],
|
||||
'crc': {
|
||||
'original': int.from_bytes(page_data[28:32], byteorder='little'),
|
||||
'computed': crc32(page_data[4:28], 0xFFFFFFFF),
|
||||
},
|
||||
}
|
||||
|
||||
# Load entry state bitmap
|
||||
entry_states = []
|
||||
for c in self.raw_entry_state_bitmap:
|
||||
for index in range(0, 8, 2):
|
||||
entry_states.append(
|
||||
nvs_const.entry_status.get((c >> index) & 3, 'Invalid')
|
||||
)
|
||||
entry_states = entry_states[:-2]
|
||||
|
||||
# Load entries
|
||||
i = 2
|
||||
while i < int(
|
||||
nvs_const.page_size / nvs_const.entry_size
|
||||
): # Loop through every entry
|
||||
span = page_data[(i * nvs_const.entry_size) + 2]
|
||||
if span in [0xFF, 0]: # 'Default' span length to prevent span overflow
|
||||
span = 1
|
||||
|
||||
# Load an entry
|
||||
entry = NVS_Entry(
|
||||
i - 2,
|
||||
page_data[i * nvs_const.entry_size: (i + 1) * nvs_const.entry_size],
|
||||
entry_states[i - 2],
|
||||
)
|
||||
self.entries.append(entry)
|
||||
|
||||
# Load all children entries
|
||||
if span != 1:
|
||||
for span_idx in range(1, span):
|
||||
page_addr = i + span_idx
|
||||
entry_idx = page_addr - 2
|
||||
if page_addr * nvs_const.entry_size >= nvs_const.page_size:
|
||||
break
|
||||
child_entry = NVS_Entry(
|
||||
entry_idx,
|
||||
page_data[
|
||||
page_addr
|
||||
* nvs_const.entry_size: (page_addr + 1)
|
||||
* nvs_const.entry_size
|
||||
],
|
||||
entry_states[entry_idx],
|
||||
)
|
||||
entry.child_assign(child_entry)
|
||||
entry.compute_crc()
|
||||
i += span
|
||||
|
||||
def toJSON(self) -> Dict[str, Any]:
|
||||
return dict(
|
||||
is_empty=self.is_empty,
|
||||
start_address=self.start_address,
|
||||
raw_header=self.raw_header,
|
||||
raw_entry_state_bitmap=self.raw_entry_state_bitmap,
|
||||
header=self.header,
|
||||
entries=self.entries,
|
||||
)
|
||||
|
||||
|
||||
class NVS_Entry:
|
||||
def __init__(self, index: int, entry_data: bytearray, entry_state: str):
|
||||
if len(entry_data) != nvs_const.entry_size:
|
||||
raise NotAlignedError(
|
||||
f'Given entry is not aligned to entry size ({len(entry_data)} % {nvs_const.entry_size} = {len(entry_data)%nvs_const.entry_size})'
|
||||
)
|
||||
|
||||
def item_convert(i_type: int, data: bytearray) -> Dict:
|
||||
byte_size_mask = 0x0F
|
||||
number_sign_mask = 0xF0
|
||||
fixed_entry_length_threshold = (
|
||||
0x20 # Fixed length entry type number is always smaller than this
|
||||
)
|
||||
if i_type in nvs_const.item_type:
|
||||
# Deal with non variable length entries
|
||||
if i_type < fixed_entry_length_threshold:
|
||||
size = i_type & byte_size_mask
|
||||
num = int.from_bytes(
|
||||
data[:size],
|
||||
byteorder='little',
|
||||
signed=bool(i_type & number_sign_mask),
|
||||
)
|
||||
return {'value': num}
|
||||
|
||||
# Deal with variable length entries
|
||||
if nvs_const.item_type[i_type] in ['string', 'blob_data', 'blob']:
|
||||
size = int.from_bytes(data[:2], byteorder='little')
|
||||
crc = int.from_bytes(data[4:8], byteorder='little')
|
||||
return {'value': [size, crc], 'size': size, 'crc': crc}
|
||||
if nvs_const.item_type[i_type] == 'blob_index':
|
||||
size = int.from_bytes(data[:4], byteorder='little')
|
||||
chunk_count = data[4]
|
||||
chunk_start = data[5]
|
||||
return {
|
||||
'value': [size, chunk_count, chunk_start],
|
||||
'size': size,
|
||||
'chunk_count': chunk_count,
|
||||
'chunk_start': chunk_start,
|
||||
}
|
||||
|
||||
return {'value': None}
|
||||
|
||||
def key_decode(data: bytearray) -> Optional[str]:
|
||||
decoded = ''
|
||||
for n in data.rstrip(b'\x00'):
|
||||
char = chr(n)
|
||||
if char.isascii():
|
||||
decoded += char
|
||||
else:
|
||||
return None
|
||||
return decoded
|
||||
|
||||
self.raw = entry_data
|
||||
self.state = entry_state
|
||||
self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size
|
||||
self.index = index
|
||||
|
||||
namespace = self.raw[0]
|
||||
entry_type = self.raw[1]
|
||||
span = self.raw[2]
|
||||
chunk_index = self.raw[3]
|
||||
crc = self.raw[4:8]
|
||||
key = self.raw[8:24]
|
||||
data = self.raw[24:32]
|
||||
raw_without_crc = self.raw[:4] + self.raw[8:32]
|
||||
self.metadata: Dict[str, Any] = {
|
||||
'namespace': namespace,
|
||||
'type': nvs_const.item_type.get(entry_type, f'0x{entry_type:02x}'),
|
||||
'span': span,
|
||||
'chunk_index': chunk_index,
|
||||
'crc': {
|
||||
'original': int.from_bytes(crc, byteorder='little'),
|
||||
'computed': crc32(raw_without_crc, 0xFFFFFFFF),
|
||||
'data_original': int.from_bytes(data[-4:], byteorder='little'),
|
||||
'data_computed': 0,
|
||||
},
|
||||
}
|
||||
self.children: List['NVS_Entry'] = []
|
||||
self.key = key_decode(key)
|
||||
if self.key is None:
|
||||
self.data = None
|
||||
else:
|
||||
self.data = item_convert(entry_type, data)
|
||||
|
||||
def dump_raw(self) -> str:
|
||||
hex_bytes = ''
|
||||
decoded = ''
|
||||
for i, c in enumerate(self.raw):
|
||||
middle_index = int(len(self.raw) / 2)
|
||||
if i == middle_index: # Add a space in the middle
|
||||
hex_bytes += ' '
|
||||
decoded += ' '
|
||||
hex_bytes += f'{c:02x} '
|
||||
decoded += chr(c) if chr(c).isprintable() else '.'
|
||||
return hex_bytes + ' ' + decoded
|
||||
|
||||
def child_assign(self, entry: 'NVS_Entry') -> None:
|
||||
if not isinstance(entry, type(self)):
|
||||
raise ValueError('You can assign only NVS_Entry')
|
||||
self.children.append(entry)
|
||||
|
||||
def compute_crc(self) -> None:
|
||||
if self.metadata['span'] == 1:
|
||||
return
|
||||
# Merge entries into one buffer
|
||||
children_data = bytearray()
|
||||
for entry in self.children:
|
||||
children_data += entry.raw
|
||||
if self.data:
|
||||
children_data = children_data[: self.data['size']] # Discard padding
|
||||
self.metadata['crc']['data_computed'] = crc32(children_data, 0xFFFFFFFF)
|
||||
|
||||
def toJSON(self) -> Dict[str, Any]:
|
||||
return dict(
|
||||
raw=self.raw,
|
||||
state=self.state,
|
||||
is_empty=self.is_empty,
|
||||
index=self.index,
|
||||
metadata=self.metadata,
|
||||
children=self.children,
|
||||
key=self.key,
|
||||
data=self.data,
|
||||
)
|
123
components/nvs_flash/nvs_partition_tool/nvs_tool.py
Normal file
123
components/nvs_flash/nvs_partition_tool/nvs_tool.py
Normal file
@@ -0,0 +1,123 @@
|
||||
#!/usr/bin/env python3
|
||||
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import nvs_logger
|
||||
import nvs_parser
|
||||
from nvs_logger import nvs_log
|
||||
from nvs_parser import nvs_const
|
||||
|
||||
|
||||
def program_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Parse NVS partition', formatter_class=argparse.RawTextHelpFormatter
|
||||
)
|
||||
parser.add_argument('file', help='Path to dumped NVS partition')
|
||||
parser.add_argument(
|
||||
'-i',
|
||||
'--integrity-check',
|
||||
action='store_true',
|
||||
help='check partition for potential errors',
|
||||
)
|
||||
tmp = {
|
||||
'all': 'print everything',
|
||||
'written': 'print only currently written entries',
|
||||
'minimal': 'print only namespace:key=value pairs',
|
||||
'namespaces': 'list all written namespaces',
|
||||
'blobs': 'print all blobs and strings',
|
||||
'storage_info': 'print storage related information (free/used entries, etc)',
|
||||
'none': 'do not print anything (if you only want to do integrity check)',
|
||||
}
|
||||
parser.add_argument(
|
||||
'-d',
|
||||
'--dump',
|
||||
choices=tmp,
|
||||
default='all',
|
||||
metavar='type',
|
||||
help=(
|
||||
f"""type: {str(list(tmp.keys()))[1:-1]}
|
||||
{os.linesep.join([f'{opt} - {tmp[opt]}' for opt in tmp])}"""
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--color',
|
||||
choices=['never', 'auto', 'always'],
|
||||
default='auto',
|
||||
help='Enable color (ANSI)',
|
||||
)
|
||||
tmp = {
|
||||
'text': 'print output as a human-readable text',
|
||||
'json': 'print output as JSON and exit',
|
||||
}
|
||||
parser.add_argument(
|
||||
'-f', '--format', choices=tmp, default='text', help='Output format'
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = program_args()
|
||||
|
||||
if nvs_const.entry_size != 32:
|
||||
raise ValueError(f'Entry size is not 32B! This is currently non negotiable.')
|
||||
|
||||
nvs_log.set_color(args.color)
|
||||
nvs_log.set_format(args.format)
|
||||
|
||||
try:
|
||||
with open(args.file, 'rb') as f:
|
||||
partition = f.read()
|
||||
except IndexError:
|
||||
nvs_log.error('No file given')
|
||||
raise
|
||||
except FileNotFoundError:
|
||||
nvs_log.error('Bad filename')
|
||||
raise
|
||||
|
||||
nvs = nvs_parser.NVS_Partition(args.file.split('/')[-1], bytearray(partition))
|
||||
|
||||
def noop(_: nvs_parser.NVS_Partition) -> None:
|
||||
pass
|
||||
|
||||
def format_not_implemented(_: nvs_parser.NVS_Partition) -> None:
|
||||
raise RuntimeError(f'{args.format} is not implemented')
|
||||
|
||||
def cmd_not_implemented(_: nvs_parser.NVS_Partition) -> None:
|
||||
raise RuntimeError(f'{args.dump} is not implemented')
|
||||
|
||||
formats = {
|
||||
'text': noop,
|
||||
'json': nvs_logger.print_json,
|
||||
}
|
||||
formats.get(args.format, format_not_implemented)(nvs)
|
||||
|
||||
if args.format == 'text':
|
||||
cmds = {
|
||||
'all': nvs_logger.dump_everything,
|
||||
'written': nvs_logger.dump_written_entries,
|
||||
'minimal': nvs_logger.dump_key_value_pairs,
|
||||
'namespaces': nvs_logger.list_namespaces,
|
||||
'blobs': nvs_logger.dump_written_blobs,
|
||||
'storage_info': nvs_logger.storage_stats,
|
||||
'none': noop,
|
||||
}
|
||||
cmds.get(args.dump, cmd_not_implemented)(nvs) # type: ignore
|
||||
|
||||
if args.integrity_check:
|
||||
nvs_log.info()
|
||||
nvs_logger.integrity_check(nvs)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except ValueError:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except nvs_parser.NVS_Constants.ConstantError:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
sys.exit(1)
|
Reference in New Issue
Block a user