2023-08-24 15:37:04 +02:00
|
|
|
"""TemplateEntity utility class."""
|
2024-03-08 16:36:11 +01:00
|
|
|
|
2023-08-24 15:37:04 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
|
2025-04-25 07:17:25 -04:00
|
|
|
import itertools
|
2023-08-24 15:37:04 +02:00
|
|
|
import logging
|
|
|
|
from typing import Any
|
|
|
|
|
2025-04-25 07:17:25 -04:00
|
|
|
import jinja2
|
2023-08-24 15:37:04 +02:00
|
|
|
import voluptuous as vol
|
|
|
|
|
|
|
|
from homeassistant.components.sensor import (
|
|
|
|
CONF_STATE_CLASS,
|
|
|
|
DEVICE_CLASSES_SCHEMA,
|
|
|
|
STATE_CLASSES_SCHEMA,
|
|
|
|
SensorEntity,
|
|
|
|
)
|
|
|
|
from homeassistant.const import (
|
|
|
|
ATTR_ENTITY_PICTURE,
|
|
|
|
ATTR_FRIENDLY_NAME,
|
|
|
|
ATTR_ICON,
|
|
|
|
CONF_DEVICE_CLASS,
|
|
|
|
CONF_ICON,
|
|
|
|
CONF_NAME,
|
|
|
|
CONF_UNIQUE_ID,
|
|
|
|
CONF_UNIT_OF_MEASUREMENT,
|
|
|
|
)
|
|
|
|
from homeassistant.core import HomeAssistant, State, callback
|
|
|
|
from homeassistant.exceptions import TemplateError
|
|
|
|
from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
|
|
|
|
|
|
|
|
from . import config_validation as cv
|
|
|
|
from .entity import Entity
|
2025-04-25 07:17:25 -04:00
|
|
|
from .template import (
|
|
|
|
_SENTINEL,
|
|
|
|
Template,
|
|
|
|
TemplateStateFromEntityId,
|
|
|
|
_render_with_context,
|
|
|
|
render_complex,
|
|
|
|
result_as_boolean,
|
|
|
|
)
|
2023-08-24 15:37:04 +02:00
|
|
|
from .typing import ConfigType
|
|
|
|
|
|
|
|
# Config key of the optional availability template.
CONF_AVAILABILITY = "availability"
# Config key of the mapping from extra state attribute name to its template.
CONF_ATTRIBUTES = "attributes"
# Config key of the entity picture template.
CONF_PICTURE = "picture"

# Maps a templated config key to the state attribute that carries its rendered
# value. TriggerBaseEntity.restore_attributes walks this mapping to copy values
# back out of a previously stored State.
CONF_TO_ATTRIBUTE = {
    CONF_ICON: ATTR_ICON,
    CONF_NAME: ATTR_FRIENDLY_NAME,
    CONF_PICTURE: ATTR_ENTITY_PICTURE,
}
|
|
|
|
|
|
|
|
# Options common to template entities. Icon, name and picture are validated as
# templates; the unique id is validated as a plain string. Every key is
# optional and none has a default (see make_template_entity_base_schema for a
# variant that defaults the name).
TEMPLATE_ENTITY_BASE_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_ICON): cv.template,
        vol.Optional(CONF_NAME): cv.template,
        vol.Optional(CONF_PICTURE): cv.template,
        vol.Optional(CONF_UNIQUE_ID): cv.string,
    }
)
|
|
|
|
|
|
|
|
|
|
|
|
def make_template_entity_base_schema(default_name: str) -> vol.Schema:
    """Build the base template-entity schema with a default for the name.

    The result accepts the same optional keys as the base entity schema
    (icon, name, picture, unique id); the only difference is that the name
    falls back to ``default_name`` when the config omits it.
    """
    fields = {
        vol.Optional(CONF_ICON): cv.template,
        vol.Optional(CONF_NAME, default=default_name): cv.template,
        vol.Optional(CONF_PICTURE): cv.template,
        vol.Optional(CONF_UNIQUE_ID): cv.string,
    }
    return vol.Schema(fields)
