Extract functional utility template functions into a functional Jinja2 extension (#167357)

This commit is contained in:
Franck Nijhof
2026-04-06 12:48:15 +02:00
committed by GitHub
parent 2216fcccc7
commit ddc00f6924
6 changed files with 773 additions and 682 deletions

View File

@@ -6,15 +6,12 @@ from ast import literal_eval
import asyncio
import collections.abc
from collections.abc import Callable, Generator, Iterable
from copy import deepcopy
from datetime import datetime, timedelta
from enum import Enum
from functools import cache, lru_cache, partial, wraps
import logging
import math
from operator import contains
import pathlib
import random
import re
import sys
from types import CodeType
@@ -45,7 +42,6 @@ from homeassistant.const import (
from homeassistant.core import (
Context,
HomeAssistant,
ServiceResponse,
State,
callback,
valid_domain,
@@ -1460,164 +1456,11 @@ def add(value, amount, default=_SENTINEL):
return default
def apply(value, fn, *args, **kwargs):
"""Call the given callable with the provided arguments and keyword arguments."""
return fn(value, *args, **kwargs)
def as_function(macro: jinja2.runtime.Macro) -> Callable[..., Any]:
"""Turn a macro with a 'returns' keyword argument into a function that returns what that argument is called with."""
def wrapper(*args, **kwargs):
return_value = None
def returns(value):
nonlocal return_value
return_value = value
return value
# Call the callable with the value and other args
macro(*args, **kwargs, returns=returns)
return return_value
# Remove "macro_" from the macro's name to avoid confusion in the wrapper's name
trimmed_name = macro.name.removeprefix("macro_")
wrapper.__name__ = trimmed_name
wrapper.__qualname__ = trimmed_name
return wrapper
def version(value):
"""Filter and function to get version object of the value."""
return AwesomeVersion(value)
def merge_response(value: ServiceResponse) -> list[Any]:
"""Merge action responses into single list.
Checks that the input is a correct service response:
{
"entity_id": {str: dict[str, Any]},
}
If response is a single list, it will extend the list with the items
and add the entity_id and value_key to each dictionary for reference.
If response is a dictionary or multiple lists,
it will append the dictionary/lists to the list
and add the entity_id to each dictionary for reference.
"""
if not isinstance(value, dict):
raise TypeError("Response is not a dictionary")
if not value:
# Bail out early if response is an empty dictionary
return []
is_single_list = False
response_items: list = []
input_service_response = deepcopy(value)
for entity_id, entity_response in input_service_response.items(): # pylint: disable=too-many-nested-blocks
if not isinstance(entity_response, dict):
raise TypeError("Response is not a dictionary")
for value_key, type_response in entity_response.items():
if len(entity_response) == 1 and isinstance(type_response, list):
# Provides special handling for responses such as calendar events
# and weather forecasts where the response contains a single list with multiple
# dictionaries inside.
is_single_list = True
for dict_in_list in type_response:
if isinstance(dict_in_list, dict):
if ATTR_ENTITY_ID in dict_in_list:
raise ValueError(
f"Response dictionary already contains key '{ATTR_ENTITY_ID}'"
)
dict_in_list[ATTR_ENTITY_ID] = entity_id
dict_in_list["value_key"] = value_key
response_items.extend(type_response)
else:
# Break the loop if not a single list as the logic is then managed in the outer loop
# which handles both dictionaries and in the case of multiple lists.
break
if not is_single_list:
_response = entity_response.copy()
if ATTR_ENTITY_ID in _response:
raise ValueError(
f"Response dictionary already contains key '{ATTR_ENTITY_ID}'"
)
_response[ATTR_ENTITY_ID] = entity_id
response_items.append(_response)
return response_items
def fail_when_undefined(value):
"""Filter to force a failure when the value is undefined."""
if isinstance(value, jinja2.Undefined):
value()
return value
@pass_context
def random_every_time(context, values):
"""Choose a random value.
Unlike Jinja's random filter,
this is context-dependent to avoid caching the chosen value.
"""
return random.choice(values)
def iif(
value: Any, if_true: Any = True, if_false: Any = False, if_none: Any = _SENTINEL
) -> Any:
"""Immediate if function/filter that allow for common if/else constructs.
https://en.wikipedia.org/wiki/IIf
Examples:
{{ is_state("device_tracker.frenck", "home") | iif("yes", "no") }}
{{ iif(1==2, "yes", "no") }}
{{ (1 == 1) | iif("yes", "no") }}
"""
if value is None and if_none is not _SENTINEL:
return if_none
if bool(value):
return if_true
return if_false
def typeof(value: Any) -> Any:
"""Return the type of value passed to debug types."""
return value.__class__.__name__
def combine(*args: Any, recursive: bool = False) -> dict[Any, Any]:
"""Combine multiple dictionaries into one."""
if not args:
raise TypeError("combine expected at least 1 argument, got 0")
result: dict[Any, Any] = {}
for arg in args:
if not isinstance(arg, dict):
raise TypeError(f"combine expected a dict, got {type(arg).__name__}")
if recursive:
for key, value in arg.items():
if (
key in result
and isinstance(result[key], dict)
and isinstance(value, dict)
):
result[key] = combine(result[key], value, recursive=True)
else:
result[key] = value
else:
result |= arg
return result
def make_logging_undefined(
strict: bool | None, log_fn: Callable[[int, str], None] | None
) -> type[jinja2.Undefined]:
@@ -1754,6 +1597,9 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
)
self.add_extension("homeassistant.helpers.template.extensions.DeviceExtension")
self.add_extension("homeassistant.helpers.template.extensions.FloorExtension")
self.add_extension(
"homeassistant.helpers.template.extensions.FunctionalExtension"
)
self.add_extension("homeassistant.helpers.template.extensions.IssuesExtension")
self.add_extension("homeassistant.helpers.template.extensions.LabelExtension")
self.add_extension("homeassistant.helpers.template.extensions.MathExtension")
@@ -1766,32 +1612,13 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
"homeassistant.helpers.template.extensions.TypeCastExtension"
)
self.globals["apply"] = apply
self.globals["as_function"] = as_function
self.globals["combine"] = combine
self.globals["iif"] = iif
self.globals["merge_response"] = merge_response
self.globals["typeof"] = typeof
self.globals["version"] = version
self.globals["zip"] = zip
self.filters["add"] = add
self.filters["apply"] = apply
self.filters["as_function"] = as_function
self.filters["combine"] = combine
self.filters["contains"] = contains
self.filters["iif"] = iif
self.filters["is_defined"] = fail_when_undefined
self.filters["multiply"] = multiply
self.filters["ord"] = ord
self.filters["random"] = random_every_time
self.filters["round"] = forgiving_round
self.filters["typeof"] = typeof
self.filters["version"] = version
self.tests["apply"] = apply
self.tests["contains"] = contains
if hass is None:
return

View File

@@ -7,6 +7,7 @@ from .crypto import CryptoExtension
from .datetime import DateTimeExtension
from .devices import DeviceExtension
from .floors import FloorExtension
from .functional import FunctionalExtension
from .issues import IssuesExtension
from .labels import LabelExtension
from .math import MathExtension
@@ -23,6 +24,7 @@ __all__ = [
"DateTimeExtension",
"DeviceExtension",
"FloorExtension",
"FunctionalExtension",
"IssuesExtension",
"LabelExtension",
"MathExtension",

View File

@@ -0,0 +1,247 @@
"""Functional utility functions for Home Assistant templates."""
from __future__ import annotations
from collections.abc import Callable
from copy import deepcopy
from operator import contains
import random
from typing import TYPE_CHECKING, Any
import jinja2
from jinja2 import pass_context
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import ServiceResponse
from .base import BaseTemplateExtension, TemplateFunction
if TYPE_CHECKING:
from homeassistant.helpers.template import TemplateEnvironment
_SENTINEL = object()
class FunctionalExtension(BaseTemplateExtension):
"""Jinja2 extension for functional utility functions."""
def __init__(self, environment: TemplateEnvironment) -> None:
"""Initialize the functional extension."""
super().__init__(
environment,
functions=[
TemplateFunction(
"apply",
self.apply,
as_global=True,
as_filter=True,
as_test=True,
),
TemplateFunction(
"as_function",
self.as_function,
as_global=True,
as_filter=True,
),
TemplateFunction(
"iif",
self.iif,
as_global=True,
as_filter=True,
),
TemplateFunction(
"merge_response",
self.merge_response,
as_global=True,
),
TemplateFunction(
"combine",
self.combine,
as_global=True,
as_filter=True,
),
TemplateFunction(
"typeof",
self.typeof,
as_global=True,
as_filter=True,
),
TemplateFunction(
"is_defined",
self.fail_when_undefined,
as_filter=True,
),
TemplateFunction(
"random",
_random_every_time,
as_filter=True,
),
TemplateFunction(
"zip",
zip,
as_global=True,
),
TemplateFunction(
"ord",
ord,
as_filter=True,
),
TemplateFunction(
"contains",
contains,
as_filter=True,
as_test=True,
),
],
)
@staticmethod
def apply(value: Any, fn: Any, *args: Any, **kwargs: Any) -> Any:
"""Call the given callable with the provided arguments and keyword arguments."""
return fn(value, *args, **kwargs)
@staticmethod
def as_function(macro: jinja2.runtime.Macro) -> Callable[..., Any]:
"""Turn a macro with a 'returns' keyword argument into a function."""
def wrapper(*args: Any, **kwargs: Any) -> Any:
return_value = None
def returns(value: Any) -> Any:
nonlocal return_value
return_value = value
return value
# Call the callable with the value and other args
macro(*args, **kwargs, returns=returns)
return return_value
# Remove "macro_" from the macro's name to avoid confusion
trimmed_name = macro.name.removeprefix("macro_")
wrapper.__name__ = trimmed_name
wrapper.__qualname__ = trimmed_name
return wrapper
@staticmethod
def iif(
value: Any,
if_true: Any = True,
if_false: Any = False,
if_none: Any = _SENTINEL,
) -> Any:
"""Immediate if function/filter that allow for common if/else constructs.
https://en.wikipedia.org/wiki/IIf
Examples:
{{ is_state("device_tracker.frenck", "home") | iif("yes", "no") }}
{{ iif(1==2, "yes", "no") }}
{{ (1 == 1) | iif("yes", "no") }}
"""
if value is None and if_none is not _SENTINEL:
return if_none
if bool(value):
return if_true
return if_false
@staticmethod
def merge_response(value: ServiceResponse) -> list[Any]:
"""Merge action responses into single list.
Checks that the input is a correct service response:
{
"entity_id": {str: dict[str, Any]},
}
If response is a single list, it will extend the list with the items
and add the entity_id and value_key to each dictionary for reference.
If response is a dictionary or multiple lists,
it will append the dictionary/lists to the list
and add the entity_id to each dictionary for reference.
"""
if not isinstance(value, dict):
raise TypeError("Response is not a dictionary")
if not value:
return []
is_single_list = False
response_items: list = []
input_service_response = deepcopy(value)
for entity_id, entity_response in input_service_response.items(): # pylint: disable=too-many-nested-blocks
if not isinstance(entity_response, dict):
raise TypeError("Response is not a dictionary")
for value_key, type_response in entity_response.items():
if len(entity_response) == 1 and isinstance(type_response, list):
is_single_list = True
for dict_in_list in type_response:
if isinstance(dict_in_list, dict):
if ATTR_ENTITY_ID in dict_in_list:
raise ValueError(
f"Response dictionary already contains key '{ATTR_ENTITY_ID}'"
)
dict_in_list[ATTR_ENTITY_ID] = entity_id
dict_in_list["value_key"] = value_key
response_items.extend(type_response)
else:
break
if not is_single_list:
_response = entity_response.copy()
if ATTR_ENTITY_ID in _response:
raise ValueError(
f"Response dictionary already contains key '{ATTR_ENTITY_ID}'"
)
_response[ATTR_ENTITY_ID] = entity_id
response_items.append(_response)
return response_items
@staticmethod
def combine(*args: Any, recursive: bool = False) -> dict[Any, Any]:
"""Combine multiple dictionaries into one."""
if not args:
raise TypeError("combine expected at least 1 argument, got 0")
result: dict[Any, Any] = {}
for arg in args:
if not isinstance(arg, dict):
raise TypeError(f"combine expected a dict, got {type(arg).__name__}")
if recursive:
for key, value in arg.items():
if (
key in result
and isinstance(result[key], dict)
and isinstance(value, dict)
):
result[key] = FunctionalExtension.combine(
result[key], value, recursive=True
)
else:
result[key] = value
else:
result |= arg
return result
@staticmethod
def typeof(value: Any) -> Any:
"""Return the type of value passed to debug types."""
return value.__class__.__name__
@staticmethod
def fail_when_undefined(value: Any) -> Any:
"""Filter to force a failure when the value is undefined."""
if isinstance(value, jinja2.Undefined):
value()
return value
@pass_context
def _random_every_time(context: Any, values: Any) -> Any:
"""Choose a random value.
Unlike Jinja's random filter,
this is context-dependent to avoid caching the chosen value.
"""
return random.choice(values)

View File

@@ -0,0 +1,521 @@
"""Test functional utility functions for Home Assistant templates."""
from __future__ import annotations
import random
from unittest.mock import MagicMock, patch
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.helpers import template
from tests.helpers.template.helpers import render
def test_apply(hass: HomeAssistant) -> None:
"""Test apply."""
tpl = """
{%- macro add_foo(arg) -%}
{{arg}}foo
{%- endmacro -%}
{{ ["a", "b", "c"] | map('apply', add_foo) | list }}
"""
assert render(hass, tpl) == ["afoo", "bfoo", "cfoo"]
assert render(
hass, "{{ ['1', '2', '3', '4', '5'] | map('apply', int) | list }}"
) == [1, 2, 3, 4, 5]
def test_apply_macro_with_arguments(hass: HomeAssistant) -> None:
"""Test apply macro with positional, named, and mixed arguments."""
# Test macro with positional arguments
tpl = """
{%- macro add_numbers(a, b, c) -%}
{{ a + b + c }}
{%- endmacro -%}
{{ apply(5, add_numbers, 10, 15) }}
"""
assert render(hass, tpl) == 30
# Test macro with named arguments
tpl = """
{%- macro greet(name, greeting="Hello") -%}
{{ greeting }}, {{ name }}!
{%- endmacro -%}
{{ apply("World", greet, greeting="Hi") }}
"""
assert render(hass, tpl) == "Hi, World!"
# Test macro with mixed arguments
tpl = """
{%- macro format_message(prefix, name, suffix="!") -%}
{{ prefix }} {{ name }}{{ suffix }}
{%- endmacro -%}
{{ apply("Welcome", format_message, "John", suffix="...") }}
"""
assert render(hass, tpl) == "Welcome John..."
def test_as_function(hass: HomeAssistant) -> None:
"""Test as_function."""
tpl = """
{%- macro macro_double(num, returns) -%}
{%- do returns(num * 2) -%}
{%- endmacro -%}
{%- set double = macro_double | as_function -%}
{{ double(5) }}
"""
assert render(hass, tpl) == 10
def test_as_function_no_arguments(hass: HomeAssistant) -> None:
"""Test as_function with no arguments."""
tpl = """
{%- macro macro_get_hello(returns) -%}
{%- do returns("Hello") -%}
{%- endmacro -%}
{%- set get_hello = macro_get_hello | as_function -%}
{{ get_hello() }}
"""
assert render(hass, tpl) == "Hello"
def test_ord(hass: HomeAssistant) -> None:
"""Test the ord filter."""
assert render(hass, '{{ "d" | ord }}') == 100
@patch.object(random, "choice")
def test_random_every_time(test_choice: MagicMock, hass: HomeAssistant) -> None:
"""Ensure the random filter runs every time, not just once."""
tpl = template.Template("{{ [1,2] | random }}", hass)
test_choice.return_value = "foo"
assert tpl.async_render() == "foo"
test_choice.return_value = "bar"
assert tpl.async_render() == "bar"
def test_render_with_possible_json_value_valid_with_is_defined(
hass: HomeAssistant,
) -> None:
"""Render with possible JSON value with known JSON object."""
tpl = template.Template("{{ value_json.hello|is_defined }}", hass)
assert tpl.async_render_with_possible_json_value('{"hello": "world"}') == "world"
def test_render_with_possible_json_value_undefined_json(hass: HomeAssistant) -> None:
"""Render with possible JSON value with unknown JSON object."""
tpl = template.Template("{{ value_json.bye|is_defined }}", hass)
assert (
tpl.async_render_with_possible_json_value('{"hello": "world"}')
== '{"hello": "world"}'
)
def test_render_with_possible_json_value_undefined_json_error_value(
hass: HomeAssistant,
) -> None:
"""Render with possible JSON value with unknown JSON object."""
tpl = template.Template("{{ value_json.bye|is_defined }}", hass)
assert tpl.async_render_with_possible_json_value('{"hello": "world"}', "") == ""
def test_iif(hass: HomeAssistant) -> None:
"""Test the immediate if function/filter."""
result = render(hass, "{{ (1 == 1) | iif }}")
assert result is True
result = render(hass, "{{ (1 == 2) | iif }}")
assert result is False
result = render(hass, "{{ (1 == 1) | iif('yes') }}")
assert result == "yes"
result = render(hass, "{{ (1 == 2) | iif('yes') }}")
assert result is False
result = render(hass, "{{ (1 == 2) | iif('yes', 'no') }}")
assert result == "no"
result = render(hass, "{{ not_exists | default(None) | iif('yes', 'no') }}")
assert result == "no"
result = render(
hass, "{{ not_exists | default(None) | iif('yes', 'no', 'unknown') }}"
)
assert result == "unknown"
result = render(hass, "{{ iif(1 == 1) }}")
assert result is True
result = render(hass, "{{ iif(1 == 2, 'yes', 'no') }}")
assert result == "no"
@pytest.mark.parametrize(
("seq", "value", "expected"),
[
([0], 0, True),
([1], 0, False),
([False], 0, True),
([True], 0, False),
([0], [0], False),
(["toto", 1], "toto", True),
(["toto", 1], "tata", False),
([], 0, False),
([], None, False),
],
)
def test_contains(
hass: HomeAssistant, seq: list, value: object, expected: bool
) -> None:
"""Test contains."""
assert (
render(hass, "{{ seq | contains(value) }}", {"seq": seq, "value": value})
== expected
)
assert (
render(hass, "{{ seq is contains(value) }}", {"seq": seq, "value": value})
== expected
)
@pytest.mark.parametrize(
("service_response"),
[
{
"calendar.sports": {
"events": [
{
"start": "2024-02-27T17:00:00-06:00",
"end": "2024-02-27T18:00:00-06:00",
"summary": "Basketball vs. Rockets",
"description": "",
}
]
},
"calendar.local_furry_events": {"events": []},
"calendar.yap_house_schedules": {
"events": [
{
"start": "2024-02-26T08:00:00-06:00",
"end": "2024-02-26T09:00:00-06:00",
"summary": "Dr. Appt",
"description": "",
},
{
"start": "2024-02-28T20:00:00-06:00",
"end": "2024-02-28T21:00:00-06:00",
"summary": "Bake a cake",
"description": "something good",
},
]
},
},
{
"binary_sensor.workday": {"workday": True},
"binary_sensor.workday2": {"workday": False},
},
{
"weather.smhi_home": {
"forecast": [
{
"datetime": "2024-03-31T16:00:00",
"condition": "cloudy",
"wind_bearing": 79,
"cloud_coverage": 100,
"temperature": 10,
"templow": 4,
"pressure": 998,
"wind_gust_speed": 21.6,
"wind_speed": 11.88,
"precipitation": 0.2,
"humidity": 87,
},
{
"datetime": "2024-04-01T12:00:00",
"condition": "rainy",
"wind_bearing": 17,
"cloud_coverage": 100,
"temperature": 6,
"templow": 1,
"pressure": 999,
"wind_gust_speed": 20.52,
"wind_speed": 8.64,
"precipitation": 2.2,
"humidity": 88,
},
{
"datetime": "2024-04-02T12:00:00",
"condition": "cloudy",
"wind_bearing": 17,
"cloud_coverage": 100,
"temperature": 0,
"templow": -3,
"pressure": 1003,
"wind_gust_speed": 57.24,
"wind_speed": 30.6,
"precipitation": 1.3,
"humidity": 71,
},
]
},
"weather.forecast_home": {
"forecast": [
{
"condition": "cloudy",
"precipitation_probability": 6.6,
"datetime": "2024-03-31T10:00:00+00:00",
"wind_bearing": 71.8,
"temperature": 10.9,
"templow": 6.5,
"wind_gust_speed": 24.1,
"wind_speed": 13.7,
"precipitation": 0,
"humidity": 71,
},
{
"condition": "cloudy",
"precipitation_probability": 8,
"datetime": "2024-04-01T10:00:00+00:00",
"wind_bearing": 350.6,
"temperature": 10.2,
"templow": 3.4,
"wind_gust_speed": 38.2,
"wind_speed": 21.6,
"precipitation": 0,
"humidity": 79,
},
{
"condition": "snowy",
"precipitation_probability": 67.4,
"datetime": "2024-04-02T10:00:00+00:00",
"wind_bearing": 24.5,
"temperature": 3,
"templow": 0,
"wind_gust_speed": 64.8,
"wind_speed": 37.4,
"precipitation": 2.3,
"humidity": 77,
},
]
},
},
{
"vacuum.deebot_n8_plus_1": {
"payloadType": "j",
"resp": {
"body": {
"msg": "ok",
}
},
"header": {
"ver": "0.0.1",
},
},
"vacuum.deebot_n8_plus_2": {
"payloadType": "j",
"resp": {
"body": {
"msg": "ok",
}
},
"header": {
"ver": "0.0.1",
},
},
},
],
ids=["calendar", "workday", "weather", "vacuum"],
)
async def test_merge_response(
hass: HomeAssistant,
service_response: dict,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter."""
_template = "{{ merge_response(" + str(service_response) + ") }}"
assert service_response == snapshot(name="a_response")
assert render(
hass,
_template,
) == snapshot(name="b_rendered")
async def test_merge_response_with_entity_id_in_response(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with empty lists."""
service_response = {
"test.response": {"some_key": True, "entity_id": "test.response"},
"test.response2": {"some_key": False, "entity_id": "test.response2"},
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(
TemplateError,
match="ValueError: Response dictionary already contains key 'entity_id'",
):
render(hass, _template)
service_response = {
"test.response": {
"happening": [
{
"start": "2024-02-27T17:00:00-06:00",
"end": "2024-02-27T18:00:00-06:00",
"summary": "Magic day",
"entity_id": "test.response",
}
]
}
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(
TemplateError,
match="ValueError: Response dictionary already contains key 'entity_id'",
):
render(hass, _template)
async def test_merge_response_with_empty_response(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with empty lists."""
service_response = {
"calendar.sports": {"events": []},
"calendar.local_furry_events": {"events": []},
"calendar.yap_house_schedules": {"events": []},
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
assert service_response == snapshot(name="a_response")
assert render(hass, _template) == snapshot(name="b_rendered")
async def test_response_empty_dict(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with empty dict."""
service_response = {}
_template = "{{ merge_response(" + str(service_response) + ") }}"
result = render(hass, _template)
assert result == []
async def test_response_incorrect_value(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with incorrect response."""
service_response = "incorrect"
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(TemplateError, match="TypeError: Response is not a dictionary"):
render(hass, _template)
async def test_merge_response_with_incorrect_response(hass: HomeAssistant) -> None:
"""Test the merge_response function/filter with empty response should raise."""
service_response = {"calendar.sports": []}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(TemplateError, match="TypeError: Response is not a dictionary"):
render(hass, _template)
service_response = {
"binary_sensor.workday": [],
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(TemplateError, match="TypeError: Response is not a dictionary"):
render(hass, _template)
async def test_merge_response_not_mutate_original_object(
hass: HomeAssistant, snapshot: SnapshotAssertion
) -> None:
"""Test the merge_response does not mutate original service response value."""
value = '{"calendar.family": {"events": [{"summary": "An event"}]}'
_template = (
"{% set calendar_response = " + value + "} %}"
"{{ merge_response(calendar_response) }}"
# We should be able to merge the same response again
# as the merge is working on a copy of the original object (response)
"{{ merge_response(calendar_response) }}"
)
assert render(hass, _template)
def test_typeof(hass: HomeAssistant) -> None:
"""Test the typeof debug filter/function."""
assert render(hass, "{{ True | typeof }}") == "bool"
assert render(hass, "{{ typeof(True) }}") == "bool"
assert render(hass, "{{ [1, 2, 3] | typeof }}") == "list"
assert render(hass, "{{ typeof([1, 2, 3]) }}") == "list"
assert render(hass, "{{ 1 | typeof }}") == "int"
assert render(hass, "{{ typeof(1) }}") == "int"
assert render(hass, "{{ 1.1 | typeof }}") == "float"
assert render(hass, "{{ typeof(1.1) }}") == "float"
assert render(hass, "{{ None | typeof }}") == "NoneType"
assert render(hass, "{{ typeof(None) }}") == "NoneType"
assert render(hass, "{{ 'Home Assistant' | typeof }}") == "str"
assert render(hass, "{{ typeof('Home Assistant') }}") == "str"
def test_combine(hass: HomeAssistant) -> None:
"""Test combine filter and function."""
assert render(hass, "{{ {'a': 1, 'b': 2} | combine({'b': 3, 'c': 4}) }}") == {
"a": 1,
"b": 3,
"c": 4,
}
assert render(hass, "{{ combine({'a': 1, 'b': 2}, {'b': 3, 'c': 4}) }}") == {
"a": 1,
"b": 3,
"c": 4,
}
assert render(
hass,
"{{ combine({'a': 1, 'b': {'x': 1}}, {'b': {'y': 2}, 'c': 4}, recursive=True) }}",
) == {"a": 1, "b": {"x": 1, "y": 2}, "c": 4}
# Test that recursive=False does not merge nested dictionaries
assert render(
hass,
"{{ combine({'a': 1, 'b': {'x': 1}}, {'b': {'y': 2}, 'c': 4}, recursive=False) }}",
) == {"a": 1, "b": {"y": 2}, "c": 4}
# Test that None values are handled correctly in recursive merge
assert render(
hass,
"{{ combine({'a': 1, 'b': none}, {'b': {'y': 2}, 'c': 4}, recursive=True) }}",
) == {"a": 1, "b": {"y": 2}, "c": 4}
with pytest.raises(
TemplateError, match="combine expected at least 1 argument, got 0"
):
render(hass, "{{ combine() }}")
with pytest.raises(TemplateError, match="combine expected a dict, got str"):
render(hass, "{{ {'a': 1} | combine('not a dict') }}")

View File

@@ -6,12 +6,10 @@ from collections.abc import Iterable
from datetime import datetime, timedelta
import json
import logging
import random
from unittest.mock import patch
from freezegun import freeze_time
import pytest
from syrupy.assertion import SnapshotAssertion
import voluptuous as vol
from homeassistant import config_entries
@@ -376,90 +374,6 @@ def test_add(hass: HomeAssistant) -> None:
assert render(hass, "{{ 'no_number' | add(10, default=1) }}") == 1
def test_apply(hass: HomeAssistant) -> None:
"""Test apply."""
tpl = """
{%- macro add_foo(arg) -%}
{{arg}}foo
{%- endmacro -%}
{{ ["a", "b", "c"] | map('apply', add_foo) | list }}
"""
assert render(hass, tpl) == ["afoo", "bfoo", "cfoo"]
assert render(
hass, "{{ ['1', '2', '3', '4', '5'] | map('apply', int) | list }}"
) == [1, 2, 3, 4, 5]
def test_apply_macro_with_arguments(hass: HomeAssistant) -> None:
"""Test apply macro with positional, named, and mixed arguments."""
# Test macro with positional arguments
tpl = """
{%- macro add_numbers(a, b, c) -%}
{{ a + b + c }}
{%- endmacro -%}
{{ apply(5, add_numbers, 10, 15) }}
"""
assert render(hass, tpl) == 30
# Test macro with named arguments
tpl = """
{%- macro greet(name, greeting="Hello") -%}
{{ greeting }}, {{ name }}!
{%- endmacro -%}
{{ apply("World", greet, greeting="Hi") }}
"""
assert render(hass, tpl) == "Hi, World!"
# Test macro with mixed arguments
tpl = """
{%- macro format_message(prefix, name, suffix="!") -%}
{{ prefix }} {{ name }}{{ suffix }}
{%- endmacro -%}
{{ apply("Welcome", format_message, "John", suffix="...") }}
"""
assert render(hass, tpl) == "Welcome John..."
def test_as_function(hass: HomeAssistant) -> None:
"""Test as_function."""
tpl = """
{%- macro macro_double(num, returns) -%}
{%- do returns(num * 2) -%}
{%- endmacro -%}
{%- set double = macro_double | as_function -%}
{{ double(5) }}
"""
assert render(hass, tpl) == 10
def test_as_function_no_arguments(hass: HomeAssistant) -> None:
"""Test as_function with no arguments."""
tpl = """
{%- macro macro_get_hello(returns) -%}
{%- do returns("Hello") -%}
{%- endmacro -%}
{%- set get_hello = macro_get_hello | as_function -%}
{{ get_hello() }}
"""
assert render(hass, tpl) == "Hello"
def test_ord(hass: HomeAssistant) -> None:
"""Test the ord filter."""
assert render(hass, '{{ "d" | ord }}') == 100
@patch.object(random, "choice")
def test_random_every_time(test_choice, hass: HomeAssistant) -> None:
"""Ensure the random filter runs every time, not just once."""
tpl = template.Template("{{ [1,2] | random }}", hass)
test_choice.return_value = "foo"
assert tpl.async_render() == "foo"
test_choice.return_value = "bar"
assert tpl.async_render() == "bar"
def test_passing_vars_as_keywords(hass: HomeAssistant) -> None:
"""Test passing variables as keywords."""
assert render(hass, "{{ hello }}", hello=127) == 127
@@ -523,31 +437,6 @@ def test_render_with_possible_json_value_with_missing_json_value(
assert tpl.async_render_with_possible_json_value('{"hello": "world"}') == ""
def test_render_with_possible_json_value_valid_with_is_defined(
hass: HomeAssistant,
) -> None:
"""Render with possible JSON value with known JSON object."""
tpl = template.Template("{{ value_json.hello|is_defined }}", hass)
assert tpl.async_render_with_possible_json_value('{"hello": "world"}') == "world"
def test_render_with_possible_json_value_undefined_json(hass: HomeAssistant) -> None:
"""Render with possible JSON value with unknown JSON object."""
tpl = template.Template("{{ value_json.bye|is_defined }}", hass)
assert (
tpl.async_render_with_possible_json_value('{"hello": "world"}')
== '{"hello": "world"}'
)
def test_render_with_possible_json_value_undefined_json_error_value(
hass: HomeAssistant,
) -> None:
"""Render with possible JSON value with unknown JSON object."""
tpl = template.Template("{{ value_json.bye|is_defined }}", hass)
assert tpl.async_render_with_possible_json_value('{"hello": "world"}', "") == ""
def test_render_with_possible_json_value_non_string_value(hass: HomeAssistant) -> None:
"""Render with possible JSON value with non-string value."""
tpl = template.Template(
@@ -2321,39 +2210,6 @@ def test_render_complex_handling_non_template_values(hass: HomeAssistant) -> Non
) == {True: 1, False: 2}
def test_iif(hass: HomeAssistant) -> None:
"""Test the immediate if function/filter."""
result = render(hass, "{{ (1 == 1) | iif }}")
assert result is True
result = render(hass, "{{ (1 == 2) | iif }}")
assert result is False
result = render(hass, "{{ (1 == 1) | iif('yes') }}")
assert result == "yes"
result = render(hass, "{{ (1 == 2) | iif('yes') }}")
assert result is False
result = render(hass, "{{ (1 == 2) | iif('yes', 'no') }}")
assert result == "no"
result = render(hass, "{{ not_exists | default(None) | iif('yes', 'no') }}")
assert result == "no"
result = render(
hass, "{{ not_exists | default(None) | iif('yes', 'no', 'unknown') }}"
)
assert result == "unknown"
result = render(hass, "{{ iif(1 == 1) }}")
assert result is True
result = render(hass, "{{ iif(1 == 2, 'yes', 'no') }}")
assert result == "no"
@pytest.mark.usefixtures("hass")
async def test_cache_garbage_collection() -> None:
"""Test caching a template."""
@@ -2733,32 +2589,6 @@ async def test_template_states_can_serialize(hass: HomeAssistant) -> None:
assert json_dumps(template_state) == json_dumps(template_state)
@pytest.mark.parametrize(
("seq", "value", "expected"),
[
([0], 0, True),
([1], 0, False),
([False], 0, True),
([True], 0, False),
([0], [0], False),
(["toto", 1], "toto", True),
(["toto", 1], "tata", False),
([], 0, False),
([], None, False),
],
)
def test_contains(hass: HomeAssistant, seq, value, expected) -> None:
"""Test contains."""
assert (
render(hass, "{{ seq | contains(value) }}", {"seq": seq, "value": value})
== expected
)
assert (
render(hass, "{{ seq is contains(value) }}", {"seq": seq, "value": value})
== expected
)
async def test_render_to_info_with_exception(hass: HomeAssistant) -> None:
"""Test info is still available if the template has an exception."""
hass.states.async_set("test_domain.object", "dog")
@@ -2836,264 +2666,6 @@ def test_template_output_exceeds_maximum_size(hass: HomeAssistant) -> None:
render(hass, "{{ 'a' * 1024 * 257 }}")
@pytest.mark.parametrize(
("service_response"),
[
{
"calendar.sports": {
"events": [
{
"start": "2024-02-27T17:00:00-06:00",
"end": "2024-02-27T18:00:00-06:00",
"summary": "Basketball vs. Rockets",
"description": "",
}
]
},
"calendar.local_furry_events": {"events": []},
"calendar.yap_house_schedules": {
"events": [
{
"start": "2024-02-26T08:00:00-06:00",
"end": "2024-02-26T09:00:00-06:00",
"summary": "Dr. Appt",
"description": "",
},
{
"start": "2024-02-28T20:00:00-06:00",
"end": "2024-02-28T21:00:00-06:00",
"summary": "Bake a cake",
"description": "something good",
},
]
},
},
{
"binary_sensor.workday": {"workday": True},
"binary_sensor.workday2": {"workday": False},
},
{
"weather.smhi_home": {
"forecast": [
{
"datetime": "2024-03-31T16:00:00",
"condition": "cloudy",
"wind_bearing": 79,
"cloud_coverage": 100,
"temperature": 10,
"templow": 4,
"pressure": 998,
"wind_gust_speed": 21.6,
"wind_speed": 11.88,
"precipitation": 0.2,
"humidity": 87,
},
{
"datetime": "2024-04-01T12:00:00",
"condition": "rainy",
"wind_bearing": 17,
"cloud_coverage": 100,
"temperature": 6,
"templow": 1,
"pressure": 999,
"wind_gust_speed": 20.52,
"wind_speed": 8.64,
"precipitation": 2.2,
"humidity": 88,
},
{
"datetime": "2024-04-02T12:00:00",
"condition": "cloudy",
"wind_bearing": 17,
"cloud_coverage": 100,
"temperature": 0,
"templow": -3,
"pressure": 1003,
"wind_gust_speed": 57.24,
"wind_speed": 30.6,
"precipitation": 1.3,
"humidity": 71,
},
]
},
"weather.forecast_home": {
"forecast": [
{
"condition": "cloudy",
"precipitation_probability": 6.6,
"datetime": "2024-03-31T10:00:00+00:00",
"wind_bearing": 71.8,
"temperature": 10.9,
"templow": 6.5,
"wind_gust_speed": 24.1,
"wind_speed": 13.7,
"precipitation": 0,
"humidity": 71,
},
{
"condition": "cloudy",
"precipitation_probability": 8,
"datetime": "2024-04-01T10:00:00+00:00",
"wind_bearing": 350.6,
"temperature": 10.2,
"templow": 3.4,
"wind_gust_speed": 38.2,
"wind_speed": 21.6,
"precipitation": 0,
"humidity": 79,
},
{
"condition": "snowy",
"precipitation_probability": 67.4,
"datetime": "2024-04-02T10:00:00+00:00",
"wind_bearing": 24.5,
"temperature": 3,
"templow": 0,
"wind_gust_speed": 64.8,
"wind_speed": 37.4,
"precipitation": 2.3,
"humidity": 77,
},
]
},
},
{
"vacuum.deebot_n8_plus_1": {
"payloadType": "j",
"resp": {
"body": {
"msg": "ok",
}
},
"header": {
"ver": "0.0.1",
},
},
"vacuum.deebot_n8_plus_2": {
"payloadType": "j",
"resp": {
"body": {
"msg": "ok",
}
},
"header": {
"ver": "0.0.1",
},
},
},
],
ids=["calendar", "workday", "weather", "vacuum"],
)
async def test_merge_response(
hass: HomeAssistant,
service_response: dict,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter."""
_template = "{{ merge_response(" + str(service_response) + ") }}"
assert service_response == snapshot(name="a_response")
assert render(
hass,
_template,
) == snapshot(name="b_rendered")
async def test_merge_response_with_entity_id_in_response(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with empty lists."""
service_response = {
"test.response": {"some_key": True, "entity_id": "test.response"},
"test.response2": {"some_key": False, "entity_id": "test.response2"},
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(
TemplateError,
match="ValueError: Response dictionary already contains key 'entity_id'",
):
render(hass, _template)
service_response = {
"test.response": {
"happening": [
{
"start": "2024-02-27T17:00:00-06:00",
"end": "2024-02-27T18:00:00-06:00",
"summary": "Magic day",
"entity_id": "test.response",
}
]
}
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(
TemplateError,
match="ValueError: Response dictionary already contains key 'entity_id'",
):
render(hass, _template)
async def test_merge_response_with_empty_response(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with empty lists."""
service_response = {
"calendar.sports": {"events": []},
"calendar.local_furry_events": {"events": []},
"calendar.yap_house_schedules": {"events": []},
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
assert service_response == snapshot(name="a_response")
assert render(hass, _template) == snapshot(name="b_rendered")
async def test_response_empty_dict(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with empty dict."""
service_response = {}
_template = "{{ merge_response(" + str(service_response) + ") }}"
result = render(hass, _template)
assert result == []
async def test_response_incorrect_value(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
) -> None:
"""Test the merge_response function/filter with incorrect response."""
service_response = "incorrect"
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(TemplateError, match="TypeError: Response is not a dictionary"):
render(hass, _template)
async def test_merge_response_with_incorrect_response(hass: HomeAssistant) -> None:
"""Test the merge_response function/filter with empty response should raise."""
service_response = {"calendar.sports": []}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(TemplateError, match="TypeError: Response is not a dictionary"):
render(hass, _template)
service_response = {
"binary_sensor.workday": [],
}
_template = "{{ merge_response(" + str(service_response) + ") }}"
with pytest.raises(TemplateError, match="TypeError: Response is not a dictionary"):
render(hass, _template)
def test_warn_no_hass(hass: HomeAssistant, caplog: pytest.LogCaptureFixture) -> None:
"""Test deprecation warning when instantiating Template without hass."""
@@ -3109,81 +2681,3 @@ def test_warn_no_hass(hass: HomeAssistant, caplog: pytest.LogCaptureFixture) ->
template.Template("blah", hass)
assert message not in caplog.text
caplog.clear()
async def test_merge_response_not_mutate_original_object(
hass: HomeAssistant, snapshot: SnapshotAssertion
) -> None:
"""Test the merge_response does not mutate original service response value."""
value = '{"calendar.family": {"events": [{"summary": "An event"}]}'
_template = (
"{% set calendar_response = " + value + "} %}"
"{{ merge_response(calendar_response) }}"
# We should be able to merge the same response again
# as the merge is working on a copy of the original object (response)
"{{ merge_response(calendar_response) }}"
)
assert render(hass, _template)
def test_typeof(hass: HomeAssistant) -> None:
"""Test the typeof debug filter/function."""
assert render(hass, "{{ True | typeof }}") == "bool"
assert render(hass, "{{ typeof(True) }}") == "bool"
assert render(hass, "{{ [1, 2, 3] | typeof }}") == "list"
assert render(hass, "{{ typeof([1, 2, 3]) }}") == "list"
assert render(hass, "{{ 1 | typeof }}") == "int"
assert render(hass, "{{ typeof(1) }}") == "int"
assert render(hass, "{{ 1.1 | typeof }}") == "float"
assert render(hass, "{{ typeof(1.1) }}") == "float"
assert render(hass, "{{ None | typeof }}") == "NoneType"
assert render(hass, "{{ typeof(None) }}") == "NoneType"
assert render(hass, "{{ 'Home Assistant' | typeof }}") == "str"
assert render(hass, "{{ typeof('Home Assistant') }}") == "str"
def test_combine(hass: HomeAssistant) -> None:
"""Test combine filter and function."""
assert render(hass, "{{ {'a': 1, 'b': 2} | combine({'b': 3, 'c': 4}) }}") == {
"a": 1,
"b": 3,
"c": 4,
}
assert render(hass, "{{ combine({'a': 1, 'b': 2}, {'b': 3, 'c': 4}) }}") == {
"a": 1,
"b": 3,
"c": 4,
}
assert render(
hass,
"{{ combine({'a': 1, 'b': {'x': 1}}, {'b': {'y': 2}, 'c': 4}, recursive=True) }}",
) == {"a": 1, "b": {"x": 1, "y": 2}, "c": 4}
# Test that recursive=False does not merge nested dictionaries
assert render(
hass,
"{{ combine({'a': 1, 'b': {'x': 1}}, {'b': {'y': 2}, 'c': 4}, recursive=False) }}",
) == {"a": 1, "b": {"y": 2}, "c": 4}
# Test that None values are handled correctly in recursive merge
assert render(
hass,
"{{ combine({'a': 1, 'b': none}, {'b': {'y': 2}, 'c': 4}, recursive=True) }}",
) == {"a": 1, "b": {"y": 2}, "c": 4}
with pytest.raises(
TemplateError, match="combine expected at least 1 argument, got 0"
):
render(hass, "{{ combine() }}")
with pytest.raises(TemplateError, match="combine expected a dict, got str"):
render(hass, "{{ {'a': 1} | combine('not a dict') }}")