Merge branch 'fix/nvs_logger_json_output_v5.4' into 'release/v5.4'

fix(nvs_flash/nvs_tool): Fix nvs_tool.py to output required values (v5.4)

See merge request espressif/esp-idf!38681
This commit is contained in:
Martin Vychodil
2025-04-25 00:46:40 +08:00
3 changed files with 151 additions and 19 deletions

View File

@ -1,12 +1,17 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import binascii
import json
import sys
from typing import Any, Dict, List, Union
from typing import Any
from typing import Dict
from typing import List
from typing import Union
from nvs_parser import NVS_Entry, NVS_Partition, nvs_const
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Partition
class NVS_Logger:
@ -207,7 +212,7 @@ def dump_everything(nvs_partition: NVS_Partition, written_only: bool = False) ->
+ f', Span: {entry.metadata["span"]:03d}'
+ f', Chunk Index: {entry.metadata["chunk_index"]:03d}'
+ f', CRC32: {crc}'
+ f' | {entry.key} : ',
+ f' | {entry.key}: ',
end='',
)
@ -244,7 +249,7 @@ def dump_everything(nvs_partition: NVS_Partition, written_only: bool = False) ->
if entry.metadata['span'] != 1:
for i, data in enumerate(entry.children):
nvs_log.info(
f'{"": >6}0x{(i*nvs_const.entry_size):03x} {data.dump_raw()}'
f'{"": >6}0x{(i * nvs_const.entry_size):03x} {data.dump_raw()}'
)
# Dump trailing empty entries
@ -272,9 +277,9 @@ def list_namespaces(nvs_partition: NVS_Partition) -> None:
ns[entry.data['value']] = entry.key
# Print found namespaces
nvs_log.info(nvs_log.bold(f'Index : Namespace'))
nvs_log.info(nvs_log.bold(f'Index: Namespace'))
for ns_index in sorted(ns):
nvs_log.info(f' {ns_index:03d} :', nvs_log.cyan(ns[ns_index]))
nvs_log.info(f' {ns_index:03d}:', nvs_log.cyan(ns[ns_index]))
def dump_key_value_pairs(nvs_partition: NVS_Partition) -> None:
@ -324,7 +329,10 @@ def dump_key_value_pairs(nvs_partition: NVS_Partition) -> None:
chunk_index = f'[{entry.metadata["chunk_index"] - 128}]'
else:
chunk_index = f'[{entry.metadata["chunk_index"]}]'
data = str(tmp)
elif entry.metadata['type'] == 'string':
data = str(tmp, 'utf-8')
else:
data = str(tmp)
if entry.metadata['namespace'] not in ns:
continue
@ -456,3 +464,49 @@ def print_json(nvs: NVS_Partition) -> None:
return json.JSONEncoder.default(self, obj)
print(json.dumps(nvs.toJSON(), cls=NVSEncoder, indent=2))
def print_minimal_json(nvs_partition: NVS_Partition) -> None:
# Get namespace list
ns = {}
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
ns[entry.data['value']] = entry.key
# Prepare key-value pairs for JSON output
key_value_pairs = []
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written' and entry.metadata['namespace'] != 0:
data = ''
entry_type = entry.metadata['type']
if entry_type not in [
'string',
'blob_data',
'blob_index',
'blob',
]: # Non-variable length entry
data = entry.data['value']
elif entry_type == 'blob_index':
continue
else: # Variable length entries
tmp = b''
for e in entry.children: # Merge all children entries
tmp += bytes(e.raw)
tmp = tmp[: entry.data['size']] # Discard padding
if entry_type == 'string':
data = str(tmp.rstrip(b'\x00'), 'utf-8')
else:
data = binascii.b2a_base64(tmp, newline=False).decode('ascii')
if entry.metadata['namespace'] in ns:
key_value_pairs.append({
'namespace': ns[entry.metadata['namespace']],
'key': entry.key,
'encoding': entry_type, # Add type of data
'data': data, # Ensure data ends with a newline
'state': entry.state,
'is_empty': entry.is_empty if hasattr(entry, 'is_empty') else None,
})
nvs_log.info(json.dumps(key_value_pairs, indent=4))

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
@ -90,12 +90,10 @@ def main() -> None:
def cmd_not_implemented(_: nvs_parser.NVS_Partition) -> None:
raise RuntimeError(f'{args.dump} is not implemented')
formats = {
'text': noop,
'json': nvs_logger.print_json,
}
formats.get(args.format, format_not_implemented)(nvs)
if args.format not in ['text', 'json']:
format_not_implemented(nvs)
cmds = {}
if args.format == 'text':
cmds = {
'all': nvs_logger.dump_everything,
@ -106,11 +104,19 @@ def main() -> None:
'storage_info': nvs_logger.storage_stats,
'none': noop,
}
cmds.get(args.dump, cmd_not_implemented)(nvs) # type: ignore
if args.integrity_check:
nvs_log.info()
nvs_check.integrity_check(nvs, nvs_log)
if args.format == 'json':
cmds = {
'all': nvs_logger.print_json,
'minimal': nvs_logger.print_minimal_json,
'none': noop,
}
cmds.get(args.dump, cmd_not_implemented)(nvs)
if args.integrity_check:
nvs_log.info()
nvs_check.integrity_check(nvs, nvs_log)
if __name__ == '__main__':