|
|
|
|
|
|
|
|
|
2025-04-25 07:17:25 -04:00
|
|
|
def log_triggered_template_error(
    entity_id: str,
    err: TemplateError,
    key: str | None = None,
    attribute: str | None = None,
) -> None:
    """Report a failed template render for a trigger based entity.

    The record is emitted at error level on a logger named after this package
    and the domain part of ``entity_id`` (the text before the first dot).

    ``key`` names the config option whose template failed. When it is not
    given, ``attribute`` names the failing entry below the attributes option.
    ``key`` wins when both are supplied; with neither, the message does not
    name a target.
    """
    if key:
        target = f" {key}"
    elif attribute:
        target = f" {CONF_ATTRIBUTES}.{attribute}"
    else:
        target = ""

    domain = entity_id.split(".")[0]
    domain_logger = logging.getLogger(f"{__package__}.{domain}")
    # `target` carries its own leading space, hence no space after "rendering".
    domain_logger.error(
        "Error rendering%s template for %s: %s", target, entity_id, err
    )
|
|
|
|
|
|
|
|
|
2023-08-24 15:37:04 +02:00
|
|
|
# Sensor flavour of the base schema: optional device class, state class and
# unit of measurement, extended with every key of TEMPLATE_ENTITY_BASE_SCHEMA.
TEMPLATE_SENSOR_BASE_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
        vol.Optional(CONF_STATE_CLASS): STATE_CLASSES_SCHEMA,
        vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
    }
).extend(TEMPLATE_ENTITY_BASE_SCHEMA.schema)
|
|
|
|
|
|
|
|
|
2025-04-25 07:17:25 -04:00
|
|
|
class ValueTemplate(Template):
    """Template that is rendered with 'value' available in its variables."""

    @classmethod
    def from_template(cls, template: Template) -> ValueTemplate:
        """Create a ValueTemplate from the source text and hass of a Template."""
        return cls(template.template, template.hass)

    @callback
    def async_render_as_value_template(
        self, entity_id: str, variables: dict[str, Any], error_value: Any
    ) -> Any:
        """Render a template that uses 'value' and optionally 'value_json'.

        A static template is returned as its source text without rendering.
        Otherwise the template is rendered with ``variables`` and the result
        is returned with surrounding whitespace stripped.

        Jinja template errors never propagate to the caller: they are logged
        at debug level on the logger of the entity's domain and
        ``error_value`` is returned instead.

        This method must be run in the event loop.
        """
        self._renders += 1

        if self.is_static:
            return self.template

        # Reuse the already compiled template when there is one.
        compiled = self._compiled or self._ensure_compiled()

        try:
            render_result = _render_with_context(
                self.template, compiled, **variables
            ).strip()
        except jinja2.TemplateError as ex:
            # Lazy %-formatting: the message is only built when debug logging
            # is enabled for this logger.
            logging.getLogger(f"{__package__}.{entity_id.split('.')[0]}").debug(
                "Error parsing value for %s: %s (value: %s, template: %s)",
                entity_id,
                ex,
                # .get() rather than ['value']: a caller that did not supply
                # 'value' must still receive error_value, not a KeyError
                # raised from inside this handler.
                variables.get("value"),
                self.template,
            )
            return error_value

        return render_result
