Files
core/homeassistant/helpers/trigger_template_entity.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

392 lines
12 KiB
Python
Raw Permalink Normal View History

"""TemplateEntity utility class."""
from __future__ import annotations
import itertools
import logging
from typing import Any
import jinja2
import voluptuous as vol
from homeassistant.components.sensor import (
CONF_STATE_CLASS,
DEVICE_CLASSES_SCHEMA,
STATE_CLASSES_SCHEMA,
SensorEntity,
)
from homeassistant.const import (
ATTR_ENTITY_PICTURE,
ATTR_FRIENDLY_NAME,
ATTR_ICON,
CONF_DEVICE_CLASS,
CONF_ICON,
CONF_NAME,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
)
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.exceptions import TemplateError
from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
from . import config_validation as cv
from .entity import Entity
from .template import (
_SENTINEL,
Template,
TemplateStateFromEntityId,
_render_with_context,
render_complex,
result_as_boolean,
)
from .typing import ConfigType
# Configuration keys used by template-based entities in this module.
CONF_AVAILABILITY = "availability"
CONF_ATTRIBUTES = "attributes"
CONF_PICTURE = "picture"

# Maps a configuration key to the state attribute holding the same value.
# TriggerBaseEntity.restore_attributes walks this mapping to repopulate the
# rendered values from a previously stored state.
CONF_TO_ATTRIBUTE = {
    CONF_ICON: ATTR_ICON,
    CONF_NAME: ATTR_FRIENDLY_NAME,
    CONF_PICTURE: ATTR_ENTITY_PICTURE,
}

# Base schema for template entities: every key is optional; icon, name and
# picture are validated as templates, the unique id as a plain string.
TEMPLATE_ENTITY_BASE_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_ICON): cv.template,
        vol.Optional(CONF_NAME): cv.template,
        vol.Optional(CONF_PICTURE): cv.template,
        vol.Optional(CONF_UNIQUE_ID): cv.string,
    }
)
def make_template_entity_base_schema(default_name: str) -> vol.Schema:
    """Build the template entity base schema with a default for the name.

    The returned schema has the same four optional keys as
    TEMPLATE_ENTITY_BASE_SCHEMA (icon, name, picture, unique id); the only
    difference is that the name key falls back to ``default_name``.
    """
    fields = {
        vol.Optional(CONF_ICON): cv.template,
        vol.Optional(CONF_NAME, default=default_name): cv.template,
        vol.Optional(CONF_PICTURE): cv.template,
        vol.Optional(CONF_UNIQUE_ID): cv.string,
    }
    return vol.Schema(fields)
def log_triggered_template_error(
    entity_id: str,
    err: TemplateError,
    key: str | None = None,
    attribute: str | None = None,
) -> None:
    """Log an error raised while rendering a trigger entity template.

    The message names the failing template: ``key`` wins when it is set,
    otherwise ``attribute`` is reported under the attributes section, and
    when neither is given no target is mentioned. The record is emitted at
    error level on a logger named after this package and the part of
    ``entity_id`` before the first dot.
    """
    if key:
        target = f" {key}"
    elif attribute:
        target = f" {CONF_ATTRIBUTES}.{attribute}"
    else:
        target = ""

    domain = entity_id.split(".")[0]
    logger = logging.getLogger(f"{__package__}.{domain}")
    logger.error(
        "Error rendering%s template for %s: %s",
        target,
        entity_id,
        err,
    )
# Sensor schema: optional device class, state class and unit of measurement,
# extended with every key of TEMPLATE_ENTITY_BASE_SCHEMA.
TEMPLATE_SENSOR_BASE_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
        vol.Optional(CONF_STATE_CLASS): STATE_CLASSES_SCHEMA,
        vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
    }
).extend(TEMPLATE_ENTITY_BASE_SCHEMA.schema)
class ValueTemplate(Template):
    """Class to hold a value_template and manage caching and rendering it with 'value' in variables."""

    @classmethod
    def from_template(cls, template: Template) -> ValueTemplate:
        """Create a ValueTemplate object from a Template object."""
        return cls(template.template, template.hass)

    @callback
    def async_render_as_value_template(
        self, entity_id: str, variables: dict[str, Any], error_value: Any
    ) -> Any:
        """Render template that requires 'value' and optionally 'value_json'.

        A static template is returned as-is without rendering. Otherwise the
        compiled template is rendered with ``variables`` and the result is
        stripped of surrounding whitespace.

        Jinja template errors are always suppressed: they are logged at debug
        level on a logger named after this package and the part of
        ``entity_id`` before the first dot, and ``error_value`` is returned
        instead of the rendered result.

        This method must be run in the event loop.
        """
        self._renders += 1

        if self.is_static:
            return self.template

        compiled = self._compiled or self._ensure_compiled()

        try:
            render_result = _render_with_context(
                self.template, compiled, **variables
            ).strip()
        except jinja2.TemplateError as ex:
            # Use .get() so a missing 'value' cannot raise KeyError from inside
            # this handler and mask the template error; pass lazy %-args so the
            # message is only formatted when debug logging is enabled.
            logging.getLogger(f"{__package__}.{entity_id.split('.')[0]}").debug(
                "Error parsing value for %s: %s (value: %s, template: %s)",
                entity_id,
                ex,
                variables.get("value"),
                self.template,
            )
            return error_value

        return render_result
