mirror of
https://github.com/home-assistant/core.git
synced 2025-07-31 19:25:12 +02:00
Inverse json import logic (#88099)
* Fix helpers and util * Adjust components * Move back errors * Add report * mypy * mypy * Assert deprecation messages * Move test_json_loads_object * Adjust tests * Fix rebase * Adjust pylint plugin * Fix plugin * Adjust references * Adjust backup tests
This commit is contained in:
@@ -1,21 +1,25 @@
|
||||
"""Helpers to help with encoding Home Assistant objects in JSON."""
|
||||
from collections import deque
|
||||
from collections.abc import Callable
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Final
|
||||
|
||||
import orjson
|
||||
|
||||
JsonValueType = (
|
||||
dict[str, "JsonValueType"] | list["JsonValueType"] | str | int | float | bool | None
|
||||
from homeassistant.core import Event, State
|
||||
from homeassistant.util.file import write_utf8_file, write_utf8_file_atomic
|
||||
from homeassistant.util.json import ( # pylint: disable=unused-import # noqa: F401
|
||||
JSON_DECODE_EXCEPTIONS,
|
||||
JSON_ENCODE_EXCEPTIONS,
|
||||
SerializationError,
|
||||
format_unserializable_data,
|
||||
json_loads,
|
||||
)
|
||||
"""Any data that can be returned by the standard JSON deserializing process."""
|
||||
JsonObjectType = dict[str, JsonValueType]
|
||||
"""Dictionary that can be returned by the standard JSON deserializing process."""
|
||||
|
||||
JSON_ENCODE_EXCEPTIONS = (TypeError, ValueError)
|
||||
JSON_DECODE_EXCEPTIONS = (orjson.JSONDecodeError,)
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JSONEncoder(json.JSONEncoder):
|
||||
@@ -140,18 +144,99 @@ def json_dumps_sorted(data: Any) -> str:
|
||||
).decode("utf-8")
|
||||
|
||||
|
||||
json_loads: Callable[[bytes | bytearray | memoryview | str], JsonValueType]
|
||||
json_loads = orjson.loads
|
||||
"""Parse JSON data."""
|
||||
|
||||
|
||||
def json_loads_object(__obj: bytes | bytearray | memoryview | str) -> JsonObjectType:
|
||||
"""Parse JSON data and ensure result is a dictionary."""
|
||||
value: JsonValueType = json_loads(__obj)
|
||||
# Avoid isinstance overhead as we are not interested in dict subclasses
|
||||
if type(value) is dict: # pylint: disable=unidiomatic-typecheck
|
||||
return value
|
||||
raise ValueError(f"Expected JSON to be parsed as a dict got {type(value)}")
|
||||
|
||||
|
||||
JSON_DUMP: Final = json_dumps
|
||||
|
||||
|
||||
def _orjson_default_encoder(data: Any) -> str:
|
||||
"""JSON encoder that uses orjson with hass defaults."""
|
||||
return orjson.dumps(
|
||||
data,
|
||||
option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS,
|
||||
default=json_encoder_default,
|
||||
).decode("utf-8")
|
||||
|
||||
|
||||
def save_json(
|
||||
filename: str,
|
||||
data: list | dict,
|
||||
private: bool = False,
|
||||
*,
|
||||
encoder: type[json.JSONEncoder] | None = None,
|
||||
atomic_writes: bool = False,
|
||||
) -> None:
|
||||
"""Save JSON data to a file."""
|
||||
dump: Callable[[Any], Any]
|
||||
try:
|
||||
# For backwards compatibility, if they pass in the
|
||||
# default json encoder we use _orjson_default_encoder
|
||||
# which is the orjson equivalent to the default encoder.
|
||||
if encoder and encoder is not JSONEncoder:
|
||||
# If they pass a custom encoder that is not the
|
||||
# default JSONEncoder, we use the slow path of json.dumps
|
||||
dump = json.dumps
|
||||
json_data = json.dumps(data, indent=2, cls=encoder)
|
||||
else:
|
||||
dump = _orjson_default_encoder
|
||||
json_data = _orjson_default_encoder(data)
|
||||
except TypeError as error:
|
||||
formatted_data = format_unserializable_data(
|
||||
find_paths_unserializable_data(data, dump=dump)
|
||||
)
|
||||
msg = f"Failed to serialize to JSON: {filename}. Bad data at {formatted_data}"
|
||||
_LOGGER.error(msg)
|
||||
raise SerializationError(msg) from error
|
||||
|
||||
if atomic_writes:
|
||||
write_utf8_file_atomic(filename, json_data, private)
|
||||
else:
|
||||
write_utf8_file(filename, json_data, private)
|
||||
|
||||
|
||||
def find_paths_unserializable_data(
|
||||
bad_data: Any, *, dump: Callable[[Any], str] = json.dumps
|
||||
) -> dict[str, Any]:
|
||||
"""Find the paths to unserializable data.
|
||||
|
||||
This method is slow! Only use for error handling.
|
||||
"""
|
||||
to_process = deque([(bad_data, "$")])
|
||||
invalid = {}
|
||||
|
||||
while to_process:
|
||||
obj, obj_path = to_process.popleft()
|
||||
|
||||
try:
|
||||
dump(obj)
|
||||
continue
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# We convert objects with as_dict to their dict values
|
||||
# so we can find bad data inside it
|
||||
if hasattr(obj, "as_dict"):
|
||||
desc = obj.__class__.__name__
|
||||
if isinstance(obj, State):
|
||||
desc += f": {obj.entity_id}"
|
||||
elif isinstance(obj, Event):
|
||||
desc += f": {obj.event_type}"
|
||||
|
||||
obj_path += f"({desc})"
|
||||
obj = obj.as_dict()
|
||||
|
||||
if isinstance(obj, dict):
|
||||
for key, value in obj.items():
|
||||
try:
|
||||
# Is key valid?
|
||||
dump({key: None})
|
||||
except TypeError:
|
||||
invalid[f"{obj_path}<key: {key}>"] = key
|
||||
else:
|
||||
# Process value
|
||||
to_process.append((value, f"{obj_path}.{key}"))
|
||||
elif isinstance(obj, list):
|
||||
for idx, value in enumerate(obj):
|
||||
to_process.append((value, f"{obj_path}[{idx}]"))
|
||||
else:
|
||||
invalid[obj_path] = obj
|
||||
|
||||
return invalid
|
||||
|
Reference in New Issue
Block a user