|
|
|
|
|
|
|
|
|
2023-08-24 15:37:04 +02:00
|
|
|
class TriggerBaseEntity(Entity):
    """Base for entities whose templated options are rendered from trigger data.

    The constructor splits the templated options into values that are static
    (stored once) and keys that must be re-rendered. Rendered values are kept
    in ``self._rendered`` and exposed through the entity properties below.
    """

    domain: str
    # Extra config keys a subclass wants rendered through ``async_render``.
    extra_template_keys: tuple[str, ...] | None = None
    # Extra config keys a subclass wants rendered through ``render_complex``.
    extra_template_keys_complex: tuple[str, ...] | None = None
    _unique_id: str | None

    def __init__(self, hass: HomeAssistant, config: ConfigType) -> None:
        """Store the config and classify its templates as static or dynamic."""
        self.hass = hass

        self._set_unique_id(config.get(CONF_UNIQUE_ID))

        self._config = config

        self._static_rendered = {}
        self._to_render_simple: list[str] = []
        self._to_render_complex: list[str] = []

        for conf_key in (CONF_AVAILABILITY, CONF_ICON, CONF_NAME, CONF_PICTURE):
            template = config.get(conf_key)
            if template is None:
                # Option missing or explicitly set to None: nothing to render.
                continue
            if template.is_static:
                self._static_rendered[conf_key] = template.template
            else:
                self._to_render_simple.append(conf_key)

        self._to_render_simple.extend(self.extra_template_keys or ())
        self._to_render_complex.extend(self.extra_template_keys_complex or ())

        # We make a copy so our initial render is 'unknown' and not 'unavailable'
        self._rendered = dict(self._static_rendered)
        # Keys whose render is requested with parse_result=True.
        self._parse_result = {CONF_AVAILABILITY}
        self._attr_device_class = config.get(CONF_DEVICE_CLASS)

        self._availability_template = config.get(CONF_AVAILABILITY)
        self._available = True

    @property
    def name(self) -> str | None:
        """Return the rendered name, or None when there is none."""
        return self._rendered.get(CONF_NAME)

    @property
    def unique_id(self) -> str | None:
        """Return the unique ID stored by ``_set_unique_id``."""
        return self._unique_id

    @property
    def icon(self) -> str | None:
        """Return the rendered icon, or None when there is none."""
        return self._rendered.get(CONF_ICON)

    @property
    def entity_picture(self) -> str | None:
        """Return the rendered entity picture, or None when there is none."""
        return self._rendered.get(CONF_PICTURE)

    @property
    def available(self) -> bool:
        """Return True without an availability template, else the last result."""
        return self._availability_template is None or self._available

    @property
    def extra_state_attributes(self) -> dict[str, Any] | None:
        """Return the rendered extra attributes, or None when there are none."""
        return self._rendered.get(CONF_ATTRIBUTES)

    def _set_unique_id(self, unique_id: str | None) -> None:
        """Store the unique ID; called from the constructor."""
        self._unique_id = unique_id

    def restore_attributes(self, last_state: State) -> None:
        """Seed the rendered values from the attributes of ``last_state``.

        Only options present in the config are restored, and only when the
        matching attribute exists on ``last_state``. When the config has an
        attributes option, the rendered attributes are replaced by the subset
        of configured attribute names found on ``last_state``.
        """
        stored = last_state.attributes

        for conf_key, attr in CONF_TO_ATTRIBUTE.items():
            if conf_key in self._config and attr in stored:
                self._rendered[conf_key] = stored[attr]

        if CONF_ATTRIBUTES in self._config:
            self._rendered[CONF_ATTRIBUTES] = {
                attr: stored[attr]
                for attr in self._config[CONF_ATTRIBUTES]
                if attr in stored
            }

    def _template_variables(self, run_variables: dict[str, Any] | None = None) -> dict:
        """Return the variables for a render: 'this' plus ``run_variables``.

        Entries of ``run_variables`` are applied last, so they take precedence
        over the 'this' entry on a name clash.
        """
        variables: dict[str, Any] = {
            "this": TemplateStateFromEntityId(self.hass, self.entity_id)
        }
        if run_variables:
            variables.update(run_variables)
        return variables

    def _render_single_template(
        self,
        key: str,
        variables: dict[str, Any],
        strict: bool = False,
    ) -> Any:
        """Render the template stored under ``key`` in the config.

        Keys registered as complex go through ``render_complex``; all others
        are rendered with ``async_render``, which is the only path that
        receives ``strict``. A TemplateError is logged and ``_SENTINEL`` is
        returned in place of a result.
        """
        try:
            template = self._config[key]
            if key in self._to_render_complex:
                return render_complex(template, variables)

            wants_parsing = key in self._parse_result
            return template.async_render(
                variables, parse_result=wants_parsing, strict=strict
            )
        except TemplateError as err:
            log_triggered_template_error(self.entity_id, err, key=key)
            return _SENTINEL

    def _render_availability_template(self, variables: dict[str, Any]) -> bool:
        """Render the availability template and remember the outcome.

        Returns True straight away when no availability template is set. When
        the render yields exactly ``False`` the rendered values are reset to
        the static ones.
        """
        template = self._availability_template
        if not template:
            return True

        try:
            result = template.async_render(variables, parse_result=True, strict=True)
            if result is False:
                self._rendered = dict(self._static_rendered)

            self._available = result_as_boolean(result)
        except TemplateError as err:
            # The entity will be available when an error is rendered. This
            # ensures functionality is consistent between template and trigger
            # template entities.
            self._available = True
            log_triggered_template_error(self.entity_id, err, key=CONF_AVAILABILITY)

        return self._available

    def _render_attributes(self, rendered: dict, variables: dict[str, Any]) -> None:
        """Render every configured attribute template into ``rendered``.

        Each successfully rendered attribute is also written into
        ``variables`` under its own name, so attribute templates rendered
        later in the same pass can see it. An attribute whose template raises
        TemplateError is logged and left out. Nothing happens when the config
        has no attributes option.
        """
        if CONF_ATTRIBUTES not in self._config:
            return

        attributes = {}
        for name, template in self._config[CONF_ATTRIBUTES].items():
            try:
                value = render_complex(template, variables)
            except TemplateError as err:
                log_triggered_template_error(self.entity_id, err, attribute=name)
                continue
            attributes[name] = value
            variables[name] = value

        rendered[CONF_ATTRIBUTES] = attributes

    def _render_single_templates(
        self,
        rendered: dict,
        variables: dict[str, Any],
        filtered: list[str] | None = None,
    ) -> None:
        """Render the simple keys, then the complex keys, into ``rendered``.

        Keys listed in ``filtered`` are skipped. A key whose render failed
        (signalled by ``_SENTINEL``) is not written to ``rendered``.
        """
        skipped = filtered or ()
        for key in itertools.chain(self._to_render_simple, self._to_render_complex):
            if key in skipped:
                continue

            result = self._render_single_template(key, variables)
            if result is not _SENTINEL:
                rendered[key] = result

    def _render_templates(self, variables: dict[str, Any]) -> None:
        """Re-render all templates and swap in the new set of rendered values.

        Starts from a copy of the static values, so dynamic results from an
        earlier render do not carry over.
        """
        fresh = dict(self._static_rendered)
        self._render_single_templates(fresh, variables)
        self._render_attributes(fresh, variables)
        self._rendered = fresh