class TriggerBaseEntity(Entity):
    """Base entity whose name, icon, picture and attributes come from templates.

    Subclasses may list additional configuration keys to render through
    ``extra_template_keys`` (rendered with ``async_render``) and
    ``extra_template_keys_complex`` (rendered with ``render_complex``).
    """

    domain: str
    extra_template_keys: tuple[str, ...] | None = None
    extra_template_keys_complex: tuple[str, ...] | None = None
    _unique_id: str | None

    def __init__(
        self,
        hass: HomeAssistant,
        config: ConfigType,
    ) -> None:
        """Store the configuration and sort its templates into static and dynamic."""
        self.hass = hass
        self._set_unique_id(config.get(CONF_UNIQUE_ID))
        self._config = config

        self._static_rendered: dict[str, Any] = {}
        self._to_render_simple: list[str] = []
        self._to_render_complex: list[str] = []

        for conf_key in (CONF_AVAILABILITY, CONF_ICON, CONF_NAME, CONF_PICTURE):
            template = config.get(conf_key)
            if template is None:
                # Key absent or explicitly set to None: nothing to render.
                continue
            if template.is_static:
                # Static templates never change, so keep their text once.
                self._static_rendered[conf_key] = template.template
            else:
                self._to_render_simple.append(conf_key)

        if self.extra_template_keys is not None:
            self._to_render_simple.extend(self.extra_template_keys)

        if self.extra_template_keys_complex is not None:
            self._to_render_complex.extend(self.extra_template_keys_complex)

        # We make a copy so our initial render is 'unknown' and not 'unavailable'
        self._rendered = dict(self._static_rendered)
        # Keys whose rendered result is parsed rather than kept as text.
        self._parse_result = {CONF_AVAILABILITY}

        self._attr_device_class = config.get(CONF_DEVICE_CLASS)
        self._availability_template = config.get(CONF_AVAILABILITY)
        self._available = True

    @property
    def name(self) -> str | None:
        """Return the rendered name, if any."""
        return self._rendered.get(CONF_NAME)

    @property
    def unique_id(self) -> str | None:
        """Return the unique ID of the entity."""
        return self._unique_id

    @property
    def icon(self) -> str | None:
        """Return the rendered icon, if any."""
        return self._rendered.get(CONF_ICON)

    @property
    def entity_picture(self) -> str | None:
        """Return the rendered entity picture, if any."""
        return self._rendered.get(CONF_PICTURE)

    @property
    def available(self) -> bool:
        """Return True without an availability template, else the last rendered value."""
        return True if self._availability_template is None else self._available

    @property
    def extra_state_attributes(self) -> dict[str, Any] | None:
        """Return the rendered extra attributes, if any."""
        return self._rendered.get(CONF_ATTRIBUTES)

    def _set_unique_id(self, unique_id: str | None) -> None:
        """Store the unique id."""
        self._unique_id = unique_id

    def restore_attributes(self, last_state: State) -> None:
        """Repopulate rendered values from the attributes of ``last_state``.

        Only keys that are configured and present in the stored attributes
        are restored.
        """
        stored = last_state.attributes

        for conf_key, attr in CONF_TO_ATTRIBUTE.items():
            if conf_key in self._config and attr in stored:
                self._rendered[conf_key] = stored[attr]

        if CONF_ATTRIBUTES in self._config:
            self._rendered[CONF_ATTRIBUTES] = {
                attr: stored[attr]
                for attr in self._config[CONF_ATTRIBUTES]
                if attr in stored
            }

    def _template_variables(self, run_variables: dict[str, Any] | None = None) -> dict:
        """Return the template variables: ``this`` plus any ``run_variables``."""
        variables: dict[str, Any] = {
            "this": TemplateStateFromEntityId(self.hass, self.entity_id),
        }
        if run_variables:
            # Run variables are applied last, so they win on a key clash.
            variables.update(run_variables)
        return variables

    def _render_single_template(
        self,
        key: str,
        variables: dict[str, Any],
        strict: bool = False,
    ) -> Any:
        """Render the template configured under ``key``.

        Returns the rendered result, or ``_SENTINEL`` after logging when a
        TemplateError is raised.
        """
        try:
            if key in self._to_render_complex:
                return render_complex(self._config[key], variables)

            parse_result = key in self._parse_result
            return self._config[key].async_render(
                variables, parse_result=parse_result, strict=strict
            )
        except TemplateError as err:
            log_triggered_template_error(self.entity_id, err, key=key)
            return _SENTINEL

    def _render_availability_template(self, variables: dict[str, Any]) -> bool:
        """Render the availability template and return the resulting availability.

        Returns True without touching any state when no availability template
        is set.
        """
        template = self._availability_template
        if not template:
            return True

        try:
            available = template.async_render(
                variables, parse_result=True, strict=True
            )
            if available is False:
                # Drop dynamic values, keeping only the static ones.
                self._rendered = dict(self._static_rendered)

            self._available = result_as_boolean(available)
        except TemplateError as err:
            # The entity will be available when an error is rendered. This
            # ensures functionality is consistent between template and trigger template
            # entities.
            self._available = True
            log_triggered_template_error(self.entity_id, err, key=CONF_AVAILABILITY)

        return self._available

    def _render_attributes(self, rendered: dict, variables: dict[str, Any]) -> None:
        """Render the configured attribute templates into ``rendered``.

        Each rendered value is also written into ``variables`` under the
        attribute's name, so it is visible to the templates rendered after
        it. An attribute whose template raises TemplateError is logged and
        left out.
        """
        if CONF_ATTRIBUTES not in self._config:
            return

        attributes: dict[str, Any] = {}
        for attribute, attribute_template in self._config[CONF_ATTRIBUTES].items():
            try:
                value = render_complex(attribute_template, variables)
                attributes[attribute] = value
                variables[attribute] = value
            except TemplateError as err:
                log_triggered_template_error(
                    self.entity_id, err, attribute=attribute
                )
        rendered[CONF_ATTRIBUTES] = attributes

    def _render_single_templates(
        self,
        rendered: dict,
        variables: dict[str, Any],
        filtered: list[str] | None = None,
    ) -> None:
        """Render every simple and complex template key into ``rendered``.

        Keys listed in ``filtered`` are skipped, and keys whose rendering
        failed are not written.
        """
        for key in itertools.chain(self._to_render_simple, self._to_render_complex):
            if filtered and key in filtered:
                continue

            result = self._render_single_template(key, variables)
            if result is not _SENTINEL:
                rendered[key] = result

    def _render_templates(self, variables: dict[str, Any]) -> None:
        """Render all templates and replace the rendered values in one step."""
        # Start from the static values and build the new set off to the side.
        rendered = dict(self._static_rendered)
        self._render_single_templates(rendered, variables)
        self._render_attributes(rendered, variables)
        self._rendered = rendered