View File

@ -1,6 +1,8 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import base64
import json
from importlib.metadata import version
from io import BufferedRandom
from io import BytesIO
@ -19,12 +21,12 @@ import pytest
from esp_idf_nvs_partition_gen.nvs_partition_gen import NVS
from nvs_logger import nvs_log
from nvs_logger import NVS_Logger
from nvs_logger import print_minimal_json
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Partition
from packaging.version import Version
NVS_PART_GEN_VERSION_SKIP = '0.1.8'
@ -346,6 +348,29 @@ def setup_read_only(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
return nvs_obj
def setup_minimal_json(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None,
read_only=read_only
)
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'int32_test', 'data', 'i32', str(-42))
nvs_partition_gen.write_entry(nvs_obj, 'uint32_test', 'data', 'u32', str(96))
nvs_partition_gen.write_entry(nvs_obj, 'int8_test', 'data', 'i8', str(100))
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'short_str_key', 'data', 'string', 'Another string data')
nvs_partition_gen.write_entry(nvs_obj, 'long_str_key', 'data', 'string', LOREM_STRING)
return nvs_obj
# Helper functions
def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]:
seen_written_entires_all: Dict[str, List[NVS_Entry]] = {}
@ -441,3 +466,50 @@ def test_check_read_only_partition(generate_nvs: Callable, setup_func: Callable)
assert len(nvs.raw_data) == 0x1000
assert nvs_check.check_partition_size(nvs, logger, read_only=True)
assert not nvs_check.check_empty_page_present(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_minimal_json])
def test_print_minimal_json(generate_nvs: Callable, setup_func: Callable, capsys: pytest.CaptureFixture) -> None:
nvs = generate_nvs(setup_func)
logger.set_format('json')
print_minimal_json(nvs)
captured = capsys.readouterr()
assert captured.out.startswith('[')
assert captured.out.endswith(']\n')
assert '"namespace"' in captured.out
assert '"key"' in captured.out
assert '"encoding"' in captured.out
assert '"data"' in captured.out
assert '"state"' in captured.out
assert '"is_empty"' in captured.out
# Check if the data is correct
assert '100' in captured.out and '-42' in captured.out and '96' in captured.out
assert 'Another string data' in captured.out
# Check if the LOREM_STRING is present and properly formatted
lorem_string_escaped = LOREM_STRING.replace('\n', '\\n')
assert lorem_string_escaped in captured.out
# Check if the blob key data is present and correct
assert captured.out.count('"key": "blob_key"') == 2
# Load the captured output as JSON
output_json = json.loads(captured.out)
# Gather all entries with the key 'blob_key' and decode them from base64
blob_key_data_binary = b''.join(
base64.b64decode(entry['data']) for entry in output_json if entry['key'] == 'blob_key'
)
# Read the sample multipage blob data from the binary file
with open('../nvs_partition_generator/testdata/sample_multipage_blob.bin', 'rb') as f:
sample_blob_data = f.read()
# Check if the gathered blob_key data matches the sample multipage blob data
assert sample_blob_data == blob_key_data_binary
# Check if all keys are present
assert captured.out.count('blob_key') == 2
assert captured.out.count('short_str_key') == 1
assert captured.out.count('long_str_key') == 1
assert captured.out.count('int32_test') == 2 # 2 entries for int32_test
assert captured.out.count('uint32_test') == 1
assert captured.out.count('int8_test') == 1