|
2023-08-24 15:37:04 +02:00
|
|
|
|
|
|
|
|
|
|
|
class ManualTriggerEntity(TriggerBaseEntity):
    """Template entity based on manual trigger data."""

    def __init__(
        self,
        hass: HomeAssistant,
        config: ConfigType,
    ) -> None:
        """Initialize the entity and render its name once.

        The config must contain a name template: it is read with
        ``config[CONF_NAME]``.
        """
        TriggerBaseEntity.__init__(self, hass, config)
        # Need initial rendering on `name` as it influences the `entity_id`
        self._rendered[CONF_NAME] = config[CONF_NAME].async_render(
            {},
            parse_result=CONF_NAME in self._parse_result,
        )

    def _template_variables_with_value(
        self, value: str | None = None
    ) -> dict[str, Any]:
        """Render template variables.

        Returns the base template variables plus 'value', and additionally
        'value_json' when ``value`` decodes as JSON.

        Implementing class should call this first in update method to render
        variables for templates.
        Ex: variables = self._template_variables_with_value(payload)
        """
        run_variables: dict[str, Any] = {"value": value}

        # None (the default) can never decode as JSON, so skip the attempt
        # rather than depend on the decoder raising one of
        # JSON_DECODE_EXCEPTIONS for a non-string input.
        if value is not None:
            # Silently try if variable is a json and store result in
            # `value_json` if it is.
            try:  # noqa: SIM105 - suppress is much slower
                run_variables["value_json"] = json_loads(value)
            except JSON_DECODE_EXCEPTIONS:
                pass

        return self._template_variables(run_variables)

    @callback
    def _process_manual_data(self, variables: dict[str, Any]) -> None:
        """Process new data manually.

        Implementing class should call this last in update method to render
        templates.
        Ex: self._process_manual_data(variables)
        """
        self._render_templates(variables)
|
|
|
|
|
|
|
|
|
|
|
|
class ManualTriggerSensorEntity(ManualTriggerEntity, SensorEntity):
    """Manual trigger template entity for the sensor platform."""

    def __init__(self, hass: HomeAssistant, config: ConfigType) -> None:
        """Set up the manual trigger entity, then apply the sensor options.

        The unit of measurement and the state class are read from the config
        and are None when the config does not provide them.
        """
        ManualTriggerEntity.__init__(self, hass, config)

        unit = config.get(CONF_UNIT_OF_MEASUREMENT)
        state_class = config.get(CONF_STATE_CLASS)
        self._attr_native_unit_of_measurement = unit
        self._attr_state_class = state_class
|