class ManualTriggerEntity(TriggerBaseEntity):
    """Template entity that is updated from manually supplied trigger data."""

    def __init__(
        self,
        hass: HomeAssistant,
        config: ConfigType,
    ) -> None:
        """Initialize the entity and render its name once."""
        TriggerBaseEntity.__init__(self, hass, config)
        # Need initial rendering on `name` as it influences the `entity_id`
        name_template = config[CONF_NAME]
        self._rendered[CONF_NAME] = name_template.async_render(
            {},
            parse_result=CONF_NAME in self._parse_result,
        )

    def _template_variables_with_value(
        self, value: str | None = None
    ) -> dict[str, Any]:
        """Build the template variables for a new value.

        The result always contains ``value``; ``value_json`` is added only
        when ``value`` can be decoded as JSON.

        Implementing class should call this first in update method to render variables for templates.
        Ex: variables = self._template_variables_with_value(payload)
        """
        run_variables: dict[str, Any] = {"value": value}

        # Silently try if variable is a json and store result in `value_json` if it is.
        try:  # noqa: SIM105 - suppress is much slower
            decoded = json_loads(value)  # type: ignore[arg-type]
        except JSON_DECODE_EXCEPTIONS:
            pass
        else:
            run_variables["value_json"] = decoded

        return self._template_variables(run_variables)

    @callback
    def _process_manual_data(self, variables: dict[str, Any]) -> None:
        """Render all templates with the supplied variables.

        Implementing class should call this last in update method to render templates.
        Ex: self._process_manual_data(variables)
        """
        self._render_templates(variables)
class ManualTriggerSensorEntity(ManualTriggerEntity, SensorEntity):
    """Sensor flavour of the manually triggered template entity."""

    def __init__(
        self,
        hass: HomeAssistant,
        config: ConfigType,
    ) -> None:
        """Initialize the sensor and copy its sensor-specific settings from config."""
        ManualTriggerEntity.__init__(self, hass, config)
        # Both settings are optional; a missing key yields None.
        state_class = config.get(CONF_STATE_CLASS)
        unit = config.get(CONF_UNIT_OF_MEASUREMENT)
        self._attr_native_unit_of_measurement = unit
        self._attr_state_class = state_class